Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2mod global_tables;
3mod items;
4mod queries;
5mod streams;
6mod tables;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use base64::Engine;
13use http::StatusCode;
14use serde_json::{json, Value};
15
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
18
19use fakecloud_persistence::S3Store;
20use fakecloud_s3::state::SharedS3State;
21
22use crate::state::{
23    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoTable,
24    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
25    ProvisionedThroughput, SharedDynamoDbState,
26};
27
28/// Minimal subset of a ``DynamoTable`` that Kinesis streaming delivery needs.
29///
30/// A table can carry megabytes of items; cloning the whole table just to
31/// release the write lock and deliver one change record is extremely wasteful.
32/// Extracting only the fields the delivery path actually reads (destinations,
33/// arn, name) keeps the clone small.
34pub(super) struct KinesisDeliveryTarget {
35    pub destinations: Vec<KinesisDestination>,
36    pub arn: String,
37    pub name: String,
38}
39
40pub struct DynamoDbService {
41    state: SharedDynamoDbState,
42    pub(crate) s3_state: Option<SharedS3State>,
43    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
44    delivery: Option<Arc<DeliveryBus>>,
45}
46
47impl DynamoDbService {
48    pub fn new(state: SharedDynamoDbState) -> Self {
49        Self {
50            state,
51            s3_state: None,
52            s3_store: None,
53            delivery: None,
54        }
55    }
56
57    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
58        self.s3_state = Some(s3_state);
59        self
60    }
61
62    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
63        self.s3_store = Some(store);
64        self
65    }
66
67    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
68        self.delivery = Some(delivery);
69        self
70    }
71
72    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
73        if table
74            .kinesis_destinations
75            .iter()
76            .any(|d| d.destination_status == "ACTIVE")
77        {
78            Some(KinesisDeliveryTarget {
79                destinations: table.kinesis_destinations.clone(),
80                arn: table.arn.clone(),
81                name: table.name.clone(),
82            })
83        } else {
84            None
85        }
86    }
87
88    /// Deliver a change record to all active Kinesis streaming destinations for a table.
89    pub(super) fn deliver_to_kinesis_destinations(
90        &self,
91        target: &KinesisDeliveryTarget,
92        event_name: &str,
93        keys: &HashMap<String, AttributeValue>,
94        old_image: Option<&HashMap<String, AttributeValue>>,
95        new_image: Option<&HashMap<String, AttributeValue>>,
96    ) {
97        let delivery = match &self.delivery {
98            Some(d) => d,
99            None => return,
100        };
101
102        let active_destinations: Vec<_> = target
103            .destinations
104            .iter()
105            .filter(|d| d.destination_status == "ACTIVE")
106            .collect();
107
108        if active_destinations.is_empty() {
109            return;
110        }
111
112        let mut record = json!({
113            "eventID": uuid::Uuid::new_v4().to_string(),
114            "eventName": event_name,
115            "eventVersion": "1.1",
116            "eventSource": "aws:dynamodb",
117            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
118            "dynamodb": {
119                "Keys": keys,
120                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
121                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
122                "StreamViewType": "NEW_AND_OLD_IMAGES",
123            },
124            "eventSourceARN": &target.arn,
125            "tableName": &target.name,
126        });
127
128        if let Some(old) = old_image {
129            record["dynamodb"]["OldImage"] = json!(old);
130        }
131        if let Some(new) = new_image {
132            record["dynamodb"]["NewImage"] = json!(new);
133        }
134
135        let record_str = serde_json::to_string(&record).unwrap_or_default();
136        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
137        let partition_key = serde_json::to_string(keys).unwrap_or_default();
138
139        for dest in active_destinations {
140            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
141        }
142    }
143
144    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
145        serde_json::from_slice(&req.body).map_err(|e| {
146            AwsServiceError::aws_error(
147                StatusCode::BAD_REQUEST,
148                "SerializationException",
149                format!("Invalid JSON: {e}"),
150            )
151        })
152    }
153
154    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
155        Ok(AwsResponse::ok_json(body))
156    }
157}
158
159#[async_trait]
160impl AwsService for DynamoDbService {
161    fn service_name(&self) -> &str {
162        "dynamodb"
163    }
164
165    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
166        match req.action.as_str() {
167            "CreateTable" => self.create_table(&req),
168            "DeleteTable" => self.delete_table(&req),
169            "DescribeTable" => self.describe_table(&req),
170            "ListTables" => self.list_tables(&req),
171            "UpdateTable" => self.update_table(&req),
172            "PutItem" => self.put_item(&req),
173            "GetItem" => self.get_item(&req),
174            "DeleteItem" => self.delete_item(&req),
175            "UpdateItem" => self.update_item(&req),
176            "Query" => self.query(&req),
177            "Scan" => self.scan(&req),
178            "BatchGetItem" => self.batch_get_item(&req),
179            "BatchWriteItem" => self.batch_write_item(&req),
180            "TagResource" => self.tag_resource(&req),
181            "UntagResource" => self.untag_resource(&req),
182            "ListTagsOfResource" => self.list_tags_of_resource(&req),
183            "TransactGetItems" => self.transact_get_items(&req),
184            "TransactWriteItems" => self.transact_write_items(&req),
185            "ExecuteStatement" => self.execute_statement(&req),
186            "BatchExecuteStatement" => self.batch_execute_statement(&req),
187            "ExecuteTransaction" => self.execute_transaction(&req),
188            "UpdateTimeToLive" => self.update_time_to_live(&req),
189            "DescribeTimeToLive" => self.describe_time_to_live(&req),
190            "PutResourcePolicy" => self.put_resource_policy(&req),
191            "GetResourcePolicy" => self.get_resource_policy(&req),
192            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
193            // Stubs
194            "DescribeEndpoints" => self.describe_endpoints(&req),
195            "DescribeLimits" => self.describe_limits(&req),
196            // Backups
197            "CreateBackup" => self.create_backup(&req),
198            "DeleteBackup" => self.delete_backup(&req),
199            "DescribeBackup" => self.describe_backup(&req),
200            "ListBackups" => self.list_backups(&req),
201            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
202            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
203            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
204            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
205            // Global tables
206            "CreateGlobalTable" => self.create_global_table(&req),
207            "DescribeGlobalTable" => self.describe_global_table(&req),
208            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
209            "ListGlobalTables" => self.list_global_tables(&req),
210            "UpdateGlobalTable" => self.update_global_table(&req),
211            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
212            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
213            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
214            // Kinesis streaming
215            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
216            "DisableKinesisStreamingDestination" => {
217                self.disable_kinesis_streaming_destination(&req)
218            }
219            "DescribeKinesisStreamingDestination" => {
220                self.describe_kinesis_streaming_destination(&req)
221            }
222            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
223            // Contributor insights
224            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
225            "UpdateContributorInsights" => self.update_contributor_insights(&req),
226            "ListContributorInsights" => self.list_contributor_insights(&req),
227            // Import/Export
228            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
229            "DescribeExport" => self.describe_export(&req),
230            "ListExports" => self.list_exports(&req),
231            "ImportTable" => self.import_table(&req),
232            "DescribeImport" => self.describe_import(&req),
233            "ListImports" => self.list_imports(&req),
234            _ => Err(AwsServiceError::action_not_implemented(
235                "dynamodb",
236                &req.action,
237            )),
238        }
239    }
240
241    fn supported_actions(&self) -> &[&str] {
242        &[
243            "CreateTable",
244            "DeleteTable",
245            "DescribeTable",
246            "ListTables",
247            "UpdateTable",
248            "PutItem",
249            "GetItem",
250            "DeleteItem",
251            "UpdateItem",
252            "Query",
253            "Scan",
254            "BatchGetItem",
255            "BatchWriteItem",
256            "TagResource",
257            "UntagResource",
258            "ListTagsOfResource",
259            "TransactGetItems",
260            "TransactWriteItems",
261            "ExecuteStatement",
262            "BatchExecuteStatement",
263            "ExecuteTransaction",
264            "UpdateTimeToLive",
265            "DescribeTimeToLive",
266            "PutResourcePolicy",
267            "GetResourcePolicy",
268            "DeleteResourcePolicy",
269            "DescribeEndpoints",
270            "DescribeLimits",
271            "CreateBackup",
272            "DeleteBackup",
273            "DescribeBackup",
274            "ListBackups",
275            "RestoreTableFromBackup",
276            "RestoreTableToPointInTime",
277            "UpdateContinuousBackups",
278            "DescribeContinuousBackups",
279            "CreateGlobalTable",
280            "DescribeGlobalTable",
281            "DescribeGlobalTableSettings",
282            "ListGlobalTables",
283            "UpdateGlobalTable",
284            "UpdateGlobalTableSettings",
285            "DescribeTableReplicaAutoScaling",
286            "UpdateTableReplicaAutoScaling",
287            "EnableKinesisStreamingDestination",
288            "DisableKinesisStreamingDestination",
289            "DescribeKinesisStreamingDestination",
290            "UpdateKinesisStreamingDestination",
291            "DescribeContributorInsights",
292            "UpdateContributorInsights",
293            "ListContributorInsights",
294            "ExportTableToPointInTime",
295            "DescribeExport",
296            "ListExports",
297            "ImportTable",
298            "DescribeImport",
299            "ListImports",
300        ]
301    }
302}
303// ── Helper functions ────────────────────────────────────────────────────
304
305fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
306    body[field].as_str().ok_or_else(|| {
307        AwsServiceError::aws_error(
308            StatusCode::BAD_REQUEST,
309            "ValidationException",
310            format!("{field} is required"),
311        )
312    })
313}
314
315fn require_object(
316    body: &Value,
317    field: &str,
318) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
319    let obj = body[field].as_object().ok_or_else(|| {
320        AwsServiceError::aws_error(
321            StatusCode::BAD_REQUEST,
322            "ValidationException",
323            format!("{field} is required"),
324        )
325    })?;
326    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
327}
328
329fn get_table<'a>(
330    tables: &'a HashMap<String, DynamoTable>,
331    name: &str,
332) -> Result<&'a DynamoTable, AwsServiceError> {
333    tables.get(name).ok_or_else(|| {
334        AwsServiceError::aws_error(
335            StatusCode::BAD_REQUEST,
336            "ResourceNotFoundException",
337            format!("Requested resource not found: Table: {name} not found"),
338        )
339    })
340}
341
342fn get_table_mut<'a>(
343    tables: &'a mut HashMap<String, DynamoTable>,
344    name: &str,
345) -> Result<&'a mut DynamoTable, AwsServiceError> {
346    tables.get_mut(name).ok_or_else(|| {
347        AwsServiceError::aws_error(
348            StatusCode::BAD_REQUEST,
349            "ResourceNotFoundException",
350            format!("Requested resource not found: Table: {name} not found"),
351        )
352    })
353}
354
355fn find_table_by_arn<'a>(
356    tables: &'a HashMap<String, DynamoTable>,
357    arn: &str,
358) -> Result<&'a DynamoTable, AwsServiceError> {
359    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
360        AwsServiceError::aws_error(
361            StatusCode::BAD_REQUEST,
362            "ResourceNotFoundException",
363            format!("Requested resource not found: {arn}"),
364        )
365    })
366}
367
368fn find_table_by_arn_mut<'a>(
369    tables: &'a mut HashMap<String, DynamoTable>,
370    arn: &str,
371) -> Result<&'a mut DynamoTable, AwsServiceError> {
372    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
373        AwsServiceError::aws_error(
374            StatusCode::BAD_REQUEST,
375            "ResourceNotFoundException",
376            format!("Requested resource not found: {arn}"),
377        )
378    })
379}
380
381fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
382    let arr = val.as_array().ok_or_else(|| {
383        AwsServiceError::aws_error(
384            StatusCode::BAD_REQUEST,
385            "ValidationException",
386            "KeySchema is required",
387        )
388    })?;
389    Ok(arr
390        .iter()
391        .map(|elem| KeySchemaElement {
392            attribute_name: elem["AttributeName"]
393                .as_str()
394                .unwrap_or_default()
395                .to_string(),
396            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
397        })
398        .collect())
399}
400
401fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
402    let arr = val.as_array().ok_or_else(|| {
403        AwsServiceError::aws_error(
404            StatusCode::BAD_REQUEST,
405            "ValidationException",
406            "AttributeDefinitions is required",
407        )
408    })?;
409    Ok(arr
410        .iter()
411        .map(|elem| AttributeDefinition {
412            attribute_name: elem["AttributeName"]
413                .as_str()
414                .unwrap_or_default()
415                .to_string(),
416            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
417        })
418        .collect())
419}
420
421fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
422    Ok(ProvisionedThroughput {
423        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
424        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
425    })
426}
427
428fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
429    let Some(arr) = val.as_array() else {
430        return Vec::new();
431    };
432    arr.iter()
433        .filter_map(|g| {
434            Some(GlobalSecondaryIndex {
435                index_name: g["IndexName"].as_str()?.to_string(),
436                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
437                projection: parse_projection(&g["Projection"]),
438                provisioned_throughput: Some(parse_gsi_throughput(
439                    &g["ProvisionedThroughput"],
440                    billing_mode,
441                )),
442            })
443        })
444        .collect()
445}
446
447/// Resolve the provisioned-throughput slot for a GSI on a CreateTable or
448/// UpdateTable Create action. Real DynamoDB returns `{0, 0}` for GSIs on
449/// PAY_PER_REQUEST tables regardless of whether the caller sent a
450/// `ProvisionedThroughput` block, and the Terraform provider's `flatten`
451/// code keys `name`/`read_capacity`/`write_capacity` off the presence of
452/// that field — returning `None` would desynchronise state.
453fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
454    if billing_mode == "PAY_PER_REQUEST" {
455        return ProvisionedThroughput {
456            read_capacity_units: 0,
457            write_capacity_units: 0,
458        };
459    }
460    ProvisionedThroughput {
461        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
462        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
463    }
464}
465
466fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
467    let Some(arr) = val.as_array() else {
468        return Vec::new();
469    };
470    arr.iter()
471        .filter_map(|l| {
472            Some(LocalSecondaryIndex {
473                index_name: l["IndexName"].as_str()?.to_string(),
474                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
475                projection: parse_projection(&l["Projection"]),
476            })
477        })
478        .collect()
479}
480
481pub(super) fn parse_projection(val: &Value) -> Projection {
482    Projection {
483        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
484        non_key_attributes: val["NonKeyAttributes"]
485            .as_array()
486            .map(|arr| {
487                arr.iter()
488                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
489                    .collect()
490            })
491            .unwrap_or_default(),
492    }
493}
494
495fn parse_tags(val: &Value) -> HashMap<String, String> {
496    let mut tags = HashMap::new();
497    if let Some(arr) = val.as_array() {
498        for tag in arr {
499            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
500                tags.insert(k.to_string(), v.to_string());
501            }
502        }
503    }
504    tags
505}
506
507fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
508    let mut names = HashMap::new();
509    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
510        for (k, v) in obj {
511            if let Some(s) = v.as_str() {
512                names.insert(k.clone(), s.to_string());
513            }
514        }
515    }
516    names
517}
518
519fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
520    let mut values = HashMap::new();
521    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
522        for (k, v) in obj {
523            values.insert(k.clone(), v.clone());
524        }
525    }
526    values
527}
528
/// Resolves a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#` are returned unchanged, as are
/// placeholders that have no entry in the map.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    match expr_attr_names.get(name) {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
539
540fn extract_key(
541    table: &DynamoTable,
542    item: &HashMap<String, AttributeValue>,
543) -> HashMap<String, AttributeValue> {
544    let mut key = HashMap::new();
545    let hash_key = table.hash_key_name();
546    if let Some(v) = item.get(hash_key) {
547        key.insert(hash_key.to_string(), v.clone());
548    }
549    if let Some(range_key) = table.range_key_name() {
550        if let Some(v) = item.get(range_key) {
551            key.insert(range_key.to_string(), v.clone());
552        }
553    }
554    key
555}
556
557/// Parse a JSON object into a key map (used for ExclusiveStartKey).
558fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
559    let obj = value.as_object()?;
560    if obj.is_empty() {
561        return None;
562    }
563    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
564}
565
566/// Check whether an item's key attributes match the given key map.
567fn item_matches_key(
568    item: &HashMap<String, AttributeValue>,
569    key: &HashMap<String, AttributeValue>,
570    hash_key_name: &str,
571    range_key_name: Option<&str>,
572) -> bool {
573    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
574        (Some(a), Some(b)) => a == b,
575        _ => false,
576    };
577    if !hash_match {
578        return false;
579    }
580    match range_key_name {
581        Some(rk) => match (item.get(rk), key.get(rk)) {
582            (Some(a), Some(b)) => a == b,
583            (None, None) => true,
584            _ => false,
585        },
586        None => true,
587    }
588}
589
590/// Extract the primary key from an item given explicit key attribute names.
591fn extract_key_for_schema(
592    item: &HashMap<String, AttributeValue>,
593    hash_key_name: &str,
594    range_key_name: Option<&str>,
595) -> HashMap<String, AttributeValue> {
596    let mut key = HashMap::new();
597    if let Some(v) = item.get(hash_key_name) {
598        key.insert(hash_key_name.to_string(), v.clone());
599    }
600    if let Some(rk) = range_key_name {
601        if let Some(v) = item.get(rk) {
602            key.insert(rk.to_string(), v.clone());
603        }
604    }
605    key
606}
607
608fn validate_key_in_item(
609    table: &DynamoTable,
610    item: &HashMap<String, AttributeValue>,
611) -> Result<(), AwsServiceError> {
612    let hash_key = table.hash_key_name();
613    if !item.contains_key(hash_key) {
614        return Err(AwsServiceError::aws_error(
615            StatusCode::BAD_REQUEST,
616            "ValidationException",
617            format!("Missing the key {hash_key} in the item"),
618        ));
619    }
620    if let Some(range_key) = table.range_key_name() {
621        if !item.contains_key(range_key) {
622            return Err(AwsServiceError::aws_error(
623                StatusCode::BAD_REQUEST,
624                "ValidationException",
625                format!("Missing the key {range_key} in the item"),
626            ));
627        }
628    }
629    Ok(())
630}
631
632fn validate_key_attributes_in_key(
633    table: &DynamoTable,
634    key: &HashMap<String, AttributeValue>,
635) -> Result<(), AwsServiceError> {
636    let hash_key = table.hash_key_name();
637    if !key.contains_key(hash_key) {
638        return Err(AwsServiceError::aws_error(
639            StatusCode::BAD_REQUEST,
640            "ValidationException",
641            format!("Missing the key {hash_key} in the item"),
642        ));
643    }
644    Ok(())
645}
646
647fn project_item(
648    item: &HashMap<String, AttributeValue>,
649    body: &Value,
650) -> HashMap<String, AttributeValue> {
651    let projection = body["ProjectionExpression"].as_str();
652    match projection {
653        Some(proj) if !proj.is_empty() => {
654            let expr_attr_names = parse_expression_attribute_names(body);
655            let attrs: Vec<String> = proj
656                .split(',')
657                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
658                .collect();
659            let mut result = HashMap::new();
660            for attr in &attrs {
661                if let Some(v) = resolve_nested_path(item, attr) {
662                    insert_nested_value(&mut result, attr, v);
663                }
664            }
665            result
666        }
667        _ => item.clone(),
668    }
669}
670
671/// Resolve expression attribute names within each segment of a projection path.
672/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
673fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
674    // Split on dots, resolve each part, rejoin
675    let mut result = String::new();
676    for (i, segment) in path.split('.').enumerate() {
677        if i > 0 {
678            result.push('.');
679        }
680        // A segment might be like "#n" or "people[0]" or "#attr[0]"
681        if let Some(bracket_pos) = segment.find('[') {
682            let key_part = &segment[..bracket_pos];
683            let index_part = &segment[bracket_pos..];
684            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
685            result.push_str(index_part);
686        } else {
687            result.push_str(&resolve_attr_name(segment, expr_attr_names));
688        }
689    }
690    result
691}
692
693/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
694fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
695    let segments = parse_path_segments(path);
696    if segments.is_empty() {
697        return None;
698    }
699
700    let first = &segments[0];
701    let top_key = match first {
702        PathSegment::Key(k) => k.as_str(),
703        _ => return None,
704    };
705
706    let mut current = item.get(top_key)?.clone();
707
708    for segment in &segments[1..] {
709        match segment {
710            PathSegment::Key(k) => {
711                // Navigate into a Map: {"M": {"key": ...}}
712                current = current.get("M")?.get(k)?.clone();
713            }
714            PathSegment::Index(idx) => {
715                // Navigate into a List: {"L": [...]}
716                current = current.get("L")?.get(*idx)?.clone();
717            }
718        }
719    }
720
721    Some(current)
722}
723
/// One step of a document path: a map key or a list index.
#[derive(Debug)]
enum PathSegment {
    /// Attribute or map-entry name.
    Key(String),
    /// Zero-based list position.
    Index(usize),
}

/// Splits a path such as "a.b[0].c" into
/// [Key("a"), Key("b"), Index(0), Key("c")].
///
/// Empty names (for example between consecutive dots) produce no segment, and
/// a bracket whose contents are not a valid `usize` is dropped silently. An
/// unterminated bracket is read through to the end of the path.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut pending = String::new();
    let mut chars = path.chars();

    while let Some(c) = chars.next() {
        match c {
            '.' | '[' => {
                // Either delimiter ends the name collected so far.
                if !pending.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut pending)));
                }
                if c == '[' {
                    // `take_while` also consumes the closing ']'.
                    let digits: String = chars.by_ref().take_while(|&ch| ch != ']').collect();
                    if let Ok(idx) = digits.parse::<usize>() {
                        segments.push(PathSegment::Index(idx));
                    }
                }
            }
            other => pending.push(other),
        }
    }

    if !pending.is_empty() {
        segments.push(PathSegment::Key(pending));
    }
    segments
}
772
773/// Insert a value at a nested path in the result HashMap.
774/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
775fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
776    // Simple case: no nesting
777    if !path.contains('.') && !path.contains('[') {
778        result.insert(path.to_string(), value);
779        return;
780    }
781
782    let segments = parse_path_segments(path);
783    if segments.is_empty() {
784        return;
785    }
786
787    let top_key = match &segments[0] {
788        PathSegment::Key(k) => k.clone(),
789        _ => return,
790    };
791
792    if segments.len() == 1 {
793        result.insert(top_key, value);
794        return;
795    }
796
797    // For nested paths, wrap the value back into the nested structure
798    let wrapped = wrap_value_in_path(&segments[1..], value);
799    // Merge into existing value if present
800    let existing = result.remove(&top_key);
801    let merged = match existing {
802        Some(existing) => merge_attribute_values(existing, wrapped),
803        None => wrapped,
804    };
805    result.insert(top_key, merged);
806}
807
808/// Wrap a value in the nested path structure.
809fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
810    if segments.is_empty() {
811        return value;
812    }
813    let inner = wrap_value_in_path(&segments[1..], value);
814    match &segments[0] {
815        PathSegment::Key(k) => {
816            json!({"M": {k.clone(): inner}})
817        }
818        PathSegment::Index(idx) => {
819            let mut arr = vec![Value::Null; idx + 1];
820            arr[*idx] = inner;
821            json!({"L": arr})
822        }
823    }
824}
825
826/// Merge two attribute values (for overlapping projections).
827fn merge_attribute_values(a: Value, b: Value) -> Value {
828    if let (Some(a_map), Some(b_map)) = (
829        a.get("M").and_then(|v| v.as_object()),
830        b.get("M").and_then(|v| v.as_object()),
831    ) {
832        let mut merged = a_map.clone();
833        for (k, v) in b_map {
834            if let Some(existing) = merged.get(k) {
835                merged.insert(
836                    k.clone(),
837                    merge_attribute_values(existing.clone(), v.clone()),
838                );
839            } else {
840                merged.insert(k.clone(), v.clone());
841            }
842        }
843        json!({"M": merged})
844    } else {
845        b
846    }
847}
848
849fn evaluate_condition(
850    condition: &str,
851    existing: Option<&HashMap<String, AttributeValue>>,
852    expr_attr_names: &HashMap<String, String>,
853    expr_attr_values: &HashMap<String, Value>,
854) -> Result<(), AwsServiceError> {
855    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
856    // so we delegate to evaluate_filter_expression. An empty map models "item
857    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
858    // → true, comparisons against missing attributes → None vs Some(val).
859    let empty = HashMap::new();
860    let item = existing.unwrap_or(&empty);
861    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
862        Ok(())
863    } else {
864        Err(AwsServiceError::aws_error(
865            StatusCode::BAD_REQUEST,
866            "ConditionalCheckFailedException",
867            "The conditional request failed",
868        ))
869    }
870}
871
/// Return the trimmed argument text of a `func_name(...)` call.
///
/// `expr` must start with `func_name` followed by `(` or by a single space
/// and `(` (the form aws-sdk-go v2's expression builder emits), and must end
/// with `)`. Any other shape yields `None`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    let after_paren = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    after_paren.strip_suffix(')').map(str::trim)
}
884
885fn evaluate_key_condition(
886    expr: &str,
887    item: &HashMap<String, AttributeValue>,
888    hash_key_name: &str,
889    _range_key_name: Option<&str>,
890    expr_attr_names: &HashMap<String, String>,
891    expr_attr_values: &HashMap<String, Value>,
892) -> bool {
893    let parts: Vec<&str> = split_on_and(expr);
894    for part in &parts {
895        let part = part.trim();
896        if !evaluate_single_key_condition(
897            part,
898            item,
899            hash_key_name,
900            expr_attr_names,
901            expr_attr_values,
902        ) {
903            return false;
904        }
905    }
906    true
907}
908
/// Shared scanner behind the `split_on_*` helpers.
///
/// Walks `expr` byte by byte, tracking parenthesis depth, and cuts at every
/// case-insensitive occurrence of `keyword` found at depth 0. When
/// `between_aware` is set, the first match of `keyword` that follows a
/// top-level `BETWEEN` operator is treated as part of that range
/// (`a BETWEEN :lo AND :hi`) and is not used as a separator.
fn split_top_level<'a>(expr: &'a str, keyword: &str, between_aware: bool) -> Vec<&'a str> {
    /// True when `needle` occurs at byte offset `at`, ignoring ASCII case.
    fn matches_at(bytes: &[u8], at: usize, needle: &[u8]) -> bool {
        bytes
            .get(at..at + needle.len())
            .map_or(false, |window| window.eq_ignore_ascii_case(needle))
    }

    /// True when a whitespace-delimited `BETWEEN` operator starts at `at`.
    fn between_operator_at(bytes: &[u8], at: usize) -> bool {
        const BETWEEN: &[u8] = b"BETWEEN";
        at > 0
            && bytes[at - 1].is_ascii_whitespace()
            && matches_at(bytes, at, BETWEEN)
            && bytes
                .get(at + BETWEEN.len())
                .map_or(false, |b| b.is_ascii_whitespace())
    }

    // An empty keyword would match everywhere without ever advancing.
    if keyword.is_empty() {
        return vec![expr];
    }

    let bytes = expr.as_bytes();
    let needle = keyword.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut i = 0;
    let mut depth = 0usize;
    let mut range_open = false;

    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            // Unbalanced closing parens are tolerated rather than underflowing.
            b')' => depth = depth.saturating_sub(1),
            _ if depth == 0 => {
                // A byte-wise match of a valid UTF-8 keyword always starts
                // and ends on char boundaries, so the slices below are safe.
                if matches_at(bytes, i, needle) {
                    if between_aware && range_open {
                        // This keyword closes `BETWEEN lo AND hi`; keep it
                        // inside the current part.
                        range_open = false;
                        i += needle.len();
                        continue;
                    }
                    parts.push(&expr[start..i]);
                    start = i + needle.len();
                    i = start;
                    continue;
                }
                if between_aware && between_operator_at(bytes, i) {
                    range_open = true;
                }
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}

/// Split a DynamoDB condition expression on a top-level keyword (`" AND "`,
/// `" OR "`), case-insensitively. Parenthesised groups are skipped so only
/// unparenthesised occurrences of the keyword act as separators. The parts
/// are returned untrimmed; an empty `keyword` yields `expr` as the only part.
fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
    split_top_level(expr, keyword, false)
}

/// Split on top-level `" AND "` conjunctions.
///
/// The `AND` that belongs to a `BETWEEN :lo AND :hi` range is not a
/// conjunction and stays inside its part, so range conditions reach the
/// BETWEEN evaluator intact.
fn split_on_and(expr: &str) -> Vec<&str> {
    split_top_level(expr, " AND ", true)
}

/// Split on top-level `" OR "` disjunctions.
fn split_on_or(expr: &str) -> Vec<&str> {
    split_on_top_level_keyword(expr, " OR ")
}
950
951fn evaluate_single_key_condition(
952    part: &str,
953    item: &HashMap<String, AttributeValue>,
954    _hash_key_name: &str,
955    expr_attr_names: &HashMap<String, String>,
956    expr_attr_values: &HashMap<String, Value>,
957) -> bool {
958    let part = part.trim();
959
960    if let Some(rest) = part
961        .strip_prefix("begins_with(")
962        .or_else(|| part.strip_prefix("begins_with ("))
963    {
964        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
965    }
966
967    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
968        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
969    }
970
971    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
972}
973
974/// `begins_with(attr, :val)` — KeyCondition variant: supports only
975/// S-typed attributes (mirrors AWS's behavior of returning false for
976/// type mismatches). The filter-expression evaluator has its own
977/// `eval_begins_with` because it operates on filter-grammar inputs.
978fn key_cond_begins_with(
979    rest: &str,
980    item: &HashMap<String, AttributeValue>,
981    expr_attr_names: &HashMap<String, String>,
982    expr_attr_values: &HashMap<String, Value>,
983) -> bool {
984    let Some(inner) = rest.strip_suffix(')') else {
985        return false;
986    };
987    let mut split = inner.splitn(2, ',');
988    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
989        return false;
990    };
991    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
992    let expected = expr_attr_values.get(val_ref.trim());
993    let actual = item.get(&attr_name);
994    match (actual, expected) {
995        (Some(a), Some(e)) => {
996            let a_str = a.get("S").and_then(|v| v.as_str());
997            let e_str = e.get("S").and_then(|v| v.as_str());
998            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
999        }
1000        _ => false,
1001    }
1002}
1003
1004/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
1005/// shared `compare_attribute_values` ordering.
1006fn key_cond_between(
1007    part: &str,
1008    between_pos: usize,
1009    item: &HashMap<String, AttributeValue>,
1010    expr_attr_names: &HashMap<String, String>,
1011    expr_attr_values: &HashMap<String, Value>,
1012) -> bool {
1013    let attr_part = part[..between_pos].trim();
1014    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
1015    let range_part = &part[between_pos + 7..];
1016    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
1017        return false;
1018    };
1019    let lo_ref = range_part[..and_pos].trim();
1020    let hi_ref = range_part[and_pos + 5..].trim();
1021    let lo = expr_attr_values.get(lo_ref);
1022    let hi = expr_attr_values.get(hi_ref);
1023    let actual = item.get(&attr_name);
1024    match (actual, lo, hi) {
1025        (Some(a), Some(l), Some(h)) => {
1026            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1027                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1028        }
1029        _ => false,
1030    }
1031}
1032
1033/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1034/// Multi-character operators come first in the search list so that `<=`
1035/// is not mistakenly matched as `<`.
1036fn key_cond_simple_comparison(
1037    part: &str,
1038    item: &HashMap<String, AttributeValue>,
1039    expr_attr_names: &HashMap<String, String>,
1040    expr_attr_values: &HashMap<String, Value>,
1041) -> bool {
1042    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1043        let Some(pos) = part.find(op) else {
1044            continue;
1045        };
1046        let left = part[..pos].trim();
1047        let right = part[pos + op.len()..].trim();
1048        let attr_name = resolve_attr_name(left, expr_attr_names);
1049        let expected = expr_attr_values.get(right);
1050        let actual = item.get(&attr_name);
1051
1052        return match *op {
1053            "=" => actual == expected,
1054            "<>" => actual != expected,
1055            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1056            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1057            "<=" => {
1058                let cmp = compare_attribute_values(actual, expected);
1059                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1060            }
1061            ">=" => {
1062                let cmp = compare_attribute_values(actual, expected);
1063                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1064            }
1065            _ => false,
1066        };
1067    }
1068    false
1069}
1070
1071/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1072/// S → character count, N → always 0 (AWS returns size of internal representation, we approximate),
1073/// B → byte count, SS/NS/BS → element count, L → element count, M → element count,
1074/// BOOL/NULL → 1.
1075fn attribute_size(val: &Value) -> Option<usize> {
1076    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1077        return Some(s.len());
1078    }
1079    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1080        // B is base64-encoded — return decoded byte count
1081        let decoded_len = base64::engine::general_purpose::STANDARD
1082            .decode(b)
1083            .map(|v| v.len())
1084            .unwrap_or(b.len());
1085        return Some(decoded_len);
1086    }
1087    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1088        return Some(arr.len());
1089    }
1090    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1091        return Some(arr.len());
1092    }
1093    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1094        return Some(arr.len());
1095    }
1096    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1097        return Some(arr.len());
1098    }
1099    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1100        return Some(obj.len());
1101    }
1102    if val.get("N").is_some() {
1103        // AWS returns numeric representation size; approximate with string length
1104        return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
1105    }
1106    if val.get("BOOL").is_some() || val.get("NULL").is_some() {
1107        return Some(1);
1108    }
1109    None
1110}
1111
1112/// Evaluate a `size(path) op :val` comparison expression.
1113fn evaluate_size_comparison(
1114    part: &str,
1115    item: &HashMap<String, AttributeValue>,
1116    expr_attr_names: &HashMap<String, String>,
1117    expr_attr_values: &HashMap<String, Value>,
1118) -> Option<bool> {
1119    // Find the closing paren of size(...)
1120    let open = part.find('(')?;
1121    let close = part[open..].find(')')? + open;
1122    let path = part[open + 1..close].trim();
1123    let remainder = part[close + 1..].trim();
1124
1125    // Parse operator and value ref
1126    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1127        ("<=", rest.trim())
1128    } else if let Some(rest) = remainder.strip_prefix(">=") {
1129        (">=", rest.trim())
1130    } else if let Some(rest) = remainder.strip_prefix("<>") {
1131        ("<>", rest.trim())
1132    } else if let Some(rest) = remainder.strip_prefix('<') {
1133        ("<", rest.trim())
1134    } else if let Some(rest) = remainder.strip_prefix('>') {
1135        (">", rest.trim())
1136    } else if let Some(rest) = remainder.strip_prefix('=') {
1137        ("=", rest.trim())
1138    } else {
1139        return None;
1140    };
1141
1142    let attr_name = resolve_attr_name(path, expr_attr_names);
1143    let actual = item.get(&attr_name)?;
1144    let size = attribute_size(actual)? as f64;
1145
1146    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1147
1148    Some(match op {
1149        "=" => (size - expected).abs() < f64::EPSILON,
1150        "<>" => (size - expected).abs() >= f64::EPSILON,
1151        "<" => size < expected,
1152        ">" => size > expected,
1153        "<=" => size <= expected,
1154        ">=" => size >= expected,
1155        _ => false,
1156    })
1157}
1158
1159fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1160    match (a, b) {
1161        (None, None) => std::cmp::Ordering::Equal,
1162        (None, Some(_)) => std::cmp::Ordering::Less,
1163        (Some(_), None) => std::cmp::Ordering::Greater,
1164        (Some(a), Some(b)) => {
1165            let a_type = attribute_type_and_value(a);
1166            let b_type = attribute_type_and_value(b);
1167            match (a_type, b_type) {
1168                (Some(("S", a_val)), Some(("S", b_val))) => {
1169                    let a_str = a_val.as_str().unwrap_or("");
1170                    let b_str = b_val.as_str().unwrap_or("");
1171                    a_str.cmp(b_str)
1172                }
1173                (Some(("N", a_val)), Some(("N", b_val))) => {
1174                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1175                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1176                    a_num
1177                        .partial_cmp(&b_num)
1178                        .unwrap_or(std::cmp::Ordering::Equal)
1179                }
1180                (Some(("B", a_val)), Some(("B", b_val))) => {
1181                    let a_str = a_val.as_str().unwrap_or("");
1182                    let b_str = b_val.as_str().unwrap_or("");
1183                    a_str.cmp(b_str)
1184                }
1185                _ => std::cmp::Ordering::Equal,
1186            }
1187        }
1188    }
1189}
1190
1191fn evaluate_filter_expression(
1192    expr: &str,
1193    item: &HashMap<String, AttributeValue>,
1194    expr_attr_names: &HashMap<String, String>,
1195    expr_attr_values: &HashMap<String, Value>,
1196) -> bool {
1197    let trimmed = expr.trim();
1198
1199    // Split on OR first (lower precedence), respecting parentheses
1200    let or_parts = split_on_or(trimmed);
1201    if or_parts.len() > 1 {
1202        return or_parts.iter().any(|part| {
1203            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1204        });
1205    }
1206
1207    // Then split on AND (higher precedence), respecting parentheses
1208    let and_parts = split_on_and(trimmed);
1209    if and_parts.len() > 1 {
1210        return and_parts.iter().all(|part| {
1211            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1212        });
1213    }
1214
1215    // Strip outer parentheses if present
1216    let stripped = strip_outer_parens(trimmed);
1217    if stripped != trimmed {
1218        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1219    }
1220
1221    // Handle NOT prefix (case-insensitive)
1222    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1223        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1224    }
1225
1226    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1227}
1228
/// Remove one pair of parentheses that encloses the whole expression.
///
/// The input is trimmed first. The leading `(` and trailing `)` are dropped
/// only when they match each other, i.e. the text between them is itself
/// balanced and never closes more groups than it has opened. Otherwise the
/// trimmed input is returned as is. The returned inner text is not trimmed.
fn strip_outer_parens(expr: &str) -> &str {
    let trimmed = expr.trim();
    let Some(inner) = trimmed
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return trimmed;
    };

    // Fold the nesting depth across `inner`; `None` means a `)` appeared
    // with nothing open, so it is the partner of the leading `(`.
    let final_depth = inner.bytes().try_fold(0usize, |depth, byte| match byte {
        b'(' => Some(depth + 1),
        b')' => depth.checked_sub(1),
        _ => Some(depth),
    });

    if final_depth == Some(0) {
        inner
    } else {
        trimmed
    }
}
1256
1257fn evaluate_single_filter_condition(
1258    part: &str,
1259    item: &HashMap<String, AttributeValue>,
1260    expr_attr_names: &HashMap<String, String>,
1261    expr_attr_values: &HashMap<String, Value>,
1262) -> bool {
1263    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1264        let attr = resolve_attr_name(inner, expr_attr_names);
1265        return item.contains_key(&attr);
1266    }
1267
1268    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1269        let attr = resolve_attr_name(inner, expr_attr_names);
1270        return !item.contains_key(&attr);
1271    }
1272
1273    if let Some(rest) = part
1274        .strip_prefix("begins_with(")
1275        .or_else(|| part.strip_prefix("begins_with ("))
1276    {
1277        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1278    }
1279
1280    if let Some(rest) = part
1281        .strip_prefix("contains(")
1282        .or_else(|| part.strip_prefix("contains ("))
1283    {
1284        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1285    }
1286
1287    if part.starts_with("size(") || part.starts_with("size (") {
1288        if let Some(result) =
1289            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1290        {
1291            return result;
1292        }
1293    }
1294
1295    if let Some(rest) = part
1296        .strip_prefix("attribute_type(")
1297        .or_else(|| part.strip_prefix("attribute_type ("))
1298    {
1299        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1300    }
1301
1302    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1303        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1304        let actual = item.get(&attr_name);
1305        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1306    }
1307
1308    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
1309}
1310
1311/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1312/// any parse failure or type mismatch (this is the same shape DynamoDB
1313/// returns: a malformed predicate is silently false rather than an error).
1314fn eval_begins_with(
1315    rest: &str,
1316    item: &HashMap<String, AttributeValue>,
1317    expr_attr_names: &HashMap<String, String>,
1318    expr_attr_values: &HashMap<String, Value>,
1319) -> bool {
1320    let Some(inner) = rest.strip_suffix(')') else {
1321        return false;
1322    };
1323    let mut split = inner.splitn(2, ',');
1324    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1325        return false;
1326    };
1327    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1328    let expected = expr_attr_values.get(val_ref.trim());
1329    let actual = item.get(&attr_name);
1330    match (actual, expected) {
1331        (Some(a), Some(e)) => {
1332            let a_str = a.get("S").and_then(|v| v.as_str());
1333            let e_str = e.get("S").and_then(|v| v.as_str());
1334            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1335        }
1336        _ => false,
1337    }
1338}
1339
1340/// `contains(path, :val)` — substring check on S, set membership on
1341/// SS/NS/BS, and element membership on L. Other type pairings return
1342/// false.
1343fn eval_contains(
1344    rest: &str,
1345    item: &HashMap<String, AttributeValue>,
1346    expr_attr_names: &HashMap<String, String>,
1347    expr_attr_values: &HashMap<String, Value>,
1348) -> bool {
1349    let Some(inner) = rest.strip_suffix(')') else {
1350        return false;
1351    };
1352    let mut split = inner.splitn(2, ',');
1353    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1354        return false;
1355    };
1356    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1357    let expected = expr_attr_values.get(val_ref.trim());
1358    let actual = item.get(&attr_name);
1359    let (Some(a), Some(e)) = (actual, expected) else {
1360        return false;
1361    };
1362
1363    if let (Some(a_s), Some(e_s)) = (
1364        a.get("S").and_then(|v| v.as_str()),
1365        e.get("S").and_then(|v| v.as_str()),
1366    ) {
1367        return a_s.contains(e_s);
1368    }
1369    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1370        if let Some(val) = e.get("S") {
1371            return set.contains(val);
1372        }
1373    }
1374    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1375        if let Some(val) = e.get("N") {
1376            return set.contains(val);
1377        }
1378    }
1379    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1380        if let Some(val) = e.get("B") {
1381            return set.contains(val);
1382        }
1383    }
1384    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1385        return list.contains(e);
1386    }
1387    false
1388}
1389
1390/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1391/// is stored under the wire type identified by `:type` (one of the
1392/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1393fn eval_attribute_type(
1394    rest: &str,
1395    item: &HashMap<String, AttributeValue>,
1396    expr_attr_names: &HashMap<String, String>,
1397    expr_attr_values: &HashMap<String, Value>,
1398) -> bool {
1399    let Some(inner) = rest.strip_suffix(')') else {
1400        return false;
1401    };
1402    let mut split = inner.splitn(2, ',');
1403    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1404        return false;
1405    };
1406    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1407    let expected_type = expr_attr_values
1408        .get(val_ref.trim())
1409        .and_then(|v| v.get("S"))
1410        .and_then(|v| v.as_str());
1411    let actual = item.get(&attr_name);
1412    let (Some(val), Some(t)) = (actual, expected_type) else {
1413        return false;
1414    };
1415    match t {
1416        "S" => val.get("S").is_some(),
1417        "N" => val.get("N").is_some(),
1418        "B" => val.get("B").is_some(),
1419        "BOOL" => val.get("BOOL").is_some(),
1420        "NULL" => val.get("NULL").is_some(),
1421        "SS" => val.get("SS").is_some(),
1422        "NS" => val.get("NS").is_some(),
1423        "BS" => val.get("BS").is_some(),
1424        "L" => val.get("L").is_some(),
1425        "M" => val.get("M").is_some(),
1426        _ => false,
1427    }
1428}
1429
/// Parse `attr IN (:v1, :v2, ...)` into the attribute reference and the
/// list of value references.
///
/// The ` IN ` keyword is matched case-insensitively at its first
/// occurrence. The right-hand side must be a parenthesised, comma-separated
/// list; entries are trimmed and empty entries are dropped, so both `", "`
/// and `","` separators work. Returns `None` when the keyword is absent,
/// the left operand is empty, the parentheses are missing, or no value
/// reference remains — letting callers try their other grammar branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing keeps byte offsets aligned with `expr`.
    let keyword_at = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (lhs, tail) = expr.split_at(keyword_at);

    let attr_ref = lhs.trim();
    if attr_ref.is_empty() {
        return None;
    }

    let list = tail[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let values: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .collect();

    (!values.is_empty()).then_some((attr_ref, values))
}
1456
1457/// Return true iff `actual` equals any of the `value_refs` resolved through
1458/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1459/// evaluates `IN` against undefined attributes as false).
1460fn evaluate_in_match(
1461    actual: Option<&AttributeValue>,
1462    value_refs: &[&str],
1463    expr_attr_values: &HashMap<String, Value>,
1464) -> bool {
1465    value_refs.iter().any(|v_ref| {
1466        let expected = expr_attr_values.get(*v_ref);
1467        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1468    })
1469}
1470
/// The action keyword that introduces a clause of an `UpdateExpression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateAction {
    Set,
    Remove,
    Add,
    Delete,
}

impl UpdateAction {
    /// Every action paired with its upper-case keyword, in the order SET,
    /// REMOVE, ADD, DELETE. `parse_update_clauses` uses these as its search
    /// terms. The keyword text comes from [`UpdateAction::keyword`] so the
    /// two can never disagree.
    const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
        (UpdateAction::Set.keyword(), UpdateAction::Set),
        (UpdateAction::Remove.keyword(), UpdateAction::Remove),
        (UpdateAction::Add.keyword(), UpdateAction::Add),
        (UpdateAction::Delete.keyword(), UpdateAction::Delete),
    ];

    /// The upper-case keyword for this action.
    const fn keyword(self) -> &'static str {
        match self {
            Self::Set => "SET",
            Self::Remove => "REMOVE",
            Self::Add => "ADD",
            Self::Delete => "DELETE",
        }
    }
}
1499
1500fn apply_update_expression(
1501    item: &mut HashMap<String, AttributeValue>,
1502    expr: &str,
1503    expr_attr_names: &HashMap<String, String>,
1504    expr_attr_values: &HashMap<String, Value>,
1505) -> Result<(), AwsServiceError> {
1506    let clauses = parse_update_clauses(expr);
1507    if clauses.is_empty() && !expr.trim().is_empty() {
1508        return Err(AwsServiceError::aws_error(
1509            StatusCode::BAD_REQUEST,
1510            "ValidationException",
1511            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1512        ));
1513    }
1514    for (action, assignments) in &clauses {
1515        match action {
1516            UpdateAction::Set => {
1517                for assignment in assignments {
1518                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1519                }
1520            }
1521            UpdateAction::Remove => {
1522                for attr_ref in assignments {
1523                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1524                    item.remove(&attr);
1525                }
1526            }
1527            UpdateAction::Add => {
1528                for assignment in assignments {
1529                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1530                }
1531            }
1532            UpdateAction::Delete => {
1533                for assignment in assignments {
1534                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1535                }
1536            }
1537        }
1538    }
1539    Ok(())
1540}
1541
1542fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1543    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1544    let upper = expr.to_ascii_uppercase();
1545    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1546
1547    for &(kw, action) in UpdateAction::KEYWORDS {
1548        let mut search_from = 0;
1549        while let Some(pos) = upper[search_from..].find(kw) {
1550            let abs_pos = search_from + pos;
1551            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1552            let after_pos = abs_pos + kw.len();
1553            let after_ok =
1554                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1555            if before_ok && after_ok {
1556                positions.push((abs_pos, action));
1557            }
1558            search_from = abs_pos + kw.len();
1559        }
1560    }
1561
1562    positions.sort_by_key(|(pos, _)| *pos);
1563
1564    for (i, &(pos, action)) in positions.iter().enumerate() {
1565        let start = pos + action.keyword().len();
1566        let end = if i + 1 < positions.len() {
1567            positions[i + 1].0
1568        } else {
1569            expr.len()
1570        };
1571        let content = expr[start..end].trim();
1572        // Use a paren-aware split so that function-call arguments such as
1573        // `list_append(#a, :b)` are kept as a single assignment rather than
1574        // being torn apart at the inner comma.
1575        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
1576            .into_iter()
1577            .map(|s| s.trim().to_string())
1578            .collect();
1579        clauses.push((action, assignments));
1580    }
1581
1582    clauses
1583}
1584
1585fn apply_set_assignment(
1586    item: &mut HashMap<String, AttributeValue>,
1587    assignment: &str,
1588    expr_attr_names: &HashMap<String, String>,
1589    expr_attr_values: &HashMap<String, Value>,
1590) -> Result<(), AwsServiceError> {
1591    let Some((left, right)) = assignment.split_once('=') else {
1592        return Ok(());
1593    };
1594
1595    let left_trimmed = left.trim();
1596    // Split off a trailing `[N]` list-index suffix so we can resolve the
1597    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1598    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1599    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1600        Some((name, idx)) => (name, Some(idx)),
1601        None => (left_trimmed, None),
1602    };
1603    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1604    let right = right.trim();
1605
1606    if let Some(rest) = right
1607        .strip_prefix("if_not_exists(")
1608        .or_else(|| right.strip_prefix("if_not_exists ("))
1609    {
1610        apply_set_if_not_exists(item, &attr, rest, expr_attr_names, expr_attr_values);
1611        return Ok(());
1612    }
1613
1614    if let Some(rest) = right
1615        .strip_prefix("list_append(")
1616        .or_else(|| right.strip_prefix("list_append ("))
1617    {
1618        apply_set_list_append(item, &attr, rest, expr_attr_names, expr_attr_values);
1619        return Ok(());
1620    }
1621
1622    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1623        return apply_set_arithmetic(
1624            item,
1625            &attr,
1626            arith_left,
1627            arith_right,
1628            is_add,
1629            expr_attr_names,
1630            expr_attr_values,
1631        );
1632    }
1633
1634    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
1635    if let Some(v) = val {
1636        match list_index {
1637            Some(idx) => assign_list_index(item, &attr, idx, v)?,
1638            None => {
1639                item.insert(attr, v);
1640            }
1641        }
1642    }
1643
1644    Ok(())
1645}
1646
1647/// SET ... = if_not_exists(other_attr, :val) — write `:val` into `attr`
1648/// only when `other_attr` is missing from the item. The lookup uses
1649/// `other_attr`, not the SET target, which is what makes it useful as a
1650/// 'create-or-keep' primitive.
1651fn apply_set_if_not_exists(
1652    item: &mut HashMap<String, AttributeValue>,
1653    attr: &str,
1654    rest: &str,
1655    expr_attr_names: &HashMap<String, String>,
1656    expr_attr_values: &HashMap<String, Value>,
1657) {
1658    let Some(inner) = rest.strip_suffix(')') else {
1659        return;
1660    };
1661    let mut split = inner.splitn(2, ',');
1662    let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) else {
1663        return;
1664    };
1665    let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
1666    if item.contains_key(&check_name) {
1667        return;
1668    }
1669    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
1670        item.insert(attr.to_string(), val.clone());
1671    }
1672}
1673
1674/// SET ... = list_append(a, b) — concatenate the L arrays of two list
1675/// operands. Either operand may be missing or non-list, in which case
1676/// it contributes nothing.
1677fn apply_set_list_append(
1678    item: &mut HashMap<String, AttributeValue>,
1679    attr: &str,
1680    rest: &str,
1681    expr_attr_names: &HashMap<String, String>,
1682    expr_attr_values: &HashMap<String, Value>,
1683) {
1684    let Some(inner) = rest.strip_suffix(')') else {
1685        return;
1686    };
1687    let mut split = inner.splitn(2, ',');
1688    let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) else {
1689        return;
1690    };
1691    let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1692    let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1693
1694    let mut merged = Vec::new();
1695    if let Some(Value::Object(obj)) = &a_val {
1696        if let Some(Value::Array(arr)) = obj.get("L") {
1697            merged.extend(arr.clone());
1698        }
1699    }
1700    if let Some(Value::Object(obj)) = &b_val {
1701        if let Some(Value::Array(arr)) = obj.get("L") {
1702            merged.extend(arr.clone());
1703        }
1704    }
1705
1706    item.insert(attr.to_string(), json!({"L": merged}));
1707}
1708
1709/// SET ... = `<arith_left> +/- <arith_right>` — both operands must
1710/// resolve to N values (or the LHS may be missing, in which case it's
1711/// treated as 0). Anything else is rejected with the same
1712/// `ValidationException` AWS returns.
1713fn apply_set_arithmetic(
1714    item: &mut HashMap<String, AttributeValue>,
1715    attr: &str,
1716    arith_left: &str,
1717    arith_right: &str,
1718    is_add: bool,
1719    expr_attr_names: &HashMap<String, String>,
1720    expr_attr_values: &HashMap<String, Value>,
1721) -> Result<(), AwsServiceError> {
1722    let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1723    let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1724
1725    let left_num = match extract_number(&left_val) {
1726        Some(n) => n,
1727        None if left_val.is_some() => {
1728            return Err(AwsServiceError::aws_error(
1729                StatusCode::BAD_REQUEST,
1730                "ValidationException",
1731                "An operand in the update expression has an incorrect data type",
1732            ));
1733        }
1734        None => 0.0,
1735    };
1736    let right_num = extract_number(&right_val).ok_or_else(|| {
1737        AwsServiceError::aws_error(
1738            StatusCode::BAD_REQUEST,
1739            "ValidationException",
1740            "An operand in the update expression has an incorrect data type",
1741        )
1742    })?;
1743
1744    let result = if is_add {
1745        left_num + right_num
1746    } else {
1747        left_num - right_num
1748    };
1749
1750    let num_str = if result == result.trunc() {
1751        format!("{}", result as i64)
1752    } else {
1753        format!("{result}")
1754    };
1755
1756    item.insert(attr.to_string(), json!({"N": num_str}));
1757    Ok(())
1758}
1759
/// Split a SET target of the form `name[N]` into its attribute reference and
/// list index.
///
/// Surrounding whitespace is ignored. Returns `None` for a plain attribute,
/// for an index that does not parse as `usize`, and for any shape other than
/// a single trailing index (empty name, or a name that itself contains `.`,
/// `[` or `]`, such as `a.b[0]` or `a[0][1]`).
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    // The last `[` opens the index; everything before it is the name.
    let (name, digits) = without_close.rsplit_once('[')?;
    let idx = digits.parse::<usize>().ok()?;
    let is_plain_name = !name.is_empty() && !name.contains(['[', ']', '.']);
    is_plain_name.then_some((name, idx))
}
1780
1781/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
1782/// within the current list, replaces that slot; if it's at the end, appends.
1783/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
1784/// out-of-range indices and non-list attributes.
1785fn assign_list_index(
1786    item: &mut HashMap<String, AttributeValue>,
1787    attr: &str,
1788    idx: usize,
1789    value: Value,
1790) -> Result<(), AwsServiceError> {
1791    let Some(existing) = item.get_mut(attr) else {
1792        return Err(AwsServiceError::aws_error(
1793            StatusCode::BAD_REQUEST,
1794            "ValidationException",
1795            "The document path provided in the update expression is invalid for update",
1796        ));
1797    };
1798    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
1799        return Err(AwsServiceError::aws_error(
1800            StatusCode::BAD_REQUEST,
1801            "ValidationException",
1802            "The document path provided in the update expression is invalid for update",
1803        ));
1804    };
1805    if idx < list.len() {
1806        list[idx] = value;
1807    } else if idx == list.len() {
1808        list.push(value);
1809    } else {
1810        return Err(AwsServiceError::aws_error(
1811            StatusCode::BAD_REQUEST,
1812            "ValidationException",
1813            "The document path provided in the update expression is invalid for update",
1814        ));
1815    }
1816    Ok(())
1817}
1818
1819fn resolve_value(
1820    reference: &str,
1821    item: &HashMap<String, AttributeValue>,
1822    expr_attr_names: &HashMap<String, String>,
1823    expr_attr_values: &HashMap<String, Value>,
1824) -> Option<Value> {
1825    let reference = reference.trim();
1826    if reference.starts_with(':') {
1827        expr_attr_values.get(reference).cloned()
1828    } else {
1829        let attr_name = resolve_attr_name(reference, expr_attr_names);
1830        item.get(&attr_name).cloned()
1831    }
1832}
1833
1834fn extract_number(val: &Option<Value>) -> Option<f64> {
1835    val.as_ref()
1836        .and_then(|v| v.get("N"))
1837        .and_then(|n| n.as_str())
1838        .and_then(|s| s.parse().ok())
1839}
1840
/// Split `expr` at its first top-level `+` or `-`.
///
/// Returns `(left, right, is_add)` with the operator removed and whitespace
/// left untouched. Operators nested inside parentheses are ignored, and so is
/// an operator in the very first position (so a leading sign is not treated
/// as a binary operator). Returns `None` when no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    let (pos, is_add) = expr.char_indices().find_map(|(pos, ch)| {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            '+' | '-' if nesting == 0 && pos > 0 => return Some((pos, ch == '+')),
            _ => {}
        }
        None
    })?;
    // Both operators are one byte wide, so the right operand starts at pos + 1.
    Some((&expr[..pos], &expr[pos + 1..], is_add))
}
1858
1859fn apply_add_assignment(
1860    item: &mut HashMap<String, AttributeValue>,
1861    assignment: &str,
1862    expr_attr_names: &HashMap<String, String>,
1863    expr_attr_values: &HashMap<String, Value>,
1864) -> Result<(), AwsServiceError> {
1865    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1866    if parts.len() != 2 {
1867        return Ok(());
1868    }
1869
1870    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1871    let val_ref = parts[1].trim();
1872    let add_val = expr_attr_values.get(val_ref);
1873
1874    if let Some(add_val) = add_val {
1875        if let Some(existing) = item.get(&attr) {
1876            if let (Some(existing_num), Some(add_num)) = (
1877                extract_number(&Some(existing.clone())),
1878                extract_number(&Some(add_val.clone())),
1879            ) {
1880                let result = existing_num + add_num;
1881                let num_str = if result == result.trunc() {
1882                    format!("{}", result as i64)
1883                } else {
1884                    format!("{result}")
1885                };
1886                item.insert(attr, json!({"N": num_str}));
1887            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
1888                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
1889                    let mut merged: Vec<Value> = existing_set.clone();
1890                    for v in add_set {
1891                        if !merged.contains(v) {
1892                            merged.push(v.clone());
1893                        }
1894                    }
1895                    item.insert(attr, json!({"SS": merged}));
1896                }
1897            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
1898                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
1899                    let mut merged: Vec<Value> = existing_set.clone();
1900                    for v in add_set {
1901                        if !merged.contains(v) {
1902                            merged.push(v.clone());
1903                        }
1904                    }
1905                    item.insert(attr, json!({"NS": merged}));
1906                }
1907            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
1908                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
1909                    let mut merged: Vec<Value> = existing_set.clone();
1910                    for v in add_set {
1911                        if !merged.contains(v) {
1912                            merged.push(v.clone());
1913                        }
1914                    }
1915                    item.insert(attr, json!({"BS": merged}));
1916                }
1917            }
1918        } else {
1919            item.insert(attr, add_val.clone());
1920        }
1921    }
1922
1923    Ok(())
1924}
1925
1926fn apply_delete_assignment(
1927    item: &mut HashMap<String, AttributeValue>,
1928    assignment: &str,
1929    expr_attr_names: &HashMap<String, String>,
1930    expr_attr_values: &HashMap<String, Value>,
1931) -> Result<(), AwsServiceError> {
1932    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1933    if parts.len() != 2 {
1934        return Ok(());
1935    }
1936
1937    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1938    let val_ref = parts[1].trim();
1939    let del_val = expr_attr_values.get(val_ref);
1940
1941    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
1942        if let (Some(existing_set), Some(del_set)) = (
1943            existing.get("SS").and_then(|v| v.as_array()),
1944            del_val.get("SS").and_then(|v| v.as_array()),
1945        ) {
1946            let filtered: Vec<Value> = existing_set
1947                .iter()
1948                .filter(|v| !del_set.contains(v))
1949                .cloned()
1950                .collect();
1951            if filtered.is_empty() {
1952                item.remove(&attr);
1953            } else {
1954                item.insert(attr, json!({"SS": filtered}));
1955            }
1956        } else if let (Some(existing_set), Some(del_set)) = (
1957            existing.get("NS").and_then(|v| v.as_array()),
1958            del_val.get("NS").and_then(|v| v.as_array()),
1959        ) {
1960            let filtered: Vec<Value> = existing_set
1961                .iter()
1962                .filter(|v| !del_set.contains(v))
1963                .cloned()
1964                .collect();
1965            if filtered.is_empty() {
1966                item.remove(&attr);
1967            } else {
1968                item.insert(attr, json!({"NS": filtered}));
1969            }
1970        } else if let (Some(existing_set), Some(del_set)) = (
1971            existing.get("BS").and_then(|v| v.as_array()),
1972            del_val.get("BS").and_then(|v| v.as_array()),
1973        ) {
1974            let filtered: Vec<Value> = existing_set
1975                .iter()
1976                .filter(|v| !del_set.contains(v))
1977                .cloned()
1978                .collect();
1979            if filtered.is_empty() {
1980                item.remove(&attr);
1981            } else {
1982                item.insert(attr, json!({"BS": filtered}));
1983            }
1984        }
1985    }
1986
1987    Ok(())
1988}
1989
1990pub(super) struct TableDescriptionInput<'a> {
1991    pub arn: &'a str,
1992    pub table_id: &'a str,
1993    pub key_schema: &'a [KeySchemaElement],
1994    pub attribute_definitions: &'a [AttributeDefinition],
1995    pub provisioned_throughput: &'a ProvisionedThroughput,
1996    pub gsi: &'a [GlobalSecondaryIndex],
1997    pub lsi: &'a [LocalSecondaryIndex],
1998    pub billing_mode: &'a str,
1999    pub created_at: chrono::DateTime<chrono::Utc>,
2000    pub item_count: i64,
2001    pub size_bytes: i64,
2002    pub status: &'a str,
2003    pub deletion_protection_enabled: bool,
2004}
2005
2006fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
2007    let TableDescriptionInput {
2008        arn,
2009        table_id,
2010        key_schema,
2011        attribute_definitions,
2012        provisioned_throughput,
2013        gsi,
2014        lsi,
2015        billing_mode,
2016        created_at,
2017        item_count,
2018        size_bytes,
2019        status,
2020        deletion_protection_enabled,
2021    } = *input;
2022    let table_name = arn.rsplit('/').next().unwrap_or("");
2023    let creation_timestamp =
2024        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
2025
2026    let ks: Vec<Value> = key_schema
2027        .iter()
2028        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2029        .collect();
2030
2031    let ad: Vec<Value> = attribute_definitions
2032        .iter()
2033        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2034        .collect();
2035
2036    let mut desc = json!({
2037        "TableName": table_name,
2038        "TableArn": arn,
2039        "TableId": table_id,
2040        "TableStatus": status,
2041        "KeySchema": ks,
2042        "AttributeDefinitions": ad,
2043        "CreationDateTime": creation_timestamp,
2044        "ItemCount": item_count,
2045        "TableSizeBytes": size_bytes,
2046        "BillingModeSummary": { "BillingMode": billing_mode },
2047        "DeletionProtectionEnabled": deletion_protection_enabled,
2048    });
2049
2050    if billing_mode != "PAY_PER_REQUEST" {
2051        desc["ProvisionedThroughput"] = json!({
2052            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2053            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2054            "NumberOfDecreasesToday": 0,
2055        });
2056    } else {
2057        desc["ProvisionedThroughput"] = json!({
2058            "ReadCapacityUnits": 0,
2059            "WriteCapacityUnits": 0,
2060            "NumberOfDecreasesToday": 0,
2061        });
2062    }
2063
2064    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2065    // Real AWS returns an ACTIVE warm throughput object for active tables,
2066    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2067    // perpetual "still creating" loop.
2068    if status == "ACTIVE" {
2069        desc["WarmThroughput"] = json!({
2070            "ReadUnitsPerSecond": 0,
2071            "WriteUnitsPerSecond": 0,
2072            "Status": "ACTIVE",
2073        });
2074    }
2075
2076    if !gsi.is_empty() {
2077        let gsi_json: Vec<Value> = gsi
2078            .iter()
2079            .map(|g| {
2080                let gks: Vec<Value> = g
2081                    .key_schema
2082                    .iter()
2083                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2084                    .collect();
2085                let mut idx = json!({
2086                    "IndexName": g.index_name,
2087                    "KeySchema": gks,
2088                    "Projection": { "ProjectionType": g.projection.projection_type },
2089                    "IndexStatus": "ACTIVE",
2090                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2091                    "ItemCount": 0,
2092                    "IndexSizeBytes": 0,
2093                });
2094                if !g.projection.non_key_attributes.is_empty() {
2095                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2096                }
2097                if let Some(ref pt) = g.provisioned_throughput {
2098                    idx["ProvisionedThroughput"] = json!({
2099                        "ReadCapacityUnits": pt.read_capacity_units,
2100                        "WriteCapacityUnits": pt.write_capacity_units,
2101                        "NumberOfDecreasesToday": 0,
2102                    });
2103                }
2104                idx
2105            })
2106            .collect();
2107        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2108    }
2109
2110    if !lsi.is_empty() {
2111        let lsi_json: Vec<Value> = lsi
2112            .iter()
2113            .map(|l| {
2114                let lks: Vec<Value> = l
2115                    .key_schema
2116                    .iter()
2117                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2118                    .collect();
2119                let mut idx = json!({
2120                    "IndexName": l.index_name,
2121                    "KeySchema": lks,
2122                    "Projection": { "ProjectionType": l.projection.projection_type },
2123                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2124                    "ItemCount": 0,
2125                    "IndexSizeBytes": 0,
2126                });
2127                if !l.projection.non_key_attributes.is_empty() {
2128                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2129                }
2130                idx
2131            })
2132            .collect();
2133        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2134    }
2135
2136    desc
2137}
2138
2139fn build_table_description(table: &DynamoTable) -> Value {
2140    let mut desc = build_table_description_json(&TableDescriptionInput {
2141        arn: &table.arn,
2142        table_id: &table.table_id,
2143        key_schema: &table.key_schema,
2144        attribute_definitions: &table.attribute_definitions,
2145        provisioned_throughput: &table.provisioned_throughput,
2146        gsi: &table.gsi,
2147        lsi: &table.lsi,
2148        billing_mode: &table.billing_mode,
2149        created_at: table.created_at,
2150        item_count: table.item_count,
2151        size_bytes: table.size_bytes,
2152        status: &table.status,
2153        deletion_protection_enabled: table.deletion_protection_enabled,
2154    });
2155
2156    // Add stream specification if streams are enabled
2157    if table.stream_enabled {
2158        if let Some(ref stream_arn) = table.stream_arn {
2159            desc["LatestStreamArn"] = json!(stream_arn);
2160            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2161        }
2162        if let Some(ref view_type) = table.stream_view_type {
2163            desc["StreamSpecification"] = json!({
2164                "StreamEnabled": true,
2165                "StreamViewType": view_type,
2166            });
2167        }
2168    }
2169
2170    // SSEDescription is only returned when the customer explicitly enabled
2171    // a KMS-backed SSE. Real AWS tables using the default AWS-owned key omit
2172    // this field entirely, and the Terraform provider's Read asserts
2173    // `server_side_encryption.#` == 0 in that case.
2174    if let Some(ref sse_type) = table.sse_type {
2175        let mut sse_desc = json!({
2176            "Status": "ENABLED",
2177            "SSEType": sse_type,
2178        });
2179        if let Some(ref key_arn) = table.sse_kms_key_arn {
2180            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2181        }
2182        desc["SSEDescription"] = sse_desc;
2183    }
2184
2185    desc
2186}
2187
2188fn execute_partiql_statement(
2189    state: &SharedDynamoDbState,
2190    statement: &str,
2191    parameters: &[Value],
2192) -> Result<AwsResponse, AwsServiceError> {
2193    let trimmed = statement.trim();
2194    let upper = trimmed.to_ascii_uppercase();
2195
2196    if upper.starts_with("SELECT") {
2197        execute_partiql_select(state, trimmed, parameters)
2198    } else if upper.starts_with("INSERT") {
2199        execute_partiql_insert(state, trimmed, parameters)
2200    } else if upper.starts_with("UPDATE") {
2201        execute_partiql_update(state, trimmed, parameters)
2202    } else if upper.starts_with("DELETE") {
2203        execute_partiql_delete(state, trimmed, parameters)
2204    } else {
2205        Err(AwsServiceError::aws_error(
2206            StatusCode::BAD_REQUEST,
2207            "ValidationException",
2208            format!("Unsupported PartiQL statement: {trimmed}"),
2209        ))
2210    }
2211}
2212
2213/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2214fn execute_partiql_select(
2215    state: &SharedDynamoDbState,
2216    statement: &str,
2217    parameters: &[Value],
2218) -> Result<AwsResponse, AwsServiceError> {
2219    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2220    let upper = statement.to_ascii_uppercase();
2221    let from_pos = upper.find("FROM").ok_or_else(|| {
2222        AwsServiceError::aws_error(
2223            StatusCode::BAD_REQUEST,
2224            "ValidationException",
2225            "Invalid SELECT statement: missing FROM",
2226        )
2227    })?;
2228
2229    let after_from = statement[from_pos + 4..].trim();
2230    let (table_name, rest) = parse_partiql_table_name(after_from);
2231
2232    let state = state.read();
2233    let table = get_table(&state.tables, &table_name)?;
2234
2235    let rest_upper = rest.trim().to_ascii_uppercase();
2236    if rest_upper.starts_with("WHERE") {
2237        let where_clause = rest.trim()[5..].trim();
2238        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2239        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2240        DynamoDbService::ok_json(json!({ "Items": items }))
2241    } else {
2242        // No WHERE, return all items
2243        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2244        DynamoDbService::ok_json(json!({ "Items": items }))
2245    }
2246}
2247
2248fn execute_partiql_insert(
2249    state: &SharedDynamoDbState,
2250    statement: &str,
2251    parameters: &[Value],
2252) -> Result<AwsResponse, AwsServiceError> {
2253    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2254    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2255    let upper = statement.to_ascii_uppercase();
2256    let into_pos = upper.find("INTO").ok_or_else(|| {
2257        AwsServiceError::aws_error(
2258            StatusCode::BAD_REQUEST,
2259            "ValidationException",
2260            "Invalid INSERT statement: missing INTO",
2261        )
2262    })?;
2263
2264    let after_into = statement[into_pos + 4..].trim();
2265    let (table_name, rest) = parse_partiql_table_name(after_into);
2266
2267    let rest_upper = rest.trim().to_ascii_uppercase();
2268    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2269        AwsServiceError::aws_error(
2270            StatusCode::BAD_REQUEST,
2271            "ValidationException",
2272            "Invalid INSERT statement: missing VALUE",
2273        )
2274    })?;
2275
2276    let value_str = rest.trim()[value_pos + 5..].trim();
2277    let item = parse_partiql_value_object(value_str, parameters)?;
2278
2279    let mut state = state.write();
2280    let table = get_table_mut(&mut state.tables, &table_name)?;
2281    let key = extract_key(table, &item);
2282    if table.find_item_index(&key).is_some() {
2283        // DynamoDB PartiQL INSERT fails if item exists
2284        return Err(AwsServiceError::aws_error(
2285            StatusCode::BAD_REQUEST,
2286            "DuplicateItemException",
2287            "Duplicate primary key exists in table",
2288        ));
2289    } else {
2290        table.items.push(item);
2291    }
2292    table.recalculate_stats();
2293
2294    DynamoDbService::ok_json(json!({}))
2295}
2296
2297fn execute_partiql_update(
2298    state: &SharedDynamoDbState,
2299    statement: &str,
2300    parameters: &[Value],
2301) -> Result<AwsResponse, AwsServiceError> {
2302    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2303    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2304    let after_update = statement[6..].trim(); // skip "UPDATE"
2305    let (table_name, rest) = parse_partiql_table_name(after_update);
2306
2307    let rest_upper = rest.trim().to_ascii_uppercase();
2308    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2309        AwsServiceError::aws_error(
2310            StatusCode::BAD_REQUEST,
2311            "ValidationException",
2312            "Invalid UPDATE statement: missing SET",
2313        )
2314    })?;
2315
2316    let after_set = rest.trim()[set_pos + 3..].trim();
2317
2318    // Split on WHERE
2319    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2320    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2321        (&after_set[..wp], after_set[wp + 5..].trim())
2322    } else {
2323        (after_set, "")
2324    };
2325
2326    let mut state = state.write();
2327    let table = get_table_mut(&mut state.tables, &table_name)?;
2328
2329    let matched_indices = if !where_clause.is_empty() {
2330        find_partiql_where_indices(table, where_clause, parameters)?
2331    } else {
2332        (0..table.items.len()).collect()
2333    };
2334
2335    // Parse SET assignments: attr=value, attr2=value2
2336    let param_offset = count_params_in_str(where_clause);
2337    let assignments: Vec<&str> = set_clause.split(',').collect();
2338    for idx in &matched_indices {
2339        let mut local_offset = param_offset;
2340        for assignment in &assignments {
2341            let assignment = assignment.trim();
2342            if let Some((attr, val_str)) = assignment.split_once('=') {
2343                let attr = attr.trim().trim_matches('"');
2344                let val_str = val_str.trim();
2345                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2346                if let Some(v) = value {
2347                    table.items[*idx].insert(attr.to_string(), v);
2348                }
2349            }
2350        }
2351    }
2352    table.recalculate_stats();
2353
2354    DynamoDbService::ok_json(json!({}))
2355}
2356
2357fn execute_partiql_delete(
2358    state: &SharedDynamoDbState,
2359    statement: &str,
2360    parameters: &[Value],
2361) -> Result<AwsResponse, AwsServiceError> {
2362    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2363    let upper = statement.to_ascii_uppercase();
2364    let from_pos = upper.find("FROM").ok_or_else(|| {
2365        AwsServiceError::aws_error(
2366            StatusCode::BAD_REQUEST,
2367            "ValidationException",
2368            "Invalid DELETE statement: missing FROM",
2369        )
2370    })?;
2371
2372    let after_from = statement[from_pos + 4..].trim();
2373    let (table_name, rest) = parse_partiql_table_name(after_from);
2374
2375    let rest_upper = rest.trim().to_ascii_uppercase();
2376    if !rest_upper.starts_with("WHERE") {
2377        return Err(AwsServiceError::aws_error(
2378            StatusCode::BAD_REQUEST,
2379            "ValidationException",
2380            "DELETE requires a WHERE clause",
2381        ));
2382    }
2383    let where_clause = rest.trim()[5..].trim();
2384
2385    let mut state = state.write();
2386    let table = get_table_mut(&mut state.tables, &table_name)?;
2387
2388    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2389    // Remove from highest index first to avoid invalidating lower indices
2390    indices.sort_unstable();
2391    indices.reverse();
2392    for idx in indices {
2393        table.items.remove(idx);
2394    }
2395    table.recalculate_stats();
2396
2397    DynamoDbService::ok_json(json!({}))
2398}
2399
/// Read a table name from the start of `s` and return `(name, rest)`.
///
/// Leading and trailing whitespace of `s` is ignored. A name wrapped in
/// double quotes runs to the closing quote, which is dropped from `rest`. If
/// the opening quote is never closed, the name ends at the first space and
/// any surrounding quotes are stripped from it. An unquoted name ends at the
/// first whitespace character. `rest` keeps its leading whitespace.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let trimmed = s.trim();
    match trimmed.strip_prefix('"') {
        Some(unquoted) => match unquoted.split_once('"') {
            Some((name, rest)) => (name.to_string(), rest),
            None => {
                // Unterminated quote: fall back to cutting at the first space.
                let cut = trimmed.find(' ').unwrap_or(trimmed.len());
                let (head, rest) = trimmed.split_at(cut);
                (head.trim_matches('"').to_string(), rest)
            }
        },
        None => {
            let cut = trimmed
                .find(char::is_whitespace)
                .unwrap_or(trimmed.len());
            let (head, rest) = trimmed.split_at(cut);
            (head.to_string(), rest)
        }
    }
}
2419
2420/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2421/// Returns matching items.
2422fn evaluate_partiql_where<'a>(
2423    table: &'a DynamoTable,
2424    where_clause: &str,
2425    parameters: &[Value],
2426) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2427    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2428    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2429}
2430
2431fn find_partiql_where_indices(
2432    table: &DynamoTable,
2433    where_clause: &str,
2434    parameters: &[Value],
2435) -> Result<Vec<usize>, AwsServiceError> {
2436    let conditions = split_partiql_and_clauses(where_clause);
2437    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2438
2439    let mut indices = Vec::new();
2440    for (i, item) in table.items.iter().enumerate() {
2441        let all_match = parsed_conditions
2442            .iter()
2443            .all(|(attr, expected)| item.get(attr) == Some(expected));
2444        if all_match {
2445            indices.push(i);
2446        }
2447    }
2448
2449    Ok(indices)
2450}
2451
/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
///
/// Each returned piece is trimmed and borrows from `where_clause`. A clause
/// without any ` AND ` yields a single trimmed piece.
fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
    // ASCII-only uppercasing keeps every byte offset identical to the
    // original string. Full Unicode uppercasing can change the byte length of
    // non-ASCII characters, which would make the offsets found below point at
    // the wrong place (or inside a character) in `where_clause`.
    let upper = where_clause.to_ascii_uppercase();

    let mut parts = Vec::new();
    let mut last = 0;
    for (i, separator) in upper.match_indices(" AND ") {
        parts.push(where_clause[last..i].trim());
        last = i + separator.len();
    }
    parts.push(where_clause[last..].trim());
    parts
}
2467
2468/// Parse each `col = literal` (or `col = ?`) condition into an
2469/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2470/// don't parse as equality, or whose RHS literal can't be resolved, are
2471/// silently dropped — that mirrors the prior inline behavior.
2472fn parse_partiql_equality_conditions(
2473    conditions: &[&str],
2474    parameters: &[Value],
2475) -> Vec<(String, Value)> {
2476    let mut param_idx = 0usize;
2477    let mut parsed = Vec::new();
2478    for cond in conditions {
2479        let cond = cond.trim();
2480        if let Some((left, right)) = cond.split_once('=') {
2481            let attr = left.trim().trim_matches('"').to_string();
2482            let val_str = right.trim();
2483            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2484                parsed.push((attr, value));
2485            }
2486        }
2487    }
2488    parsed
2489}
2490
2491/// Parse a PartiQL literal value. Supports:
2492/// - 'string' -> {"S": "string"}
2493/// - 123 -> {"N": "123"}
2494/// - ? -> parameter from list
2495fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2496    let s = s.trim();
2497    if s == "?" {
2498        let idx = *param_idx;
2499        *param_idx += 1;
2500        parameters.get(idx).cloned()
2501    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2502        let inner = &s[1..s.len() - 1];
2503        Some(json!({"S": inner}))
2504    } else if let Ok(n) = s.parse::<f64>() {
2505        let num_str = if n == n.trunc() {
2506            format!("{}", n as i64)
2507        } else {
2508            format!("{n}")
2509        };
2510        Some(json!({"N": num_str}))
2511    } else {
2512        None
2513    }
2514}
2515
2516/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2517fn parse_partiql_value_object(
2518    s: &str,
2519    parameters: &[Value],
2520) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2521    let s = s.trim();
2522    let inner = s
2523        .strip_prefix('{')
2524        .and_then(|s| s.strip_suffix('}'))
2525        .ok_or_else(|| {
2526            AwsServiceError::aws_error(
2527                StatusCode::BAD_REQUEST,
2528                "ValidationException",
2529                "Invalid VALUE: expected object literal",
2530            )
2531        })?;
2532
2533    let mut item = HashMap::new();
2534    let mut param_idx = 0usize;
2535
2536    // Simple comma-separated key:value parsing
2537    for pair in split_partiql_pairs(inner) {
2538        let pair = pair.trim();
2539        if pair.is_empty() {
2540            continue;
2541        }
2542        if let Some((key_part, val_part)) = pair.split_once(':') {
2543            let key = key_part
2544                .trim()
2545                .trim_matches('\'')
2546                .trim_matches('"')
2547                .to_string();
2548            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2549                item.insert(key, val);
2550            }
2551        }
2552    }
2553
2554    Ok(item)
2555}
2556
/// Split the inside of a PartiQL object literal into its `key: value` pairs.
///
/// The input is cut at every comma that is at nesting depth zero and outside
/// a single-quoted string. Both `{...}` and `[...]` count as nesting, so a
/// nested object or list value stays in one piece. The returned slices are
/// not trimmed, and the final piece is always pushed, so an empty input
/// yields a single empty slice.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0i32;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            '\'' => in_quote = !in_quote,
            // Everything inside a string literal is plain text.
            _ if in_quote => {}
            '{' | '[' => depth += 1,
            '}' | ']' => depth -= 1,
            ',' if depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
2580
/// Count the positional `?` parameters in a PartiQL fragment.
///
/// A `?` inside a single-quoted string literal is ordinary text, not a
/// parameter placeholder, so it is not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    s.chars()
        .filter(|&c| match c {
            '\'' => {
                in_quote = !in_quote;
                false
            }
            '?' => !in_quote,
            _ => false,
        })
        .count()
}
2585
2586#[cfg(test)]
2587mod tests {
2588    use super::*;
2589    use serde_json::json;
2590
2591    #[test]
2592    fn test_parse_update_clauses_set() {
2593        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2594        assert_eq!(clauses.len(), 1);
2595        assert_eq!(clauses[0].0, UpdateAction::Set);
2596        assert_eq!(clauses[0].1.len(), 2);
2597    }
2598
2599    #[test]
2600    fn test_parse_update_clauses_set_and_remove() {
2601        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2602        assert_eq!(clauses.len(), 2);
2603        assert_eq!(clauses[0].0, UpdateAction::Set);
2604        assert_eq!(clauses[1].0, UpdateAction::Remove);
2605    }
2606
2607    #[test]
2608    fn test_parse_update_clauses_list_append_single_assignment() {
2609        // Before fix: naive comma split tore list_append(#0, :0) at the
2610        // inner comma, producing two bogus assignments instead of one.
2611        let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
2612        assert_eq!(clauses.len(), 1);
2613        assert_eq!(clauses[0].0, UpdateAction::Set);
2614        assert_eq!(
2615            clauses[0].1.len(),
2616            1,
2617            "list_append(a, b) must be kept as a single assignment, not split at the inner comma"
2618        );
2619    }
2620
2621    #[test]
2622    fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
2623        // list_append assignment followed by a plain SET — the comma between
2624        // the two assignments must still split them, while the comma inside
2625        // the list_append call must not.
2626        let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
2627        assert_eq!(clauses.len(), 1);
2628        assert_eq!(clauses[0].0, UpdateAction::Set);
2629        assert_eq!(
2630            clauses[0].1.len(),
2631            2,
2632            "two SET assignments: one list_append and one plain"
2633        );
2634    }
2635
2636    #[test]
2637    fn test_evaluate_key_condition_simple() {
2638        let mut item = HashMap::new();
2639        item.insert("pk".to_string(), json!({"S": "user1"}));
2640        item.insert("sk".to_string(), json!({"S": "order1"}));
2641
2642        let mut expr_values = HashMap::new();
2643        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2644
2645        assert!(evaluate_key_condition(
2646            "pk = :pk",
2647            &item,
2648            "pk",
2649            Some("sk"),
2650            &HashMap::new(),
2651            &expr_values,
2652        ));
2653    }
2654
2655    #[test]
2656    fn test_compare_attribute_values_numbers() {
2657        let a = json!({"N": "10"});
2658        let b = json!({"N": "20"});
2659        assert_eq!(
2660            compare_attribute_values(Some(&a), Some(&b)),
2661            std::cmp::Ordering::Less
2662        );
2663    }
2664
2665    #[test]
2666    fn test_compare_attribute_values_strings() {
2667        let a = json!({"S": "apple"});
2668        let b = json!({"S": "banana"});
2669        assert_eq!(
2670            compare_attribute_values(Some(&a), Some(&b)),
2671            std::cmp::Ordering::Less
2672        );
2673    }
2674
2675    #[test]
2676    fn test_split_on_and() {
2677        let parts = split_on_and("pk = :pk AND sk > :sk");
2678        assert_eq!(parts.len(), 2);
2679        assert_eq!(parts[0].trim(), "pk = :pk");
2680        assert_eq!(parts[1].trim(), "sk > :sk");
2681    }
2682
2683    #[test]
2684    fn test_split_on_and_respects_parentheses() {
2685        // Before fix: split_on_and would split inside the parens
2686        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
2687        // Should NOT split on the AND inside parentheses
2688        assert_eq!(parts.len(), 1);
2689        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
2690    }
2691
2692    #[test]
2693    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
2694        // (a AND b) OR c — should match when c is true but a is false
2695        let mut item = HashMap::new();
2696        item.insert("x".to_string(), json!({"S": "no"}));
2697        item.insert("y".to_string(), json!({"S": "no"}));
2698        item.insert("z".to_string(), json!({"S": "yes"}));
2699
2700        let mut expr_values = HashMap::new();
2701        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
2702
2703        // x=yes AND y=yes => false, but z=yes => true => overall true
2704        let result = evaluate_filter_expression(
2705            "(x = :yes AND y = :yes) OR z = :yes",
2706            &item,
2707            &HashMap::new(),
2708            &expr_values,
2709        );
2710        assert!(result, "should match because z = :yes is true");
2711
2712        // x=yes AND y=yes => false, z=yes => false => overall false
2713        let mut item2 = HashMap::new();
2714        item2.insert("x".to_string(), json!({"S": "no"}));
2715        item2.insert("y".to_string(), json!({"S": "no"}));
2716        item2.insert("z".to_string(), json!({"S": "no"}));
2717
2718        let result2 = evaluate_filter_expression(
2719            "(x = :yes AND y = :yes) OR z = :yes",
2720            &item2,
2721            &HashMap::new(),
2722            &expr_values,
2723        );
2724        assert!(!result2, "should not match because nothing is true");
2725    }
2726
2727    #[test]
2728    fn test_project_item_nested_path() {
2729        // Item with a list attribute containing maps
2730        let mut item = HashMap::new();
2731        item.insert("pk".to_string(), json!({"S": "key1"}));
2732        item.insert(
2733            "data".to_string(),
2734            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
2735        );
2736
2737        let body = json!({
2738            "ProjectionExpression": "data[0].name"
2739        });
2740
2741        let projected = project_item(&item, &body);
2742        // Should contain data[0].name = "Alice", not the entire data[0] element
2743        let name = projected
2744            .get("data")
2745            .and_then(|v| v.get("L"))
2746            .and_then(|v| v.get(0))
2747            .and_then(|v| v.get("M"))
2748            .and_then(|v| v.get("name"))
2749            .and_then(|v| v.get("S"))
2750            .and_then(|v| v.as_str());
2751        assert_eq!(name, Some("Alice"));
2752
2753        // Should NOT contain the "age" field
2754        let age = projected
2755            .get("data")
2756            .and_then(|v| v.get("L"))
2757            .and_then(|v| v.get(0))
2758            .and_then(|v| v.get("M"))
2759            .and_then(|v| v.get("age"));
2760        assert!(age.is_none(), "age should not be present in projection");
2761    }
2762
2763    #[test]
2764    fn test_resolve_nested_path_map() {
2765        let mut item = HashMap::new();
2766        item.insert(
2767            "info".to_string(),
2768            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
2769        );
2770
2771        let result = resolve_nested_path(&item, "info.address.city");
2772        assert_eq!(result, Some(json!({"S": "NYC"})));
2773    }
2774
2775    #[test]
2776    fn test_resolve_nested_path_list_then_map() {
2777        let mut item = HashMap::new();
2778        item.insert(
2779            "items".to_string(),
2780            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
2781        );
2782
2783        let result = resolve_nested_path(&item, "items[0].sku");
2784        assert_eq!(result, Some(json!({"S": "ABC"})));
2785    }
2786
2787    // -- Integration-style tests using DynamoDbService --
2788
2789    use crate::state::SharedDynamoDbState;
2790    use parking_lot::RwLock;
2791    use std::sync::Arc;
2792
2793    fn make_service() -> DynamoDbService {
2794        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
2795            "123456789012",
2796            "us-east-1",
2797        )));
2798        DynamoDbService::new(state)
2799    }
2800
2801    fn make_request(action: &str, body: Value) -> AwsRequest {
2802        AwsRequest {
2803            service: "dynamodb".to_string(),
2804            action: action.to_string(),
2805            region: "us-east-1".to_string(),
2806            account_id: "123456789012".to_string(),
2807            request_id: "test-id".to_string(),
2808            headers: http::HeaderMap::new(),
2809            query_params: HashMap::new(),
2810            body: serde_json::to_vec(&body).unwrap().into(),
2811            path_segments: vec![],
2812            raw_path: "/".to_string(),
2813            raw_query: String::new(),
2814            method: http::Method::POST,
2815            is_query_protocol: false,
2816            access_key_id: None,
2817            principal: None,
2818        }
2819    }
2820
2821    fn create_test_table(svc: &DynamoDbService) {
2822        let req = make_request(
2823            "CreateTable",
2824            json!({
2825                "TableName": "test-table",
2826                "KeySchema": [
2827                    { "AttributeName": "pk", "KeyType": "HASH" }
2828                ],
2829                "AttributeDefinitions": [
2830                    { "AttributeName": "pk", "AttributeType": "S" }
2831                ],
2832                "BillingMode": "PAY_PER_REQUEST"
2833            }),
2834        );
2835        svc.create_table(&req).unwrap();
2836    }
2837
2838    #[test]
2839    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
2840        let svc = make_service();
2841        let req = make_request(
2842            "CreateTable",
2843            json!({
2844                "TableName": "warm-throughput-table",
2845                "KeySchema": [
2846                    { "AttributeName": "pk", "KeyType": "HASH" }
2847                ],
2848                "AttributeDefinitions": [
2849                    { "AttributeName": "pk", "AttributeType": "S" }
2850                ],
2851                "BillingMode": "PAY_PER_REQUEST"
2852            }),
2853        );
2854        let create_resp = svc.create_table(&req).unwrap();
2855        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
2856        let create_table = &create_body["TableDescription"];
2857
2858        assert_eq!(create_table["TableStatus"], "ACTIVE");
2859        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
2860        let table_id = create_table["TableId"].as_str().unwrap().to_string();
2861        assert!(!table_id.is_empty());
2862
2863        let describe_req = make_request(
2864            "DescribeTable",
2865            json!({ "TableName": "warm-throughput-table" }),
2866        );
2867        let describe_resp = svc.describe_table(&describe_req).unwrap();
2868        let describe_body: Value =
2869            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
2870        let described_table = &describe_body["Table"];
2871
2872        assert_eq!(described_table["TableStatus"], "ACTIVE");
2873        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
2874        assert_eq!(described_table["TableId"], table_id);
2875
2876        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
2877        let describe_body_again: Value =
2878            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
2879        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
2880    }
2881
2882    #[test]
2883    fn delete_item_return_values_all_old() {
2884        let svc = make_service();
2885        create_test_table(&svc);
2886
2887        // Put an item
2888        let req = make_request(
2889            "PutItem",
2890            json!({
2891                "TableName": "test-table",
2892                "Item": {
2893                    "pk": { "S": "key1" },
2894                    "name": { "S": "Alice" },
2895                    "age": { "N": "30" }
2896                }
2897            }),
2898        );
2899        svc.put_item(&req).unwrap();
2900
2901        // Delete with ReturnValues=ALL_OLD
2902        let req = make_request(
2903            "DeleteItem",
2904            json!({
2905                "TableName": "test-table",
2906                "Key": { "pk": { "S": "key1" } },
2907                "ReturnValues": "ALL_OLD"
2908            }),
2909        );
2910        let resp = svc.delete_item(&req).unwrap();
2911        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2912
2913        // Verify the old item is returned
2914        let attrs = &body["Attributes"];
2915        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
2916        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
2917        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
2918
2919        // Verify the item is actually deleted
2920        let req = make_request(
2921            "GetItem",
2922            json!({
2923                "TableName": "test-table",
2924                "Key": { "pk": { "S": "key1" } }
2925            }),
2926        );
2927        let resp = svc.get_item(&req).unwrap();
2928        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2929        assert!(body.get("Item").is_none(), "item should be deleted");
2930    }
2931
2932    #[test]
2933    fn transact_get_items_returns_existing_and_missing() {
2934        let svc = make_service();
2935        create_test_table(&svc);
2936
2937        // Put one item
2938        let req = make_request(
2939            "PutItem",
2940            json!({
2941                "TableName": "test-table",
2942                "Item": {
2943                    "pk": { "S": "exists" },
2944                    "val": { "S": "hello" }
2945                }
2946            }),
2947        );
2948        svc.put_item(&req).unwrap();
2949
2950        let req = make_request(
2951            "TransactGetItems",
2952            json!({
2953                "TransactItems": [
2954                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
2955                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
2956                ]
2957            }),
2958        );
2959        let resp = svc.transact_get_items(&req).unwrap();
2960        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2961        let responses = body["Responses"].as_array().unwrap();
2962        assert_eq!(responses.len(), 2);
2963        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
2964        assert!(responses[1].get("Item").is_none());
2965    }
2966
2967    #[test]
2968    fn transact_write_items_put_and_delete() {
2969        let svc = make_service();
2970        create_test_table(&svc);
2971
2972        // Put initial item
2973        let req = make_request(
2974            "PutItem",
2975            json!({
2976                "TableName": "test-table",
2977                "Item": {
2978                    "pk": { "S": "to-delete" },
2979                    "val": { "S": "bye" }
2980                }
2981            }),
2982        );
2983        svc.put_item(&req).unwrap();
2984
2985        // TransactWrite: put new + delete existing
2986        let req = make_request(
2987            "TransactWriteItems",
2988            json!({
2989                "TransactItems": [
2990                    {
2991                        "Put": {
2992                            "TableName": "test-table",
2993                            "Item": {
2994                                "pk": { "S": "new-item" },
2995                                "val": { "S": "hi" }
2996                            }
2997                        }
2998                    },
2999                    {
3000                        "Delete": {
3001                            "TableName": "test-table",
3002                            "Key": { "pk": { "S": "to-delete" } }
3003                        }
3004                    }
3005                ]
3006            }),
3007        );
3008        let resp = svc.transact_write_items(&req).unwrap();
3009        assert_eq!(resp.status, StatusCode::OK);
3010
3011        // Verify new item exists
3012        let req = make_request(
3013            "GetItem",
3014            json!({
3015                "TableName": "test-table",
3016                "Key": { "pk": { "S": "new-item" } }
3017            }),
3018        );
3019        let resp = svc.get_item(&req).unwrap();
3020        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3021        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
3022
3023        // Verify deleted item is gone
3024        let req = make_request(
3025            "GetItem",
3026            json!({
3027                "TableName": "test-table",
3028                "Key": { "pk": { "S": "to-delete" } }
3029            }),
3030        );
3031        let resp = svc.get_item(&req).unwrap();
3032        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3033        assert!(body.get("Item").is_none());
3034    }
3035
3036    #[test]
3037    fn transact_write_items_condition_check_failure() {
3038        let svc = make_service();
3039        create_test_table(&svc);
3040
3041        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
3042        let req = make_request(
3043            "TransactWriteItems",
3044            json!({
3045                "TransactItems": [
3046                    {
3047                        "ConditionCheck": {
3048                            "TableName": "test-table",
3049                            "Key": { "pk": { "S": "nonexistent" } },
3050                            "ConditionExpression": "attribute_exists(pk)"
3051                        }
3052                    }
3053                ]
3054            }),
3055        );
3056        let resp = svc.transact_write_items(&req).unwrap();
3057        // Should be a 400 error response
3058        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3059        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3060        assert_eq!(
3061            body["__type"].as_str().unwrap(),
3062            "TransactionCanceledException"
3063        );
3064        assert!(body["CancellationReasons"].as_array().is_some());
3065    }
3066
3067    #[test]
3068    fn update_and_describe_time_to_live() {
3069        let svc = make_service();
3070        create_test_table(&svc);
3071
3072        // Enable TTL
3073        let req = make_request(
3074            "UpdateTimeToLive",
3075            json!({
3076                "TableName": "test-table",
3077                "TimeToLiveSpecification": {
3078                    "AttributeName": "ttl",
3079                    "Enabled": true
3080                }
3081            }),
3082        );
3083        let resp = svc.update_time_to_live(&req).unwrap();
3084        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3085        assert_eq!(
3086            body["TimeToLiveSpecification"]["AttributeName"]
3087                .as_str()
3088                .unwrap(),
3089            "ttl"
3090        );
3091        assert!(body["TimeToLiveSpecification"]["Enabled"]
3092            .as_bool()
3093            .unwrap());
3094
3095        // Describe TTL
3096        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3097        let resp = svc.describe_time_to_live(&req).unwrap();
3098        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3099        assert_eq!(
3100            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3101                .as_str()
3102                .unwrap(),
3103            "ENABLED"
3104        );
3105        assert_eq!(
3106            body["TimeToLiveDescription"]["AttributeName"]
3107                .as_str()
3108                .unwrap(),
3109            "ttl"
3110        );
3111
3112        // Disable TTL
3113        let req = make_request(
3114            "UpdateTimeToLive",
3115            json!({
3116                "TableName": "test-table",
3117                "TimeToLiveSpecification": {
3118                    "AttributeName": "ttl",
3119                    "Enabled": false
3120                }
3121            }),
3122        );
3123        svc.update_time_to_live(&req).unwrap();
3124
3125        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3126        let resp = svc.describe_time_to_live(&req).unwrap();
3127        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3128        assert_eq!(
3129            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3130                .as_str()
3131                .unwrap(),
3132            "DISABLED"
3133        );
3134    }
3135
3136    #[test]
3137    fn resource_policy_lifecycle() {
3138        let svc = make_service();
3139        create_test_table(&svc);
3140
3141        let table_arn = {
3142            let state = svc.state.read();
3143            state.tables.get("test-table").unwrap().arn.clone()
3144        };
3145
3146        // Put policy
3147        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3148        let req = make_request(
3149            "PutResourcePolicy",
3150            json!({
3151                "ResourceArn": table_arn,
3152                "Policy": policy_doc
3153            }),
3154        );
3155        let resp = svc.put_resource_policy(&req).unwrap();
3156        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3157        assert!(body["RevisionId"].as_str().is_some());
3158
3159        // Get policy
3160        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3161        let resp = svc.get_resource_policy(&req).unwrap();
3162        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3163        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3164
3165        // Delete policy
3166        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3167        svc.delete_resource_policy(&req).unwrap();
3168
3169        // Get should return null now
3170        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3171        let resp = svc.get_resource_policy(&req).unwrap();
3172        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3173        assert!(body["Policy"].is_null());
3174    }
3175
3176    #[test]
3177    fn describe_endpoints() {
3178        let svc = make_service();
3179        let req = make_request("DescribeEndpoints", json!({}));
3180        let resp = svc.describe_endpoints(&req).unwrap();
3181        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3182        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3183    }
3184
3185    #[test]
3186    fn describe_limits() {
3187        let svc = make_service();
3188        let req = make_request("DescribeLimits", json!({}));
3189        let resp = svc.describe_limits(&req).unwrap();
3190        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3191        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3192    }
3193
3194    #[test]
3195    fn backup_lifecycle() {
3196        let svc = make_service();
3197        create_test_table(&svc);
3198
3199        // Create backup
3200        let req = make_request(
3201            "CreateBackup",
3202            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3203        );
3204        let resp = svc.create_backup(&req).unwrap();
3205        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3206        let backup_arn = body["BackupDetails"]["BackupArn"]
3207            .as_str()
3208            .unwrap()
3209            .to_string();
3210        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3211
3212        // Describe backup
3213        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3214        let resp = svc.describe_backup(&req).unwrap();
3215        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3216        assert_eq!(
3217            body["BackupDescription"]["BackupDetails"]["BackupName"],
3218            "my-backup"
3219        );
3220
3221        // List backups
3222        let req = make_request("ListBackups", json!({}));
3223        let resp = svc.list_backups(&req).unwrap();
3224        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3225        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3226
3227        // Restore from backup
3228        let req = make_request(
3229            "RestoreTableFromBackup",
3230            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3231        );
3232        svc.restore_table_from_backup(&req).unwrap();
3233
3234        // Verify restored table exists
3235        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3236        let resp = svc.describe_table(&req).unwrap();
3237        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3238        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3239
3240        // Delete backup
3241        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3242        svc.delete_backup(&req).unwrap();
3243
3244        // List should be empty
3245        let req = make_request("ListBackups", json!({}));
3246        let resp = svc.list_backups(&req).unwrap();
3247        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3248        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3249    }
3250
3251    #[test]
3252    fn continuous_backups() {
3253        let svc = make_service();
3254        create_test_table(&svc);
3255
3256        // Initially disabled
3257        let req = make_request(
3258            "DescribeContinuousBackups",
3259            json!({ "TableName": "test-table" }),
3260        );
3261        let resp = svc.describe_continuous_backups(&req).unwrap();
3262        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3263        assert_eq!(
3264            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3265                ["PointInTimeRecoveryStatus"],
3266            "DISABLED"
3267        );
3268
3269        // Enable
3270        let req = make_request(
3271            "UpdateContinuousBackups",
3272            json!({
3273                "TableName": "test-table",
3274                "PointInTimeRecoverySpecification": {
3275                    "PointInTimeRecoveryEnabled": true
3276                }
3277            }),
3278        );
3279        svc.update_continuous_backups(&req).unwrap();
3280
3281        // Verify
3282        let req = make_request(
3283            "DescribeContinuousBackups",
3284            json!({ "TableName": "test-table" }),
3285        );
3286        let resp = svc.describe_continuous_backups(&req).unwrap();
3287        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3288        assert_eq!(
3289            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3290                ["PointInTimeRecoveryStatus"],
3291            "ENABLED"
3292        );
3293    }
3294
3295    #[test]
3296    fn restore_table_to_point_in_time() {
3297        let svc = make_service();
3298        create_test_table(&svc);
3299
3300        let req = make_request(
3301            "RestoreTableToPointInTime",
3302            json!({
3303                "SourceTableName": "test-table",
3304                "TargetTableName": "pitr-restored"
3305            }),
3306        );
3307        svc.restore_table_to_point_in_time(&req).unwrap();
3308
3309        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3310        let resp = svc.describe_table(&req).unwrap();
3311        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3312        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3313    }
3314
/// Exercises the global-table operations end to end on `my-global`: create
/// with two replica regions, describe, list, add a third region, then read
/// and update the global table settings.
#[test]
fn global_table_lifecycle() {
    let svc = make_service();
    let table = "my-global";

    // Create with two replica regions; the description must come back ACTIVE.
    let create_req = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": table,
            "ReplicationGroup": [
                { "RegionName": "us-east-1" },
                { "RegionName": "eu-west-1" }
            ]
        }),
    );
    let create_resp = svc.create_global_table(&create_req).unwrap();
    let created: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
    assert_eq!(
        created["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    // Describe must report both replicas.
    let describe_req = make_request("DescribeGlobalTable", json!({ "GlobalTableName": table }));
    let describe_resp = svc.describe_global_table(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    let replicas = described["GlobalTableDescription"]["ReplicationGroup"]
        .as_array()
        .unwrap();
    assert_eq!(replicas.len(), 2);

    // Listing must contain exactly the one global table created above.
    let list_req = make_request("ListGlobalTables", json!({}));
    let list_resp = svc.list_global_tables(&list_req).unwrap();
    let listed: Value = serde_json::from_slice(list_resp.body.expect_bytes()).unwrap();
    assert_eq!(listed["GlobalTables"].as_array().unwrap().len(), 1);

    // Adding a region through UpdateGlobalTable grows the replication group to three.
    let update_req = make_request(
        "UpdateGlobalTable",
        json!({
            "GlobalTableName": table,
            "ReplicaUpdates": [
                { "Create": { "RegionName": "ap-southeast-1" } }
            ]
        }),
    );
    let update_resp = svc.update_global_table(&update_req).unwrap();
    let updated: Value = serde_json::from_slice(update_resp.body.expect_bytes()).unwrap();
    let replicas_after_update = updated["GlobalTableDescription"]["ReplicationGroup"]
        .as_array()
        .unwrap();
    assert_eq!(replicas_after_update.len(), 3);

    // The settings view must list one entry per replica.
    let settings_req = make_request(
        "DescribeGlobalTableSettings",
        json!({ "GlobalTableName": table }),
    );
    let settings_resp = svc.describe_global_table_settings(&settings_req).unwrap();
    let settings: Value = serde_json::from_slice(settings_resp.body.expect_bytes()).unwrap();
    assert_eq!(settings["ReplicaSettings"].as_array().unwrap().len(), 3);

    // Updating settings carries no changes here; only the absence of an error is checked.
    let update_settings_req = make_request(
        "UpdateGlobalTableSettings",
        json!({ "GlobalTableName": table }),
    );
    svc.update_global_table_settings(&update_settings_req).unwrap();
}
3394
/// Calls the replica auto-scaling describe and update operations for
/// `test-table`: the describe response must echo the table name, and the
/// update only has to succeed.
#[test]
fn table_replica_auto_scaling() {
    let svc = make_service();
    create_test_table(&svc);

    // Both operations take the same single-field payload.
    let payload = json!({ "TableName": "test-table" });

    let describe_req = make_request("DescribeTableReplicaAutoScaling", payload.clone());
    let describe_resp = svc.describe_table_replica_auto_scaling(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(
        described["TableAutoScalingDescription"]["TableName"],
        "test-table"
    );

    let update_req = make_request("UpdateTableReplicaAutoScaling", payload);
    svc.update_table_replica_auto_scaling(&update_req).unwrap();
}
3417
/// Drives a Kinesis streaming destination on `test-table` through enable,
/// describe, update and disable, checking the status reported when enabling
/// and disabling and the destination count reported by describe.
#[test]
fn kinesis_streaming_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // The same stream ARN is used by every operation that names a destination.
    let stream_arn = "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream";

    // Enabling must report the destination as ACTIVE.
    let enable_req = make_request(
        "EnableKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn
        }),
    );
    let enable_resp = svc.enable_kinesis_streaming_destination(&enable_req).unwrap();
    let enabled: Value = serde_json::from_slice(enable_resp.body.expect_bytes()).unwrap();
    assert_eq!(enabled["DestinationStatus"], "ACTIVE");

    // Describe must list exactly that one destination.
    let describe_req = make_request(
        "DescribeKinesisStreamingDestination",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc
        .describe_kinesis_streaming_destination(&describe_req)
        .unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    let destinations = described["KinesisDataStreamDestinations"]
        .as_array()
        .unwrap();
    assert_eq!(destinations.len(), 1);

    // Changing the timestamp precision only has to succeed.
    let update_req = make_request(
        "UpdateKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn,
            "UpdateKinesisStreamingConfiguration": {
                "ApproximateCreationDateTimePrecision": "MICROSECOND"
            }
        }),
    );
    svc.update_kinesis_streaming_destination(&update_req).unwrap();

    // Disabling must report the destination as DISABLED.
    let disable_req = make_request(
        "DisableKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn
        }),
    );
    let disable_resp = svc
        .disable_kinesis_streaming_destination(&disable_req)
        .unwrap();
    let disabled: Value = serde_json::from_slice(disable_resp.body.expect_bytes()).unwrap();
    assert_eq!(disabled["DestinationStatus"], "DISABLED");
}
3475
/// Checks that contributor insights for `test-table` start out `DISABLED`,
/// switch to `ENABLED` after an `ENABLE` action, and then appear as a single
/// entry in `ListContributorInsights`.
#[test]
fn contributor_insights_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // A table that was never configured must report DISABLED.
    let describe_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(described["ContributorInsightsStatus"], "DISABLED");

    // The ENABLE action must be reflected in the update response itself.
    let enable_req = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    let enable_resp = svc.update_contributor_insights(&enable_req).unwrap();
    let enabled: Value = serde_json::from_slice(enable_resp.body.expect_bytes()).unwrap();
    assert_eq!(enabled["ContributorInsightsStatus"], "ENABLED");

    // The listing must contain exactly one summary.
    let list_req = make_request("ListContributorInsights", json!({}));
    let list_resp = svc.list_contributor_insights(&list_req).unwrap();
    let listed: Value = serde_json::from_slice(list_resp.body.expect_bytes()).unwrap();
    let summaries = listed["ContributorInsightsSummaries"].as_array().unwrap();
    assert_eq!(summaries.len(), 1);
}
3514
/// Exports `test-table` to the `my-bucket` S3 bucket, then checks that the
/// export can be described by its ARN and shows up in `ListExports`.
#[test]
fn export_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // Start the export and keep the ARN it was assigned for the describe call.
    let export_req = make_request(
        "ExportTableToPointInTime",
        json!({
            "TableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/test-table",
            "S3Bucket": "my-bucket"
        }),
    );
    let export_resp = svc.export_table_to_point_in_time(&export_req).unwrap();
    let exported: Value = serde_json::from_slice(export_resp.body.expect_bytes()).unwrap();
    let export_arn = exported["ExportDescription"]["ExportArn"]
        .as_str()
        .unwrap()
        .to_owned();
    assert_eq!(exported["ExportDescription"]["ExportStatus"], "COMPLETED");

    // Describing by ARN must return the bucket passed at export time.
    let describe_req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
    let describe_resp = svc.describe_export(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(described["ExportDescription"]["S3Bucket"], "my-bucket");

    // The listing must contain exactly this one export.
    let list_req = make_request("ListExports", json!({}));
    let list_resp = svc.list_exports(&list_req).unwrap();
    let listed: Value = serde_json::from_slice(list_resp.body.expect_bytes()).unwrap();
    assert_eq!(listed["ExportSummaries"].as_array().unwrap().len(), 1);
}
3550
/// Imports a table named `imported-table` from the `import-bucket` S3 source,
/// then checks the import through `DescribeImport` and `ListImports` and
/// confirms the table itself exists and is `ACTIVE`.
#[test]
fn import_lifecycle() {
    let svc = make_service();

    // Describes the table the import should create: a single string hash key `pk`.
    let table_params = json!({
        "TableName": "imported-table",
        "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
        "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
    });

    let import_req = make_request(
        "ImportTable",
        json!({
            "InputFormat": "DYNAMODB_JSON",
            "S3BucketSource": { "S3Bucket": "import-bucket" },
            "TableCreationParameters": table_params
        }),
    );
    let import_resp = svc.import_table(&import_req).unwrap();
    let imported: Value = serde_json::from_slice(import_resp.body.expect_bytes()).unwrap();
    let import_arn = imported["ImportTableDescription"]["ImportArn"]
        .as_str()
        .unwrap()
        .to_owned();
    assert_eq!(
        imported["ImportTableDescription"]["ImportStatus"],
        "COMPLETED"
    );

    // Looking the import up by ARN must report the same status.
    let describe_req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
    let describe_resp = svc.describe_import(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(
        described["ImportTableDescription"]["ImportStatus"],
        "COMPLETED"
    );

    // The listing must contain exactly this one import.
    let list_req = make_request("ListImports", json!({}));
    let list_resp = svc.list_imports(&list_req).unwrap();
    let listed: Value = serde_json::from_slice(list_resp.body.expect_bytes()).unwrap();
    assert_eq!(listed["ImportSummaryList"].as_array().unwrap().len(), 1);

    // The import must also have produced a describable table.
    let table_req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
    let table_resp = svc.describe_table(&table_req).unwrap();
    let table: Value = serde_json::from_slice(table_resp.body.expect_bytes()).unwrap();
    assert_eq!(table["Table"]["TableStatus"], "ACTIVE");
}
3593
/// Verifies that a backup captures item data: three items are written to
/// `test-table`, a backup is taken, the items are deleted from the source,
/// and a restore into `restored-table` must still contain all three.
#[test]
fn backup_restore_preserves_items() {
    let svc = make_service();
    create_test_table(&svc);

    // Seed the source table with key1..key3.
    for n in 1..=3 {
        let pk = format!("key{n}");
        let data = format!("value{n}");
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": pk },
                    "data": { "S": data }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // Take the backup and remember its ARN for the restore.
    let backup_req = make_request(
        "CreateBackup",
        json!({
            "TableName": "test-table",
            "BackupName": "my-backup"
        }),
    );
    let backup_resp = svc.create_backup(&backup_req).unwrap();
    let backup: Value = serde_json::from_slice(backup_resp.body.expect_bytes()).unwrap();
    let backup_arn = backup["BackupDetails"]["BackupArn"]
        .as_str()
        .unwrap()
        .to_owned();

    // Empty the source table so the restore cannot be reading live data.
    for n in 1..=3 {
        let pk = format!("key{n}");
        let delete_req = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": pk } }
            }),
        );
        svc.delete_item(&delete_req).unwrap();
    }

    // Sanity check: the source table really is empty now.
    let source_scan_req = make_request("Scan", json!({ "TableName": "test-table" }));
    let source_scan_resp = svc.scan(&source_scan_req).unwrap();
    let source_scan: Value =
        serde_json::from_slice(source_scan_resp.body.expect_bytes()).unwrap();
    assert_eq!(source_scan["Count"], 0);

    // Restore the backup into a fresh table.
    let restore_req = make_request(
        "RestoreTableFromBackup",
        json!({
            "BackupArn": backup_arn,
            "TargetTableName": "restored-table"
        }),
    );
    svc.restore_table_from_backup(&restore_req).unwrap();

    // The restored table must hold the three items that existed at backup time.
    let restored_scan_req = make_request("Scan", json!({ "TableName": "restored-table" }));
    let restored_scan_resp = svc.scan(&restored_scan_req).unwrap();
    let restored_scan: Value =
        serde_json::from_slice(restored_scan_resp.body.expect_bytes()).unwrap();
    assert_eq!(restored_scan["Count"], 3);
    assert_eq!(restored_scan["Items"].as_array().unwrap().len(), 3);
}
3664
/// Registers `test-table` as a global table with two replica regions, writes
/// one item and checks the item can be read back through the same table name.
#[test]
fn global_table_replicates_writes() {
    let svc = make_service();
    create_test_table(&svc);

    // Turn the existing table into a global table spanning two regions.
    let create_req = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": "test-table",
            "ReplicationGroup": [
                { "RegionName": "us-east-1" },
                { "RegionName": "eu-west-1" }
            ]
        }),
    );
    let create_resp = svc.create_global_table(&create_req).unwrap();
    let created: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
    assert_eq!(
        created["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    // Write a single item.
    let put_req = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "replicated-key" },
                "data": { "S": "replicated-value" }
            }
        }),
    );
    svc.put_item(&put_req).unwrap();

    // Read it back; this test only reads via the one table name it wrote to.
    let get_req = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "replicated-key" } }
        }),
    );
    let get_resp = svc.get_item(&get_req).unwrap();
    let fetched: Value = serde_json::from_slice(get_resp.body.expect_bytes()).unwrap();
    let item = &fetched["Item"];
    assert_eq!(item["pk"]["S"], "replicated-key");
    assert_eq!(item["data"]["S"], "replicated-value");
}
3714
/// With contributor insights enabled on `test-table`, performs a mix of
/// `PutItem` and `GetItem` calls and checks that `DescribeContributorInsights`
/// then reports a non-empty `TopContributors` list and a non-empty rule list.
#[test]
fn contributor_insights_tracks_access() {
    let svc = make_service();
    create_test_table(&svc);

    // Insights must be on before the traffic below is generated.
    let enable_req = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    svc.update_contributor_insights(&enable_req).unwrap();

    // Writes: "alpha" three times, "beta" twice.
    for pk in ["alpha", "beta", "alpha", "alpha", "beta"] {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": pk },
                    "data": { "S": "value" }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // Reads: three more accesses to "alpha".
    (0..3).for_each(|_| {
        let get_req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "alpha" } }
            }),
        );
        svc.get_item(&get_req).unwrap();
    });

    let describe_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(described["ContributorInsightsStatus"], "ENABLED");

    let top_contributors = described["TopContributors"].as_array().unwrap();
    assert!(
        !top_contributors.is_empty(),
        "TopContributors should not be empty"
    );

    // "alpha" saw the most traffic (3 puts + 3 gets against 2 puts for "beta"),
    // but only a non-zero count on the first entry is asserted here, not which
    // key it belongs to.
    let first = &top_contributors[0];
    assert!(first["Count"].as_u64().unwrap() > 0);

    // The rule list must be populated as well.
    let rule_list = described["ContributorInsightsRuleList"].as_array().unwrap();
    assert!(!rule_list.is_empty());
}
3781
/// Writes one item to `test-table` without ever enabling contributor insights
/// and checks that the describe call reports `DISABLED` with an empty
/// `TopContributors` list.
#[test]
fn contributor_insights_not_tracked_when_disabled() {
    let svc = make_service();
    create_test_table(&svc);

    // Generate some traffic while insights are still off.
    let put_req = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "key1" },
                "data": { "S": "value" }
            }
        }),
    );
    svc.put_item(&put_req).unwrap();

    let describe_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
    let described: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(described["ContributorInsightsStatus"], "DISABLED");

    // The write above must not have been recorded.
    let top_contributors = described["TopContributors"].as_array().unwrap();
    assert!(top_contributors.is_empty());
}
3812
/// Checks both directions of the insights toggle around `Scan`: a scan while
/// insights are enabled must yield non-empty `TopContributors`, and after a
/// `DISABLE` action a further scan must leave `TopContributors` empty.
#[test]
fn contributor_insights_disabled_table_no_counters_after_scan() {
    let svc = make_service();
    create_test_table(&svc);

    // Two items so the scans below have something to touch.
    for pk in ["alpha", "beta"] {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": { "pk": { "S": pk } }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // Phase 1: enable insights and scan once.
    let enable_req = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    svc.update_contributor_insights(&enable_req).unwrap();

    let first_scan_req = make_request("Scan", json!({ "TableName": "test-table" }));
    svc.scan(&first_scan_req).unwrap();

    // The enabled scan must have been counted.
    let describe_enabled_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_enabled_resp = svc
        .describe_contributor_insights(&describe_enabled_req)
        .unwrap();
    let while_enabled: Value =
        serde_json::from_slice(describe_enabled_resp.body.expect_bytes()).unwrap();
    let counted = while_enabled["TopContributors"].as_array().unwrap();
    assert!(
        !counted.is_empty(),
        "counters should be non-empty while enabled"
    );

    // Phase 2: disable insights (expected to drop the counters) and scan again.
    let disable_req = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "DISABLE"
        }),
    );
    svc.update_contributor_insights(&disable_req).unwrap();

    let second_scan_req = make_request("Scan", json!({ "TableName": "test-table" }));
    svc.scan(&second_scan_req).unwrap();

    // The scan issued while disabled must not have repopulated anything.
    let describe_disabled_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_disabled_resp = svc
        .describe_contributor_insights(&describe_disabled_req)
        .unwrap();
    let while_disabled: Value =
        serde_json::from_slice(describe_disabled_resp.body.expect_bytes()).unwrap();
    let remaining = while_disabled["TopContributors"].as_array().unwrap();
    assert!(
        remaining.is_empty(),
        "counters should be empty after disabling insights"
    );
}
3884
/// Scans a five-item `test-table` with `Limit: 2` and follows
/// `LastEvaluatedKey` / `ExclusiveStartKey` until the scan is exhausted.
///
/// Checks that the first page holds two items and a `LastEvaluatedKey`
/// containing `pk`, that paging terminates, and that the pages together
/// contain exactly the five distinct items that were written.
#[test]
fn scan_pagination_with_limit() {
    // Five items at two per page need only a handful of requests; reaching this
    // cap means the service keeps handing out a LastEvaluatedKey. Failing here
    // is preferable to hanging the whole test run.
    const MAX_PAGES: usize = 10;

    let svc = make_service();
    create_test_table(&svc);

    // Insert 5 items
    for i in 0..5 {
        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("item{i}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        svc.put_item(&req).unwrap();
    }

    // Scan with limit=2
    let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
    let resp = svc.scan(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert_eq!(body["Count"], 2);
    assert!(
        body["LastEvaluatedKey"].is_object(),
        "should have LastEvaluatedKey when limit < total items"
    );
    assert!(body["LastEvaluatedKey"]["pk"].is_object());

    // Page through all items
    let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
    let mut lek = body["LastEvaluatedKey"].clone();
    let mut pages = 1;

    while lek.is_object() {
        assert!(
            pages < MAX_PAGES,
            "scan pagination did not terminate within {} pages",
            MAX_PAGES
        );
        let req = make_request(
            "Scan",
            json!({
                "TableName": "test-table",
                "Limit": 2,
                "ExclusiveStartKey": lek
            }),
        );
        let resp = svc.scan(&req).unwrap();
        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
        all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
        lek = body["LastEvaluatedKey"].clone();
        pages += 1;
    }

    assert_eq!(
        all_items.len(),
        5,
        "should retrieve all 5 items via pagination"
    );

    // A bare length check would also pass if one item came back twice and
    // another was skipped, so additionally require five distinct partition keys.
    let mut pks: Vec<&str> = all_items
        .iter()
        .map(|item| item["pk"]["S"].as_str().unwrap())
        .collect();
    pks.sort_unstable();
    pks.dedup();
    assert_eq!(
        pks.len(),
        5,
        "pagination should not return the same item twice"
    );
}
3941
/// Checks that `Scan` on a three-item `test-table` returns no
/// `LastEvaluatedKey` when the whole table fits in one response, both with a
/// `Limit` larger than the item count and with no `Limit` at all.
#[test]
fn scan_no_pagination_when_all_fit() {
    let svc = make_service();
    create_test_table(&svc);

    // Three key-only items.
    for n in 0..3 {
        let pk = format!("item{n}");
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": pk }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // Case 1: the limit exceeds the number of items.
    let limited_req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
    let limited_resp = svc.scan(&limited_req).unwrap();
    let limited: Value = serde_json::from_slice(limited_resp.body.expect_bytes()).unwrap();
    assert_eq!(limited["Count"], 3);
    assert!(
        limited["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );

    // Case 2: no limit supplied.
    let unlimited_req = make_request("Scan", json!({ "TableName": "test-table" }));
    let unlimited_resp = svc.scan(&unlimited_req).unwrap();
    let unlimited: Value = serde_json::from_slice(unlimited_resp.body.expect_bytes()).unwrap();
    assert_eq!(unlimited["Count"], 3);
    assert!(unlimited["LastEvaluatedKey"].is_null());
}
3977
3978    fn create_composite_table(svc: &DynamoDbService) {
3979        let req = make_request(
3980            "CreateTable",
3981            json!({
3982                "TableName": "composite-table",
3983                "KeySchema": [
3984                    { "AttributeName": "pk", "KeyType": "HASH" },
3985                    { "AttributeName": "sk", "KeyType": "RANGE" }
3986                ],
3987                "AttributeDefinitions": [
3988                    { "AttributeName": "pk", "AttributeType": "S" },
3989                    { "AttributeName": "sk", "AttributeType": "S" }
3990                ],
3991                "BillingMode": "PAY_PER_REQUEST"
3992            }),
3993        );
3994        svc.create_table(&req).unwrap();
3995    }
3996
/// Queries five items that share the partition key `user1` in
/// `composite-table` with `Limit: 2`, following `LastEvaluatedKey` /
/// `ExclusiveStartKey` until the query is exhausted.
///
/// Checks that the first page holds two items and a `LastEvaluatedKey`
/// containing both `pk` and `sk`, that paging terminates, and that the pages
/// together contain the five distinct items in ascending sort-key order.
#[test]
fn query_pagination_with_composite_key() {
    // Five items at two per page need only a handful of requests; reaching this
    // cap means the service keeps handing out a LastEvaluatedKey. Failing here
    // is preferable to hanging the whole test run.
    const MAX_PAGES: usize = 10;

    let svc = make_service();
    create_composite_table(&svc);

    // Insert 5 items under the same partition key
    for i in 0..5 {
        let req = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": format!("item{i:03}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        svc.put_item(&req).unwrap();
    }

    // Query with limit=2
    let req = make_request(
        "Query",
        json!({
            "TableName": "composite-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
            "Limit": 2
        }),
    );
    let resp = svc.query(&req).unwrap();
    let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
    assert_eq!(body["Count"], 2);
    assert!(body["LastEvaluatedKey"].is_object());
    assert!(body["LastEvaluatedKey"]["pk"].is_object());
    assert!(body["LastEvaluatedKey"]["sk"].is_object());

    // Page through all items
    let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
    let mut lek = body["LastEvaluatedKey"].clone();
    let mut pages = 1;

    while lek.is_object() {
        assert!(
            pages < MAX_PAGES,
            "query pagination did not terminate within {} pages",
            MAX_PAGES
        );
        let req = make_request(
            "Query",
            json!({
                "TableName": "composite-table",
                "KeyConditionExpression": "pk = :pk",
                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
                "Limit": 2,
                "ExclusiveStartKey": lek
            }),
        );
        let resp = svc.query(&req).unwrap();
        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
        all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
        lek = body["LastEvaluatedKey"].clone();
        pages += 1;
    }

    assert_eq!(
        all_items.len(),
        5,
        "should retrieve all 5 items via pagination"
    );

    // Verify items came back sorted by sort key
    let sks: Vec<String> = all_items
        .iter()
        .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
        .collect();
    let mut sorted = sks.clone();
    sorted.sort();
    assert_eq!(sks, sorted, "items should be sorted by sort key");

    // Sorted order alone still allows a repeated item to stand in for a missing
    // one, so additionally require five distinct sort keys.
    sorted.dedup();
    assert_eq!(
        sorted.len(),
        5,
        "pagination should not return the same item twice"
    );
}
4071
/// Checks that a `Query` on `composite-table` whose `Limit` (10) exceeds the
/// two matching items returns both items and no `LastEvaluatedKey`.
#[test]
fn query_no_pagination_when_all_fit() {
    let svc = make_service();
    create_composite_table(&svc);

    // Two items under the same partition key.
    for n in 0..2 {
        let sk = format!("item{n}");
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": sk }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    let query_req = make_request(
        "Query",
        json!({
            "TableName": "composite-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
            "Limit": 10
        }),
    );
    let query_resp = svc.query(&query_req).unwrap();
    let result: Value = serde_json::from_slice(query_resp.body.expect_bytes()).unwrap();
    assert_eq!(result["Count"], 2);
    assert!(
        result["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );
}
4108
4109    fn create_gsi_table(svc: &DynamoDbService) {
4110        let req = make_request(
4111            "CreateTable",
4112            json!({
4113                "TableName": "gsi-table",
4114                "KeySchema": [
4115                    { "AttributeName": "pk", "KeyType": "HASH" }
4116                ],
4117                "AttributeDefinitions": [
4118                    { "AttributeName": "pk", "AttributeType": "S" },
4119                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4120                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4121                ],
4122                "BillingMode": "PAY_PER_REQUEST",
4123                "GlobalSecondaryIndexes": [
4124                    {
4125                        "IndexName": "gsi-index",
4126                        "KeySchema": [
4127                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4128                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4129                        ],
4130                        "Projection": { "ProjectionType": "ALL" }
4131                    }
4132                ]
4133            }),
4134        );
4135        svc.create_table(&req).unwrap();
4136    }
4137
/// Checks the shape of `LastEvaluatedKey` for a paginated query on
/// `gsi-index`: with three items sharing one index key and `Limit: 1`, the
/// returned key must carry the index attributes (`gsi_pk`, `gsi_sk`) and the
/// base table's `pk`.
#[test]
fn gsi_query_last_evaluated_key_includes_table_pk() {
    let svc = make_service();
    create_gsi_table(&svc);

    // Three items that are identical on the index key and differ only in `pk`.
    for n in 0..3 {
        let pk = format!("item{n}");
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "gsi-table",
                "Item": {
                    "pk": { "S": pk },
                    "gsi_pk": { "S": "shared" },
                    "gsi_sk": { "S": "sort" }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // A limit of one forces the response to stop early and emit a LastEvaluatedKey.
    let query_req = make_request(
        "Query",
        json!({
            "TableName": "gsi-table",
            "IndexName": "gsi-index",
            "KeyConditionExpression": "gsi_pk = :v",
            "ExpressionAttributeValues": { ":v": { "S": "shared" } },
            "Limit": 1
        }),
    );
    let query_resp = svc.query(&query_req).unwrap();
    let result: Value = serde_json::from_slice(query_resp.body.expect_bytes()).unwrap();
    assert_eq!(result["Count"], 1);

    let last_key = &result["LastEvaluatedKey"];
    assert!(last_key.is_object(), "should have LastEvaluatedKey");

    // Index key attributes.
    assert!(last_key["gsi_pk"].is_object(), "LEK must contain gsi_pk");
    assert!(last_key["gsi_sk"].is_object(), "LEK must contain gsi_sk");

    // Base-table key attribute.
    assert!(
        last_key["pk"].is_object(),
        "LEK must contain table PK for GSI queries"
    );
}
4184
4185    #[test]
4186    fn gsi_query_pagination_returns_all_items() {
4187        let svc = make_service();
4188        create_gsi_table(&svc);
4189
4190        // Insert 4 items with the SAME GSI key but different table PKs
4191        for i in 0..4 {
4192            let req = make_request(
4193                "PutItem",
4194                json!({
4195                    "TableName": "gsi-table",
4196                    "Item": {
4197                        "pk": { "S": format!("item{i:03}") },
4198                        "gsi_pk": { "S": "shared" },
4199                        "gsi_sk": { "S": "sort" }
4200                    }
4201                }),
4202            );
4203            svc.put_item(&req).unwrap();
4204        }
4205
4206        // Paginate through all items with Limit=2
4207        let mut all_pks = Vec::new();
4208        let mut lek: Option<Value> = None;
4209
4210        loop {
4211            let mut query = json!({
4212                "TableName": "gsi-table",
4213                "IndexName": "gsi-index",
4214                "KeyConditionExpression": "gsi_pk = :v",
4215                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4216                "Limit": 2
4217            });
4218            if let Some(ref start_key) = lek {
4219                query["ExclusiveStartKey"] = start_key.clone();
4220            }
4221
4222            let req = make_request("Query", query);
4223            let resp = svc.query(&req).unwrap();
4224            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4225
4226            for item in body["Items"].as_array().unwrap() {
4227                let pk = item["pk"]["S"].as_str().unwrap().to_string();
4228                all_pks.push(pk);
4229            }
4230
4231            if body["LastEvaluatedKey"].is_object() {
4232                lek = Some(body["LastEvaluatedKey"].clone());
4233            } else {
4234                break;
4235            }
4236        }
4237
4238        all_pks.sort();
4239        assert_eq!(
4240            all_pks,
4241            vec!["item000", "item001", "item002", "item003"],
4242            "pagination should return all items without duplicates"
4243        );
4244    }
4245
4246    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4247        pairs
4248            .iter()
4249            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4250            .collect()
4251    }
4252
4253    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
4254        pairs
4255            .iter()
4256            .map(|(k, v)| (k.to_string(), v.to_string()))
4257            .collect()
4258    }
4259
4260    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4261        pairs
4262            .iter()
4263            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4264            .collect()
4265    }
4266
4267    #[test]
4268    fn test_evaluate_condition_bare_not_equal() {
4269        let item = cond_item(&[("state", "active")]);
4270        let names = cond_names(&[("#s", "state")]);
4271        let values = cond_values(&[(":c", "complete")]);
4272
4273        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4274
4275        let item2 = cond_item(&[("state", "complete")]);
4276        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4277    }
4278
4279    #[test]
4280    fn test_evaluate_condition_parenthesized_not_equal() {
4281        let item = cond_item(&[("state", "active")]);
4282        let names = cond_names(&[("#s", "state")]);
4283        let values = cond_values(&[(":c", "complete")]);
4284
4285        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4286    }
4287
4288    #[test]
4289    fn test_evaluate_condition_parenthesized_equal_mismatch() {
4290        let item = cond_item(&[("state", "active")]);
4291        let names = cond_names(&[("#s", "state")]);
4292        let values = cond_values(&[(":c", "complete")]);
4293
4294        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4295    }
4296
4297    #[test]
4298    fn test_evaluate_condition_compound_and() {
4299        let item = cond_item(&[("state", "active")]);
4300        let names = cond_names(&[("#s", "state")]);
4301        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4302
4303        // active <> complete AND active <> failed => true
4304        assert!(
4305            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4306        );
4307    }
4308
4309    #[test]
4310    fn test_evaluate_condition_compound_and_mismatch() {
4311        let item = cond_item(&[("state", "inactive")]);
4312        let names = cond_names(&[("#s", "state")]);
4313        let values = cond_values(&[(":a", "active"), (":b", "active")]);
4314
4315        // inactive = active AND inactive = active => false
4316        assert!(
4317            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4318        );
4319    }
4320
4321    #[test]
4322    fn test_evaluate_condition_compound_or() {
4323        let item = cond_item(&[("state", "running")]);
4324        let names = cond_names(&[("#s", "state")]);
4325        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4326
4327        // running = active OR running = idle => false
4328        assert!(
4329            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4330        );
4331
4332        // running = active OR running = running => true
4333        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4334        assert!(
4335            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4336        );
4337    }
4338
4339    #[test]
4340    fn test_evaluate_condition_not_operator() {
4341        let item = cond_item(&[("state", "active")]);
4342        let names = cond_names(&[("#s", "state")]);
4343        let values = cond_values(&[(":c", "complete")]);
4344
4345        // NOT (active = complete) => NOT false => true
4346        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4347
4348        // NOT (active <> complete) => NOT true => false
4349        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4350
4351        // NOT attribute_exists(#s) on existing item => NOT true => false
4352        assert!(
4353            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4354        );
4355
4356        // NOT attribute_exists(#s) on missing item => NOT false => true
4357        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4358    }
4359
4360    #[test]
4361    fn test_evaluate_condition_begins_with() {
4362        // After unification, conditions support begins_with via
4363        // evaluate_single_filter_condition (previously only filters had it).
4364        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4365        let names = cond_names(&[("#n", "name")]);
4366        let values = cond_values(&[(":p", "fakecloud")]);
4367
4368        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4369
4370        let values2 = cond_values(&[(":p", "realcloud")]);
4371        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4372    }
4373
4374    #[test]
4375    fn test_evaluate_condition_contains() {
4376        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4377        let names = cond_names(&[("#t", "tags")]);
4378        let values = cond_values(&[(":v", "beta")]);
4379
4380        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4381
4382        let values2 = cond_values(&[(":v", "delta")]);
4383        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4384    }
4385
4386    #[test]
4387    fn test_evaluate_condition_no_existing_item() {
4388        // When no item exists (PutItem with condition), attribute_not_exists
4389        // should succeed and attribute_exists should fail.
4390        let names = cond_names(&[("#s", "state")]);
4391        let values = cond_values(&[(":v", "active")]);
4392
4393        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4394        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4395        // Comparison against missing item: None != Some(val) => true for <>
4396        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4397        // None == Some(val) => false for =
4398        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4399    }
4400
4401    #[test]
4402    fn test_evaluate_filter_not_operator() {
4403        let item = cond_item(&[("status", "pending")]);
4404        let names = cond_names(&[("#s", "status")]);
4405        let values = cond_values(&[(":v", "pending")]);
4406
4407        assert!(!evaluate_filter_expression(
4408            "NOT (#s = :v)",
4409            &item,
4410            &names,
4411            &values
4412        ));
4413        assert!(evaluate_filter_expression(
4414            "NOT (#s <> :v)",
4415            &item,
4416            &names,
4417            &values
4418        ));
4419    }
4420
4421    #[test]
4422    fn test_evaluate_filter_expression_in_match() {
4423        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
4424        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
4425        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
4426        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
4427        // every item matched every IN filter regardless of value.
4428        let item = cond_item(&[("state", "active")]);
4429        let names = cond_names(&[("#s", "state")]);
4430        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4431
4432        assert!(
4433            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4434            "state=active should match IN (active, pending)"
4435        );
4436    }
4437
4438    #[test]
4439    fn test_evaluate_filter_expression_in_no_match() {
4440        let item = cond_item(&[("state", "complete")]);
4441        let names = cond_names(&[("#s", "state")]);
4442        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4443
4444        assert!(
4445            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4446            "state=complete should not match IN (active, pending)"
4447        );
4448    }
4449
4450    #[test]
4451    fn test_evaluate_filter_expression_in_no_spaces() {
4452        // orderbot emits the raw form
4453        //     "#status IN (" + strings.Join(keys, ",") + ")"
4454        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
4455        let item = cond_item(&[("status", "shipped")]);
4456        let names = cond_names(&[("#s", "status")]);
4457        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4458
4459        assert!(
4460            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4461            "no-space IN list should still parse"
4462        );
4463    }
4464
4465    #[test]
4466    fn test_evaluate_filter_expression_in_missing_attribute() {
4467        // A missing attribute must not match any IN list — the silent-true
4468        // fallthrough would wrongly accept these items.
4469        let item: HashMap<String, AttributeValue> = HashMap::new();
4470        let names = cond_names(&[("#s", "state")]);
4471        let values = cond_values(&[(":a", "active")]);
4472
4473        assert!(
4474            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4475            "missing attribute should not match any IN list"
4476        );
4477    }
4478
4479    #[test]
4480    fn test_evaluate_filter_expression_compound_in_and_eq() {
4481        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
4482        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
4483        // split_on_and handles the outer parens, but the IN leaf had the
4484        // silent-true fallthrough, so any item with priority=high would match
4485        // regardless of state.
4486        let item = cond_item(&[("state", "active"), ("priority", "high")]);
4487        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4488        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4489
4490        assert!(
4491            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4492            "(active IN (active, pending)) AND (high = high) should match"
4493        );
4494
4495        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4496        assert!(
4497            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4498            "(complete IN (active, pending)) AND (high = high) should not match"
4499        );
4500    }
4501
4502    #[test]
4503    fn test_evaluate_condition_attribute_exists_with_space() {
4504        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
4505        // space between the name and the opening paren:
4506        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
4507        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
4508        // with no space, so these fell through the filter leaf entirely and
4509        // hit evaluate_single_key_condition's silent-true fallthrough —
4510        // every conditional write was silently accepted.
4511        let item = cond_item(&[("store_id", "s-1")]);
4512        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4513        let values = cond_values(&[(":0", "tab-A")]);
4514
4515        // On an existing item without active_viewer_tab_id: exists(store_id)
4516        // is true, not_exists(active_viewer_tab_id) is true → OK.
4517        assert!(
4518            evaluate_condition(
4519                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4520                Some(&item),
4521                &names,
4522                &values,
4523            )
4524            .is_ok(),
4525            "claim-lease compound on free item should succeed"
4526        );
4527
4528        // On a missing item: exists(store_id) is false → whole AND false → Err.
4529        assert!(
4530            evaluate_condition(
4531                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4532                None,
4533                &names,
4534                &values,
4535            )
4536            .is_err(),
4537            "claim-lease compound on missing item must fail attribute_exists branch"
4538        );
4539
4540        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
4541        // → (✓) AND ((✗) OR (✗)) → false → Err.
4542        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4543        assert!(
4544            evaluate_condition(
4545                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4546                Some(&held),
4547                &names,
4548                &values,
4549            )
4550            .is_err(),
4551            "claim-lease compound on item held by another tab must fail"
4552        );
4553
4554        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
4555        // → (✓) AND ((✗) OR (✓)) → true → Ok.
4556        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4557        assert!(
4558            evaluate_condition(
4559                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4560                Some(&self_held),
4561                &names,
4562                &values,
4563            )
4564            .is_ok(),
4565            "same-tab re-claim must succeed"
4566        );
4567    }
4568
4569    #[test]
4570    fn test_evaluate_condition_in_match() {
4571        // evaluate_condition delegates to evaluate_filter_expression, so this
4572        // also proves the ConditionExpression path. Before fix: silently Ok.
4573        let item = cond_item(&[("state", "active")]);
4574        let names = cond_names(&[("#s", "state")]);
4575        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4576
4577        assert!(
4578            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4579            "IN should succeed when actual value is in the list"
4580        );
4581    }
4582
4583    #[test]
4584    fn test_evaluate_condition_in_no_match() {
4585        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
4586        // conditional write was accepted regardless of actual state, the
4587        // opposite of what the caller asked for.
4588        let item = cond_item(&[("state", "complete")]);
4589        let names = cond_names(&[("#s", "state")]);
4590        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4591
4592        assert!(
4593            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4594            "IN should fail when actual value is not in the list"
4595        );
4596    }
4597
4598    #[test]
4599    fn test_apply_update_set_list_index_replaces_existing() {
4600        // Shape emitted by orderbot's order-item update retry loop:
4601        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
4602        // Before fix: apply_set_assignment called resolve_attr_name on the
4603        // whole "#items[0]" token, which misses the name map, and then
4604        // item.insert("#items[0]", :item), producing a top-level key
4605        // literally named "#items[0]" rather than mutating the list.
4606        let mut item = HashMap::new();
4607        item.insert(
4608            "items".to_string(),
4609            json!({"L": [
4610                {"M": {"sku": {"S": "OLD-A"}}},
4611                {"M": {"sku": {"S": "OLD-B"}}},
4612            ]}),
4613        );
4614
4615        let names = cond_names(&[("#items", "items")]);
4616        let mut values = HashMap::new();
4617        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
4618
4619        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
4620
4621        let items_list = item
4622            .get("items")
4623            .and_then(|v| v.get("L"))
4624            .and_then(|v| v.as_array())
4625            .expect("items should still be a list");
4626        assert_eq!(items_list.len(), 2, "list length should be unchanged");
4627        let sku0 = items_list[0]
4628            .get("M")
4629            .and_then(|m| m.get("sku"))
4630            .and_then(|s| s.get("S"))
4631            .and_then(|s| s.as_str());
4632        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
4633        let sku1 = items_list[1]
4634            .get("M")
4635            .and_then(|m| m.get("sku"))
4636            .and_then(|s| s.get("S"))
4637            .and_then(|s| s.as_str());
4638        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
4639
4640        assert!(!item.contains_key("items[0]"));
4641        assert!(!item.contains_key("#items[0]"));
4642    }
4643
4644    #[test]
4645    fn test_apply_update_set_list_index_second_slot() {
4646        let mut item = HashMap::new();
4647        item.insert(
4648            "items".to_string(),
4649            json!({"L": [
4650                {"M": {"sku": {"S": "A"}}},
4651                {"M": {"sku": {"S": "B"}}},
4652                {"M": {"sku": {"S": "C"}}},
4653            ]}),
4654        );
4655
4656        let names = cond_names(&[("#items", "items")]);
4657        let mut values = HashMap::new();
4658        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
4659
4660        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
4661
4662        let items_list = item
4663            .get("items")
4664            .and_then(|v| v.get("L"))
4665            .and_then(|v| v.as_array())
4666            .unwrap();
4667        let skus: Vec<&str> = items_list
4668            .iter()
4669            .map(|v| {
4670                v.get("M")
4671                    .and_then(|m| m.get("sku"))
4672                    .and_then(|s| s.get("S"))
4673                    .and_then(|s| s.as_str())
4674                    .unwrap()
4675            })
4676            .collect();
4677        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
4678    }
4679
4680    #[test]
4681    fn test_apply_update_set_list_index_without_name_ref() {
4682        // Same fix must also work when the LHS is a literal attribute name,
4683        // not an expression attribute name ref.
4684        let mut item = HashMap::new();
4685        item.insert(
4686            "tags".to_string(),
4687            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
4688        );
4689
4690        let names: HashMap<String, String> = HashMap::new();
4691        let mut values = HashMap::new();
4692        values.insert(":t".to_string(), json!({"S": "green"}));
4693
4694        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
4695
4696        let tags = item
4697            .get("tags")
4698            .and_then(|v| v.get("L"))
4699            .and_then(|v| v.as_array())
4700            .unwrap();
4701        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
4702        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
4703    }
4704
4705    #[test]
4706    fn test_list_append_into_empty_list() {
4707        // Regression: UpdateItem with `SET #0 = list_append(#0, :0)` where
4708        // the attribute already exists as an empty list silently no-oped.
4709        // Root cause: parse_update_clauses split `list_append(#0, :0)` at
4710        // the inner comma, so apply_set_list_append received a truncated
4711        // `rest` with no closing ')' and returned early without writing.
4712        let mut item = HashMap::new();
4713        item.insert("files".to_string(), json!({"L": []}));
4714
4715        let names = cond_names(&[("#0", "files")]);
4716        let mut values = HashMap::new();
4717        values.insert(
4718            ":0".to_string(),
4719            json!({"L": [{"M": {"field": {"S": "value"}}}]}),
4720        );
4721
4722        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
4723            .unwrap();
4724
4725        let list = item
4726            .get("files")
4727            .and_then(|v| v.get("L"))
4728            .and_then(|v| v.as_array())
4729            .expect("files should be an L-typed attribute");
4730        assert_eq!(list.len(), 1, "one element should have been appended");
4731    }
4732
4733    #[test]
4734    fn test_list_append_into_nonempty_list() {
4735        // Verifies the same fix works when the existing list already has elements.
4736        let mut item = HashMap::new();
4737        item.insert(
4738            "files".to_string(),
4739            json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
4740        );
4741
4742        let names = cond_names(&[("#0", "files")]);
4743        let mut values = HashMap::new();
4744        values.insert(
4745            ":0".to_string(),
4746            json!({"L": [{"M": {"field": {"S": "new"}}}]}),
4747        );
4748
4749        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
4750            .unwrap();
4751
4752        let list = item
4753            .get("files")
4754            .and_then(|v| v.get("L"))
4755            .and_then(|v| v.as_array())
4756            .expect("files should be an L-typed attribute");
4757        assert_eq!(list.len(), 2, "existing element plus one new element");
4758    }
4759
4760    #[test]
4761    fn test_list_append_combined_with_plain_set() {
4762        // Verifies that a mixed expression like
4763        // `SET #a = list_append(#a, :v), #b = :other` correctly applies
4764        // both assignments after the paren-aware comma split fix.
4765        let mut item = HashMap::new();
4766        item.insert("logs".to_string(), json!({"L": []}));
4767        item.insert("count".to_string(), json!({"N": "0"}));
4768
4769        let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
4770        let mut values = HashMap::new();
4771        values.insert(":v".to_string(), json!({"L": [{"S": "entry"}]}));
4772        values.insert(":other".to_string(), json!({"N": "1"}));
4773
4774        apply_update_expression(
4775            &mut item,
4776            "SET #a = list_append(#a, :v), #b = :other",
4777            &names,
4778            &values,
4779        )
4780        .unwrap();
4781
4782        let list = item
4783            .get("logs")
4784            .and_then(|v| v.get("L"))
4785            .and_then(|v| v.as_array())
4786            .expect("logs should be an L-typed attribute");
4787        assert_eq!(list.len(), 1, "one log entry appended");
4788
4789        let count = item
4790            .get("count")
4791            .and_then(|v| v.get("N"))
4792            .and_then(|v| v.as_str())
4793            .expect("count should be an N-typed attribute");
4794        assert_eq!(count, "1", "count updated to 1");
4795    }
4796
4797    #[test]
4798    fn test_unrecognized_expression_returns_false() {
4799        // evaluate_single_key_condition must fail-closed: an expression shape
4800        // it doesn't recognize should return false (reject), not true (accept).
4801        let item = cond_item(&[("x", "1")]);
4802        let names: HashMap<String, String> = HashMap::new();
4803        let values: HashMap<String, Value> = HashMap::new();
4804
4805        assert!(
4806            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
4807            "unrecognized expression must return false"
4808        );
4809    }
4810
4811    #[test]
4812    fn test_set_list_index_out_of_range_returns_error() {
4813        // SET list[N] where N > len must return a ValidationException,
4814        // not silently no-op.
4815        let mut item = HashMap::new();
4816        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
4817
4818        let names: HashMap<String, String> = HashMap::new();
4819        let mut values = HashMap::new();
4820        values.insert(":v".to_string(), json!({"S": "z"}));
4821
4822        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
4823        assert!(
4824            result.is_err(),
4825            "out-of-range list index must return an error"
4826        );
4827
4828        // List should be unchanged
4829        let list = item
4830            .get("items")
4831            .and_then(|v| v.get("L"))
4832            .and_then(|v| v.as_array())
4833            .unwrap();
4834        assert_eq!(list.len(), 2);
4835    }
4836
4837    #[test]
4838    fn test_set_list_index_on_non_list_returns_error() {
4839        // SET attr[0] = :v where attr is a string (not a list) must return
4840        // a ValidationException.
4841        let mut item = HashMap::new();
4842        item.insert("name".to_string(), json!({"S": "hello"}));
4843
4844        let names: HashMap<String, String> = HashMap::new();
4845        let mut values = HashMap::new();
4846        values.insert(":v".to_string(), json!({"S": "z"}));
4847
4848        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
4849        assert!(
4850            result.is_err(),
4851            "list index on non-list attribute must return an error"
4852        );
4853    }
4854
4855    #[test]
4856    fn test_unrecognized_update_action_returns_error() {
4857        let mut item = HashMap::new();
4858        item.insert("name".to_string(), json!({"S": "hello"}));
4859
4860        let names: HashMap<String, String> = HashMap::new();
4861        let mut values = HashMap::new();
4862        values.insert(":bar".to_string(), json!({"S": "baz"}));
4863
4864        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
4865        assert!(
4866            result.is_err(),
4867            "unrecognized UpdateExpression action must return an error"
4868        );
4869        let err_msg = format!("{}", result.unwrap_err());
4870        assert!(
4871            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
4872            "error should mention Invalid UpdateExpression, got: {err_msg}"
4873        );
4874    }
4875
4876    // ── size() function tests ──────────────────────────────────────────
4877
4878    #[test]
4879    fn test_size_string() {
4880        let mut item = HashMap::new();
4881        item.insert("name".to_string(), json!({"S": "hello"}));
4882        let names = HashMap::new();
4883        let mut values = HashMap::new();
4884        values.insert(":limit".to_string(), json!({"N": "5"}));
4885
4886        assert!(evaluate_single_filter_condition(
4887            "size(name) = :limit",
4888            &item,
4889            &names,
4890            &values,
4891        ));
4892        values.insert(":limit".to_string(), json!({"N": "4"}));
4893        assert!(evaluate_single_filter_condition(
4894            "size(name) > :limit",
4895            &item,
4896            &names,
4897            &values,
4898        ));
4899    }
4900
4901    #[test]
4902    fn test_size_list() {
4903        let mut item = HashMap::new();
4904        item.insert(
4905            "items".to_string(),
4906            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
4907        );
4908        let names = HashMap::new();
4909        let mut values = HashMap::new();
4910        values.insert(":limit".to_string(), json!({"N": "3"}));
4911
4912        assert!(evaluate_single_filter_condition(
4913            "size(items) = :limit",
4914            &item,
4915            &names,
4916            &values,
4917        ));
4918    }
4919
/// `size()` applied to an M attribute holding two entries must compare equal to 2.
#[test]
fn test_size_map() {
    let two_entries = json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}});
    let item = HashMap::from([(String::from("data"), two_entries)]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":limit"), json!({"N": "2"}))]);

    let matched =
        evaluate_single_filter_condition("size(data) = :limit", &item, &names, &values);
    assert!(matched);
}
4938
/// `size()` applied to a four-member SS attribute must be strictly greater than 3.
#[test]
fn test_size_set() {
    let four_tags = json!({"SS": ["a", "b", "c", "d"]});
    let item = HashMap::from([(String::from("tags"), four_tags)]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":limit"), json!({"N": "3"}))]);

    let matched =
        evaluate_single_filter_condition("size(tags) > :limit", &item, &names, &values);
    assert!(matched);
}
4954
4955    // ── attribute_type() function tests ────────────────────────────────
4956
/// `attribute_type()` on an S attribute must accept the type name "S" and
/// reject the type name "N".
#[test]
fn test_attribute_type_string() {
    let item = HashMap::from([(String::from("name"), json!({"S": "hello"}))]);
    let names = HashMap::new();

    // Each row: the type name bound to `:t` and the expected evaluation result.
    for (type_name, expected) in [("S", true), ("N", false)] {
        let values = HashMap::from([(String::from(":t"), json!({"S": type_name}))]);
        let matched =
            evaluate_single_filter_condition("attribute_type(name, :t)", &item, &names, &values);
        assert_eq!(matched, expected);
    }
}
4980
/// `attribute_type()` on an N attribute must accept the type name "N".
#[test]
fn test_attribute_type_number() {
    let item = HashMap::from([(String::from("age"), json!({"N": "42"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":t"), json!({"S": "N"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(age, :t)", &item, &names, &values);
    assert!(matched);
}
4996
/// `attribute_type()` on an L attribute must accept the type name "L".
#[test]
fn test_attribute_type_list() {
    let item = HashMap::from([(String::from("items"), json!({"L": [{"S": "a"}]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":t"), json!({"S": "L"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(items, :t)", &item, &names, &values);
    assert!(matched);
}
5012
/// `attribute_type()` on an M attribute must accept the type name "M".
#[test]
fn test_attribute_type_map() {
    let nested = json!({"M": {"key": {"S": "val"}}});
    let item = HashMap::from([(String::from("data"), nested)]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":t"), json!({"S": "M"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(data, :t)", &item, &names, &values);
    assert!(matched);
}
5028
/// `attribute_type()` on a BOOL attribute must accept the type name "BOOL".
#[test]
fn test_attribute_type_bool() {
    let item = HashMap::from([(String::from("active"), json!({"BOOL": true}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":t"), json!({"S": "BOOL"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(active, :t)", &item, &names, &values);
    assert!(matched);
}
5044
5045    // ── begins_with rejects non-string types ───────────────────────────
5046
/// `begins_with()` must evaluate to false when the attribute is of type N,
/// even though the digits of the number start with the given prefix.
#[test]
fn test_begins_with_rejects_number_type() {
    let item = HashMap::from([(String::from("code"), json!({"N": "12345"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":prefix"), json!({"S": "123"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values);
    assert!(
        !matched,
        "begins_with must return false for N-type attributes"
    );
}
5060
/// `begins_with()` must evaluate to true for an S attribute that starts with
/// the given prefix.
#[test]
fn test_begins_with_works_on_string_type() {
    let item = HashMap::from([(String::from("code"), json!({"S": "abc123"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":prefix"), json!({"S": "abc"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values);
    assert!(matched);
}
5076
5077    // ── contains on sets ───────────────────────────────────────────────
5078
/// `contains()` on an SS attribute must be true for a member ("blue") and
/// false for a non-member ("yellow").
#[test]
fn test_contains_string_set() {
    let palette = json!({"SS": ["red", "blue", "green"]});
    let item = HashMap::from([(String::from("tags"), palette)]);
    let names = HashMap::new();

    // Each row: the string bound to `:val` and the expected evaluation result.
    for (candidate, expected) in [("blue", true), ("yellow", false)] {
        let values = HashMap::from([(String::from(":val"), json!({"S": candidate}))]);
        let matched =
            evaluate_single_filter_condition("contains(tags, :val)", &item, &names, &values);
        assert_eq!(matched, expected);
    }
}
5102
/// `contains()` on an NS attribute must be true when the N operand is a member.
#[test]
fn test_contains_number_set() {
    let item = HashMap::from([(String::from("scores"), json!({"NS": ["1", "2", "3"]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"N": "2"}))]);

    let matched =
        evaluate_single_filter_condition("contains(scores, :val)", &item, &names, &values);
    assert!(matched);
}
5118
5119    // ── SET arithmetic type validation ─────────────────────────────────
5120
/// A `SET` clause that adds a number to an S-type attribute must fail.
#[test]
fn test_set_arithmetic_rejects_string_operand() {
    let mut item = HashMap::from([(String::from("name"), json!({"S": "hello"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"N": "1"}))]);

    let outcome = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);

    assert!(
        outcome.is_err(),
        "arithmetic on S-type attribute must return a ValidationException"
    );
}
5135
/// A `SET` clause that adds an S-type value to a numeric attribute must fail.
#[test]
fn test_set_arithmetic_rejects_string_value() {
    let mut item = HashMap::from([(String::from("count"), json!({"N": "5"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"S": "notanumber"}))]);

    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);

    assert!(
        outcome.is_err(),
        "arithmetic with S-type value must return a ValidationException"
    );
}
5151
/// A `SET` clause that adds N "3" to N "10" must succeed and store N "13".
#[test]
fn test_set_arithmetic_valid_numbers() {
    let mut item = HashMap::from([(String::from("count"), json!({"N": "10"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"N": "3"}))]);

    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);

    assert!(outcome.is_ok());
    let expected_sum = json!({"N": "13"});
    assert_eq!(item["count"], expected_sum);
}
5165
5166    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5167
/// `ADD` on a BS attribute must union the operand into the stored set: the
/// overlapping member "YQ==" appears once, giving three members in total.
#[test]
fn test_add_binary_set() {
    let mut item = HashMap::from([(String::from("data"), json!({"BS": ["YQ==", "Yg=="]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"BS": ["Yw==", "YQ=="]}))]);

    let outcome = apply_update_expression(&mut item, "ADD data :val", &names, &values);
    assert!(outcome.is_ok());

    let merged = item["data"]["BS"].as_array().unwrap();
    assert_eq!(merged.len(), 3, "should merge sets without duplicates");
    for member in ["YQ==", "Yg==", "Yw=="] {
        assert!(merged.contains(&json!(member)));
    }
}
5184
/// `DELETE` on a BS attribute must remove the operand's member ("Yg==") and
/// leave the other two members in place.
#[test]
fn test_delete_binary_set() {
    let stored = json!({"BS": ["YQ==", "Yg==", "Yw=="]});
    let mut item = HashMap::from([(String::from("data"), stored)]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"BS": ["Yg=="]}))]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let remaining = item["data"]["BS"].as_array().unwrap();
    assert_eq!(remaining.len(), 2);
    let removed = json!("Yg==");
    assert!(remaining.iter().all(|member| *member != removed));
}
5199
/// `DELETE` that removes the last member of a BS attribute must drop the
/// attribute from the item entirely.
#[test]
fn test_delete_binary_set_removes_attr_when_empty() {
    let mut item = HashMap::from([(String::from("data"), json!({"BS": ["YQ=="]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(String::from(":val"), json!({"BS": ["YQ=="]}))]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let still_present = item.contains_key("data");
    assert!(
        !still_present,
        "attribute should be removed when set becomes empty"
    );
}
5215}