Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::state::SharedS3State;
23
24use crate::state::{
25    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoDbSnapshot, DynamoTable,
26    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
27    ProvisionedThroughput, SharedDynamoDbState, DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
/// Minimal subset of a [`DynamoTable`] that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
pub(super) struct KinesisDeliveryTarget {
    /// Copy of the table's Kinesis destinations. Delivery only sends to the
    /// entries whose `destination_status` is `"ACTIVE"`.
    pub destinations: Vec<KinesisDestination>,
    /// Table ARN; emitted as `eventSourceARN` and used to derive `awsRegion`.
    pub arn: String,
    /// Table name; emitted as `tableName` in the change record.
    pub name: String,
}
41
/// DynamoDB service front end: owns the shared table state plus the optional
/// collaborators that are attached through the `with_*` builder methods.
pub struct DynamoDbService {
    /// Shared in-memory DynamoDB state; cloned in full whenever a snapshot is taken.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, attached via `with_s3`.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 store, attached via `with_s3_store`.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Optional delivery bus; when absent, Kinesis change-record delivery is skipped.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional snapshot sink; when absent, `save_snapshot` returns immediately.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
54
55impl DynamoDbService {
    /// Create a service over `state` with no S3 integration, delivery bus,
    /// or snapshot store attached. Use the `with_*` builders to add them.
    pub fn new(state: SharedDynamoDbState) -> Self {
        Self {
            state,
            s3_state: None,
            s3_store: None,
            delivery: None,
            snapshot_store: None,
            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }
66
    /// Builder: attach the shared S3 state and return the updated service.
    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
        self.s3_state = Some(s3_state);
        self
    }
71
    /// Builder: attach an S3 store and return the updated service.
    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
        self.s3_store = Some(store);
        self
    }
76
    /// Builder: attach the delivery bus used to forward change records to
    /// Kinesis streaming destinations, and return the updated service.
    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
        self.delivery = Some(delivery);
        self
    }
81
    /// Builder: attach the store that `save_snapshot` writes to, and return
    /// the updated service. Without one, snapshots are never written.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }
86
87    /// Persist the current in-memory state as a snapshot. Called after
88    /// every state-mutating action. A noop when no snapshot store is
89    /// configured (i.e. `StorageMode::Memory`).
90    ///
91    /// The snapshot lock serializes the full clone + serialize + write
92    /// so concurrent mutators cannot leave older bytes on disk, and
93    /// serialization + the blocking file write are offloaded to the
94    /// blocking pool to keep Tokio workers responsive.
95    async fn save_snapshot(&self) {
96        let Some(store) = self.snapshot_store.clone() else {
97            return;
98        };
99        let _guard = self.snapshot_lock.lock().await;
100        let snapshot = DynamoDbSnapshot {
101            schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
102            accounts: Some(self.state.read().clone()),
103            state: None,
104        };
105        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
106            let bytes = serde_json::to_vec(&snapshot)
107                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
108            store.save(&bytes)
109        })
110        .await;
111        match join {
112            Ok(Ok(())) => {}
113            Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
114            Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
115        }
116    }
117
118    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
119        if table
120            .kinesis_destinations
121            .iter()
122            .any(|d| d.destination_status == "ACTIVE")
123        {
124            Some(KinesisDeliveryTarget {
125                destinations: table.kinesis_destinations.clone(),
126                arn: table.arn.clone(),
127                name: table.name.clone(),
128            })
129        } else {
130            None
131        }
132    }
133
134    /// Deliver a change record to all active Kinesis streaming destinations for a table.
135    pub(super) fn deliver_to_kinesis_destinations(
136        &self,
137        target: &KinesisDeliveryTarget,
138        event_name: &str,
139        keys: &HashMap<String, AttributeValue>,
140        old_image: Option<&HashMap<String, AttributeValue>>,
141        new_image: Option<&HashMap<String, AttributeValue>>,
142    ) {
143        let delivery = match &self.delivery {
144            Some(d) => d,
145            None => return,
146        };
147
148        let active_destinations: Vec<_> = target
149            .destinations
150            .iter()
151            .filter(|d| d.destination_status == "ACTIVE")
152            .collect();
153
154        if active_destinations.is_empty() {
155            return;
156        }
157
158        let mut record = json!({
159            "eventID": uuid::Uuid::new_v4().to_string(),
160            "eventName": event_name,
161            "eventVersion": "1.1",
162            "eventSource": "aws:dynamodb",
163            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
164            "dynamodb": {
165                "Keys": keys,
166                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
167                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
168                "StreamViewType": "NEW_AND_OLD_IMAGES",
169            },
170            "eventSourceARN": &target.arn,
171            "tableName": &target.name,
172        });
173
174        if let Some(old) = old_image {
175            record["dynamodb"]["OldImage"] = json!(old);
176        }
177        if let Some(new) = new_image {
178            record["dynamodb"]["NewImage"] = json!(new);
179        }
180
181        let record_str = serde_json::to_string(&record).unwrap_or_default();
182        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
183        let partition_key = serde_json::to_string(keys).unwrap_or_default();
184
185        for dest in active_destinations {
186            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
187        }
188    }
189
190    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
191        serde_json::from_slice(&req.body).map_err(|e| {
192            AwsServiceError::aws_error(
193                StatusCode::BAD_REQUEST,
194                "SerializationException",
195                format!("Invalid JSON: {e}"),
196            )
197        })
198    }
199
    /// Wrap `body` in a JSON success response; convenience for handlers
    /// that return `Result<AwsResponse, AwsServiceError>`.
    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
        Ok(AwsResponse::ok_json(body))
    }
203}
204
205#[async_trait]
206impl AwsService for DynamoDbService {
    /// Name under which this service is registered: `"dynamodb"`.
    fn service_name(&self) -> &str {
        "dynamodb"
    }
210
211    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
212        let mutates = is_mutating_action(req.action.as_str());
213        let result = match req.action.as_str() {
214            "CreateTable" => self.create_table(&req),
215            "DeleteTable" => self.delete_table(&req),
216            "DescribeTable" => self.describe_table(&req),
217            "ListTables" => self.list_tables(&req),
218            "UpdateTable" => self.update_table(&req),
219            "PutItem" => self.put_item(&req),
220            "GetItem" => self.get_item(&req),
221            "DeleteItem" => self.delete_item(&req),
222            "UpdateItem" => self.update_item(&req),
223            "Query" => self.query(&req),
224            "Scan" => self.scan(&req),
225            "BatchGetItem" => self.batch_get_item(&req),
226            "BatchWriteItem" => self.batch_write_item(&req),
227            "TagResource" => self.tag_resource(&req),
228            "UntagResource" => self.untag_resource(&req),
229            "ListTagsOfResource" => self.list_tags_of_resource(&req),
230            "TransactGetItems" => self.transact_get_items(&req),
231            "TransactWriteItems" => self.transact_write_items(&req),
232            "ExecuteStatement" => self.execute_statement(&req),
233            "BatchExecuteStatement" => self.batch_execute_statement(&req),
234            "ExecuteTransaction" => self.execute_transaction(&req),
235            "UpdateTimeToLive" => self.update_time_to_live(&req),
236            "DescribeTimeToLive" => self.describe_time_to_live(&req),
237            "PutResourcePolicy" => self.put_resource_policy(&req),
238            "GetResourcePolicy" => self.get_resource_policy(&req),
239            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
240            // Stubs
241            "DescribeEndpoints" => self.describe_endpoints(&req),
242            "DescribeLimits" => self.describe_limits(&req),
243            // Backups
244            "CreateBackup" => self.create_backup(&req),
245            "DeleteBackup" => self.delete_backup(&req),
246            "DescribeBackup" => self.describe_backup(&req),
247            "ListBackups" => self.list_backups(&req),
248            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
249            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
250            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
251            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
252            // Global tables
253            "CreateGlobalTable" => self.create_global_table(&req),
254            "DescribeGlobalTable" => self.describe_global_table(&req),
255            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
256            "ListGlobalTables" => self.list_global_tables(&req),
257            "UpdateGlobalTable" => self.update_global_table(&req),
258            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
259            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
260            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
261            // Kinesis streaming
262            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
263            "DisableKinesisStreamingDestination" => {
264                self.disable_kinesis_streaming_destination(&req)
265            }
266            "DescribeKinesisStreamingDestination" => {
267                self.describe_kinesis_streaming_destination(&req)
268            }
269            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
270            // Contributor insights
271            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
272            "UpdateContributorInsights" => self.update_contributor_insights(&req),
273            "ListContributorInsights" => self.list_contributor_insights(&req),
274            // Import/Export
275            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
276            "DescribeExport" => self.describe_export(&req),
277            "ListExports" => self.list_exports(&req),
278            "ImportTable" => self.import_table(&req),
279            "DescribeImport" => self.describe_import(&req),
280            "ListImports" => self.list_imports(&req),
281            _ => Err(AwsServiceError::action_not_implemented(
282                "dynamodb",
283                &req.action,
284            )),
285        };
286        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
287            self.save_snapshot().await;
288        }
289        result
290    }
291
292    fn supported_actions(&self) -> &[&str] {
293        &[
294            "CreateTable",
295            "DeleteTable",
296            "DescribeTable",
297            "ListTables",
298            "UpdateTable",
299            "PutItem",
300            "GetItem",
301            "DeleteItem",
302            "UpdateItem",
303            "Query",
304            "Scan",
305            "BatchGetItem",
306            "BatchWriteItem",
307            "TagResource",
308            "UntagResource",
309            "ListTagsOfResource",
310            "TransactGetItems",
311            "TransactWriteItems",
312            "ExecuteStatement",
313            "BatchExecuteStatement",
314            "ExecuteTransaction",
315            "UpdateTimeToLive",
316            "DescribeTimeToLive",
317            "PutResourcePolicy",
318            "GetResourcePolicy",
319            "DeleteResourcePolicy",
320            "DescribeEndpoints",
321            "DescribeLimits",
322            "CreateBackup",
323            "DeleteBackup",
324            "DescribeBackup",
325            "ListBackups",
326            "RestoreTableFromBackup",
327            "RestoreTableToPointInTime",
328            "UpdateContinuousBackups",
329            "DescribeContinuousBackups",
330            "CreateGlobalTable",
331            "DescribeGlobalTable",
332            "DescribeGlobalTableSettings",
333            "ListGlobalTables",
334            "UpdateGlobalTable",
335            "UpdateGlobalTableSettings",
336            "DescribeTableReplicaAutoScaling",
337            "UpdateTableReplicaAutoScaling",
338            "EnableKinesisStreamingDestination",
339            "DisableKinesisStreamingDestination",
340            "DescribeKinesisStreamingDestination",
341            "UpdateKinesisStreamingDestination",
342            "DescribeContributorInsights",
343            "UpdateContributorInsights",
344            "ListContributorInsights",
345            "ExportTableToPointInTime",
346            "DescribeExport",
347            "ListExports",
348            "ImportTable",
349            "DescribeImport",
350            "ListImports",
351        ]
352    }
353}
/// Report whether `action` changes DynamoDB state and therefore needs a
/// snapshot write once it succeeds.
///
/// The comparison is exact and case-sensitive. The table must be kept in
/// sync with the service's dispatch table.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateTable",
        "DeleteTable",
        "UpdateTable",
        "PutItem",
        "DeleteItem",
        "UpdateItem",
        "BatchWriteItem",
        "TagResource",
        "UntagResource",
        "TransactWriteItems",
        "ExecuteStatement",
        "BatchExecuteStatement",
        "ExecuteTransaction",
        "UpdateTimeToLive",
        "PutResourcePolicy",
        "DeleteResourcePolicy",
        "CreateBackup",
        "DeleteBackup",
        "RestoreTableFromBackup",
        "RestoreTableToPointInTime",
        "UpdateContinuousBackups",
        "CreateGlobalTable",
        "UpdateGlobalTable",
        "UpdateGlobalTableSettings",
        "UpdateTableReplicaAutoScaling",
        "EnableKinesisStreamingDestination",
        "DisableKinesisStreamingDestination",
        "UpdateKinesisStreamingDestination",
        "UpdateContributorInsights",
        "ExportTableToPointInTime",
        "ImportTable",
    ];
    MUTATING_ACTIONS.contains(&action)
}
392
393// ── Helper functions ────────────────────────────────────────────────────
394
395fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
396    body[field].as_str().ok_or_else(|| {
397        AwsServiceError::aws_error(
398            StatusCode::BAD_REQUEST,
399            "ValidationException",
400            format!("{field} is required"),
401        )
402    })
403}
404
405fn require_object(
406    body: &Value,
407    field: &str,
408) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
409    let obj = body[field].as_object().ok_or_else(|| {
410        AwsServiceError::aws_error(
411            StatusCode::BAD_REQUEST,
412            "ValidationException",
413            format!("{field} is required"),
414        )
415    })?;
416    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
417}
418
419fn get_table<'a>(
420    tables: &'a HashMap<String, DynamoTable>,
421    name: &str,
422) -> Result<&'a DynamoTable, AwsServiceError> {
423    tables.get(name).ok_or_else(|| {
424        AwsServiceError::aws_error(
425            StatusCode::BAD_REQUEST,
426            "ResourceNotFoundException",
427            format!("Requested resource not found: Table: {name} not found"),
428        )
429    })
430}
431
432fn get_table_mut<'a>(
433    tables: &'a mut HashMap<String, DynamoTable>,
434    name: &str,
435) -> Result<&'a mut DynamoTable, AwsServiceError> {
436    tables.get_mut(name).ok_or_else(|| {
437        AwsServiceError::aws_error(
438            StatusCode::BAD_REQUEST,
439            "ResourceNotFoundException",
440            format!("Requested resource not found: Table: {name} not found"),
441        )
442    })
443}
444
445fn find_table_by_arn<'a>(
446    tables: &'a HashMap<String, DynamoTable>,
447    arn: &str,
448) -> Result<&'a DynamoTable, AwsServiceError> {
449    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
450        AwsServiceError::aws_error(
451            StatusCode::BAD_REQUEST,
452            "ResourceNotFoundException",
453            format!("Requested resource not found: {arn}"),
454        )
455    })
456}
457
458fn find_table_by_arn_mut<'a>(
459    tables: &'a mut HashMap<String, DynamoTable>,
460    arn: &str,
461) -> Result<&'a mut DynamoTable, AwsServiceError> {
462    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
463        AwsServiceError::aws_error(
464            StatusCode::BAD_REQUEST,
465            "ResourceNotFoundException",
466            format!("Requested resource not found: {arn}"),
467        )
468    })
469}
470
471fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
472    let arr = val.as_array().ok_or_else(|| {
473        AwsServiceError::aws_error(
474            StatusCode::BAD_REQUEST,
475            "ValidationException",
476            "KeySchema is required",
477        )
478    })?;
479    Ok(arr
480        .iter()
481        .map(|elem| KeySchemaElement {
482            attribute_name: elem["AttributeName"]
483                .as_str()
484                .unwrap_or_default()
485                .to_string(),
486            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
487        })
488        .collect())
489}
490
491fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
492    let arr = val.as_array().ok_or_else(|| {
493        AwsServiceError::aws_error(
494            StatusCode::BAD_REQUEST,
495            "ValidationException",
496            "AttributeDefinitions is required",
497        )
498    })?;
499    Ok(arr
500        .iter()
501        .map(|elem| AttributeDefinition {
502            attribute_name: elem["AttributeName"]
503                .as_str()
504                .unwrap_or_default()
505                .to_string(),
506            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
507        })
508        .collect())
509}
510
511fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
512    Ok(ProvisionedThroughput {
513        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
514        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
515    })
516}
517
518fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
519    let Some(arr) = val.as_array() else {
520        return Vec::new();
521    };
522    arr.iter()
523        .filter_map(|g| {
524            Some(GlobalSecondaryIndex {
525                index_name: g["IndexName"].as_str()?.to_string(),
526                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
527                projection: parse_projection(&g["Projection"]),
528                provisioned_throughput: Some(parse_gsi_throughput(
529                    &g["ProvisionedThroughput"],
530                    billing_mode,
531                )),
532                on_demand_throughput: parse_on_demand_throughput(&g["OnDemandThroughput"]),
533            })
534        })
535        .collect()
536}
537
538/// Parse an `OnDemandThroughput` block. Absent fields default to `-1`,
539/// which is AWS's sentinel for "no cap" — and the value real AWS echoes
540/// back on DescribeTable when the caller omitted either axis.
541pub(super) fn parse_on_demand_throughput(val: &Value) -> Option<crate::state::OnDemandThroughput> {
542    if !val.is_object() {
543        return None;
544    }
545    Some(crate::state::OnDemandThroughput {
546        max_read_request_units: val["MaxReadRequestUnits"].as_i64().unwrap_or(-1),
547        max_write_request_units: val["MaxWriteRequestUnits"].as_i64().unwrap_or(-1),
548    })
549}
550
551/// Resolve the provisioned-throughput slot for a GSI on a CreateTable or
552/// UpdateTable Create action. Real DynamoDB returns `{0, 0}` for GSIs on
553/// PAY_PER_REQUEST tables regardless of whether the caller sent a
554/// `ProvisionedThroughput` block, and the Terraform provider's `flatten`
555/// code keys `name`/`read_capacity`/`write_capacity` off the presence of
556/// that field — returning `None` would desynchronise state.
557fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
558    if billing_mode == "PAY_PER_REQUEST" {
559        return ProvisionedThroughput {
560            read_capacity_units: 0,
561            write_capacity_units: 0,
562        };
563    }
564    ProvisionedThroughput {
565        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
566        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
567    }
568}
569
570fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
571    let Some(arr) = val.as_array() else {
572        return Vec::new();
573    };
574    arr.iter()
575        .filter_map(|l| {
576            Some(LocalSecondaryIndex {
577                index_name: l["IndexName"].as_str()?.to_string(),
578                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
579                projection: parse_projection(&l["Projection"]),
580            })
581        })
582        .collect()
583}
584
585pub(super) fn parse_projection(val: &Value) -> Projection {
586    Projection {
587        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
588        non_key_attributes: val["NonKeyAttributes"]
589            .as_array()
590            .map(|arr| {
591                arr.iter()
592                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
593                    .collect()
594            })
595            .unwrap_or_default(),
596    }
597}
598
599fn parse_tags(val: &Value) -> HashMap<String, String> {
600    let mut tags = HashMap::new();
601    if let Some(arr) = val.as_array() {
602        for tag in arr {
603            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
604                tags.insert(k.to_string(), v.to_string());
605            }
606        }
607    }
608    tags
609}
610
611fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
612    let mut names = HashMap::new();
613    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
614        for (k, v) in obj {
615            if let Some(s) = v.as_str() {
616                names.insert(k.clone(), s.to_string());
617            }
618        }
619    }
620    names
621}
622
623fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
624    let mut values = HashMap::new();
625    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
626        for (k, v) in obj {
627            values.insert(k.clone(), v.clone());
628        }
629    }
630    values
631}
632
/// Resolve a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#` are returned unchanged without a
/// lookup. A placeholder with no mapping is also returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substitution = if name.starts_with('#') {
        expr_attr_names.get(name)
    } else {
        None
    };
    match substitution {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
643
644fn extract_key(
645    table: &DynamoTable,
646    item: &HashMap<String, AttributeValue>,
647) -> HashMap<String, AttributeValue> {
648    let mut key = HashMap::new();
649    let hash_key = table.hash_key_name();
650    if let Some(v) = item.get(hash_key) {
651        key.insert(hash_key.to_string(), v.clone());
652    }
653    if let Some(range_key) = table.range_key_name() {
654        if let Some(v) = item.get(range_key) {
655            key.insert(range_key.to_string(), v.clone());
656        }
657    }
658    key
659}
660
661/// Parse a JSON object into a key map (used for ExclusiveStartKey).
662fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
663    let obj = value.as_object()?;
664    if obj.is_empty() {
665        return None;
666    }
667    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
668}
669
670/// Check whether an item's key attributes match the given key map.
671fn item_matches_key(
672    item: &HashMap<String, AttributeValue>,
673    key: &HashMap<String, AttributeValue>,
674    hash_key_name: &str,
675    range_key_name: Option<&str>,
676) -> bool {
677    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
678        (Some(a), Some(b)) => a == b,
679        _ => false,
680    };
681    if !hash_match {
682        return false;
683    }
684    match range_key_name {
685        Some(rk) => match (item.get(rk), key.get(rk)) {
686            (Some(a), Some(b)) => a == b,
687            (None, None) => true,
688            _ => false,
689        },
690        None => true,
691    }
692}
693
694/// Extract the primary key from an item given explicit key attribute names.
695fn extract_key_for_schema(
696    item: &HashMap<String, AttributeValue>,
697    hash_key_name: &str,
698    range_key_name: Option<&str>,
699) -> HashMap<String, AttributeValue> {
700    let mut key = HashMap::new();
701    if let Some(v) = item.get(hash_key_name) {
702        key.insert(hash_key_name.to_string(), v.clone());
703    }
704    if let Some(rk) = range_key_name {
705        if let Some(v) = item.get(rk) {
706            key.insert(rk.to_string(), v.clone());
707        }
708    }
709    key
710}
711
712fn validate_key_in_item(
713    table: &DynamoTable,
714    item: &HashMap<String, AttributeValue>,
715) -> Result<(), AwsServiceError> {
716    let hash_key = table.hash_key_name();
717    if !item.contains_key(hash_key) {
718        return Err(AwsServiceError::aws_error(
719            StatusCode::BAD_REQUEST,
720            "ValidationException",
721            format!("Missing the key {hash_key} in the item"),
722        ));
723    }
724    if let Some(range_key) = table.range_key_name() {
725        if !item.contains_key(range_key) {
726            return Err(AwsServiceError::aws_error(
727                StatusCode::BAD_REQUEST,
728                "ValidationException",
729                format!("Missing the key {range_key} in the item"),
730            ));
731        }
732    }
733    Ok(())
734}
735
736fn validate_key_attributes_in_key(
737    table: &DynamoTable,
738    key: &HashMap<String, AttributeValue>,
739) -> Result<(), AwsServiceError> {
740    let hash_key = table.hash_key_name();
741    if !key.contains_key(hash_key) {
742        return Err(AwsServiceError::aws_error(
743            StatusCode::BAD_REQUEST,
744            "ValidationException",
745            format!("Missing the key {hash_key} in the item"),
746        ));
747    }
748    Ok(())
749}
750
751fn project_item(
752    item: &HashMap<String, AttributeValue>,
753    body: &Value,
754) -> HashMap<String, AttributeValue> {
755    let projection = body["ProjectionExpression"].as_str();
756    match projection {
757        Some(proj) if !proj.is_empty() => {
758            let expr_attr_names = parse_expression_attribute_names(body);
759            let attrs: Vec<String> = proj
760                .split(',')
761                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
762                .collect();
763            let mut result = HashMap::new();
764            for attr in &attrs {
765                if let Some(v) = resolve_nested_path(item, attr) {
766                    insert_nested_value(&mut result, attr, v);
767                }
768            }
769            result
770        }
771        _ => item.clone(),
772    }
773}
774
775/// Resolve expression attribute names within each segment of a projection path.
776/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
777fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
778    // Split on dots, resolve each part, rejoin
779    let mut result = String::new();
780    for (i, segment) in path.split('.').enumerate() {
781        if i > 0 {
782            result.push('.');
783        }
784        // A segment might be like "#n" or "people[0]" or "#attr[0]"
785        if let Some(bracket_pos) = segment.find('[') {
786            let key_part = &segment[..bracket_pos];
787            let index_part = &segment[bracket_pos..];
788            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
789            result.push_str(index_part);
790        } else {
791            result.push_str(&resolve_attr_name(segment, expr_attr_names));
792        }
793    }
794    result
795}
796
797/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
798fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
799    let segments = parse_path_segments(path);
800    if segments.is_empty() {
801        return None;
802    }
803
804    let first = &segments[0];
805    let top_key = match first {
806        PathSegment::Key(k) => k.as_str(),
807        _ => return None,
808    };
809
810    let mut current = item.get(top_key)?.clone();
811
812    for segment in &segments[1..] {
813        match segment {
814            PathSegment::Key(k) => {
815                // Navigate into a Map: {"M": {"key": ...}}
816                current = current.get("M")?.get(k)?.clone();
817            }
818            PathSegment::Index(idx) => {
819                // Navigate into a List: {"L": [...]}
820                current = current.get("L")?.get(*idx)?.clone();
821            }
822        }
823    }
824
825    Some(current)
826}
827
/// One step of a parsed document path.
#[derive(Debug)]
enum PathSegment {
    /// Attribute or map-key name.
    Key(String),
    /// Zero-based list index taken from a `[n]` suffix.
    Index(usize),
}

/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
///
/// `.` ends the current key. `[` ends the current key and reads up to the
/// next `]` (or the end of the input) as an index. Bracket contents that do
/// not parse as `usize` are dropped without producing a segment. Empty keys
/// (for example from consecutive dots) are never emitted.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    // Emit the accumulated key, if any, and reset the accumulator.
    fn flush_key(pending: &mut String, out: &mut Vec<PathSegment>) {
        if !pending.is_empty() {
            out.push(PathSegment::Key(std::mem::take(pending)));
        }
    }

    let mut segments = Vec::new();
    let mut pending = String::new();
    let mut chars = path.chars();

    while let Some(ch) = chars.next() {
        match ch {
            '.' => flush_key(&mut pending, &mut segments),
            '[' => {
                flush_key(&mut pending, &mut segments);
                // `take_while` also consumes the terminating ']'.
                let digits: String = chars.by_ref().take_while(|&c| c != ']').collect();
                if let Ok(index) = digits.parse::<usize>() {
                    segments.push(PathSegment::Index(index));
                }
            }
            other => pending.push(other),
        }
    }

    flush_key(&mut pending, &mut segments);
    segments
}
876
877/// Insert a value at a nested path in the result HashMap.
878/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
879fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
880    // Simple case: no nesting
881    if !path.contains('.') && !path.contains('[') {
882        result.insert(path.to_string(), value);
883        return;
884    }
885
886    let segments = parse_path_segments(path);
887    if segments.is_empty() {
888        return;
889    }
890
891    let top_key = match &segments[0] {
892        PathSegment::Key(k) => k.clone(),
893        _ => return,
894    };
895
896    if segments.len() == 1 {
897        result.insert(top_key, value);
898        return;
899    }
900
901    // For nested paths, wrap the value back into the nested structure
902    let wrapped = wrap_value_in_path(&segments[1..], value);
903    // Merge into existing value if present
904    let existing = result.remove(&top_key);
905    let merged = match existing {
906        Some(existing) => merge_attribute_values(existing, wrapped),
907        None => wrapped,
908    };
909    result.insert(top_key, merged);
910}
911
912/// Wrap a value in the nested path structure.
913fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
914    if segments.is_empty() {
915        return value;
916    }
917    let inner = wrap_value_in_path(&segments[1..], value);
918    match &segments[0] {
919        PathSegment::Key(k) => {
920            json!({"M": {k.clone(): inner}})
921        }
922        PathSegment::Index(idx) => {
923            let mut arr = vec![Value::Null; idx + 1];
924            arr[*idx] = inner;
925            json!({"L": arr})
926        }
927    }
928}
929
930/// Merge two attribute values (for overlapping projections).
931fn merge_attribute_values(a: Value, b: Value) -> Value {
932    if let (Some(a_map), Some(b_map)) = (
933        a.get("M").and_then(|v| v.as_object()),
934        b.get("M").and_then(|v| v.as_object()),
935    ) {
936        let mut merged = a_map.clone();
937        for (k, v) in b_map {
938            if let Some(existing) = merged.get(k) {
939                merged.insert(
940                    k.clone(),
941                    merge_attribute_values(existing.clone(), v.clone()),
942                );
943            } else {
944                merged.insert(k.clone(), v.clone());
945            }
946        }
947        json!({"M": merged})
948    } else {
949        b
950    }
951}
952
953fn evaluate_condition(
954    condition: &str,
955    existing: Option<&HashMap<String, AttributeValue>>,
956    expr_attr_names: &HashMap<String, String>,
957    expr_attr_values: &HashMap<String, Value>,
958) -> Result<(), AwsServiceError> {
959    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
960    // so we delegate to evaluate_filter_expression. An empty map models "item
961    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
962    // → true, comparisons against missing attributes → None vs Some(val).
963    let empty = HashMap::new();
964    let item = existing.unwrap_or(&empty);
965    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
966        Ok(())
967    } else {
968        Err(AwsServiceError::aws_error(
969            StatusCode::BAD_REQUEST,
970            "ConditionalCheckFailedException",
971            "The conditional request failed",
972        ))
973    }
974}
975
/// Return the trimmed argument text of a single-call expression
/// `func_name(...)`, or `None` when `expr` is not exactly that call.
///
/// aws-sdk-go v2's expression builder emits function calls with a space
/// between the name and the opening paren (`attribute_exists (#0)`),
/// while hand-written expressions usually don't — accept both. `expr` must
/// end with the closing `)`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    // Peel the name and the paren separately so no prefix string has to be
    // allocated on every call.
    let after_name = expr.strip_prefix(func_name)?;
    let after_paren = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    let inner = after_paren.strip_suffix(')')?;
    Some(inner.trim())
}
988
989fn evaluate_key_condition(
990    expr: &str,
991    item: &HashMap<String, AttributeValue>,
992    expr_attr_names: &HashMap<String, String>,
993    expr_attr_values: &HashMap<String, Value>,
994) -> bool {
995    let trimmed = expr.trim();
996
997    let parts = split_on_and(trimmed);
998    if parts.len() > 1 {
999        return parts.iter().all(|part| {
1000            evaluate_key_condition(part.trim(), item, expr_attr_names, expr_attr_values)
1001        });
1002    }
1003
1004    let stripped = strip_outer_parens(trimmed);
1005    if stripped != trimmed {
1006        return evaluate_key_condition(stripped, item, expr_attr_names, expr_attr_values);
1007    }
1008
1009    evaluate_single_key_condition(trimmed, item, expr_attr_names, expr_attr_values)
1010}
1011
/// Split a DynamoDB expression on a top-level keyword (`" AND "`, `" OR "`,
/// `","`), case-insensitively. Parenthesised groups are skipped so only
/// unparenthesised occurrences of the keyword act as separators.
///
/// When the keyword is `AND`, the `AND` that belongs to a top-level
/// `x BETWEEN lo AND hi` predicate is not a separator: the whole predicate
/// stays in one part. Without this, every `BETWEEN` was torn into
/// `x BETWEEN lo` and `hi`, neither of which can be evaluated.
///
/// An empty `keyword` returns the whole expression as a single part.
fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
    /// True when a word-bounded, case-insensitive `BETWEEN` starts at byte
    /// `at`. `#between`, `:between`, `a.between` and `between_ts` are names
    /// or placeholders, not the keyword.
    fn between_keyword_at(bytes: &[u8], at: usize) -> bool {
        const BETWEEN: &[u8] = b"BETWEEN";
        let end = at + BETWEEN.len();
        if end > bytes.len() || !bytes[at..end].eq_ignore_ascii_case(BETWEEN) {
            return false;
        }
        let before_ok = at == 0
            || !(bytes[at - 1].is_ascii_alphanumeric()
                || matches!(bytes[at - 1], b'_' | b'#' | b':' | b'.'));
        let after_ok =
            !matches!(bytes.get(end), Some(b) if b.is_ascii_alphanumeric() || *b == b'_');
        before_ok && after_ok
    }

    // An empty keyword would match at every position without ever advancing.
    if keyword.is_empty() {
        return vec![expr];
    }

    let bytes = expr.as_bytes();
    let splitting_on_and = keyword.trim().eq_ignore_ascii_case("AND");

    let mut parts = Vec::new();
    let mut start = 0;
    let mut i = 0;
    let mut depth = 0usize;
    // Set after a top-level BETWEEN until its own AND has been consumed.
    let mut between_open = false;

    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ if depth == 0 => {
                if splitting_on_and && between_keyword_at(bytes, i) {
                    between_open = true;
                    i += "BETWEEN".len();
                    continue;
                }
                // `get` yields None when the window is out of range or not
                // on character boundaries, so this never panics.
                let hit = matches!(
                    expr.get(i..i + keyword.len()),
                    Some(window) if window.eq_ignore_ascii_case(keyword)
                );
                if hit {
                    let next = i + keyword.len();
                    if between_open {
                        // This AND closes the BETWEEN range; keep scanning.
                        between_open = false;
                    } else {
                        parts.push(&expr[start..i]);
                        start = next;
                    }
                    i = next;
                    continue;
                }
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}

/// Split on top-level `" AND "`, keeping `BETWEEN ... AND ...` intact.
fn split_on_and(expr: &str) -> Vec<&str> {
    split_on_top_level_keyword(expr, " AND ")
}

/// Split on top-level `" OR "`.
fn split_on_or(expr: &str) -> Vec<&str> {
    split_on_top_level_keyword(expr, " OR ")
}
1053
1054fn evaluate_single_key_condition(
1055    part: &str,
1056    item: &HashMap<String, AttributeValue>,
1057    expr_attr_names: &HashMap<String, String>,
1058    expr_attr_values: &HashMap<String, Value>,
1059) -> bool {
1060    let part = part.trim();
1061
1062    if let Some(rest) = part
1063        .strip_prefix("begins_with(")
1064        .or_else(|| part.strip_prefix("begins_with ("))
1065    {
1066        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
1067    }
1068
1069    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
1070        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
1071    }
1072
1073    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
1074}
1075
1076/// `begins_with(attr, :val)` — KeyCondition variant: supports only
1077/// S-typed attributes (mirrors AWS's behavior of returning false for
1078/// type mismatches). The filter-expression evaluator has its own
1079/// `eval_begins_with` because it operates on filter-grammar inputs.
1080fn key_cond_begins_with(
1081    rest: &str,
1082    item: &HashMap<String, AttributeValue>,
1083    expr_attr_names: &HashMap<String, String>,
1084    expr_attr_values: &HashMap<String, Value>,
1085) -> bool {
1086    let Some(inner) = rest.strip_suffix(')') else {
1087        return false;
1088    };
1089    let mut split = inner.splitn(2, ',');
1090    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1091        return false;
1092    };
1093    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1094    let expected = expr_attr_values.get(val_ref.trim());
1095    let actual = item.get(&attr_name);
1096    match (actual, expected) {
1097        (Some(a), Some(e)) => {
1098            let a_str = a.get("S").and_then(|v| v.as_str());
1099            let e_str = e.get("S").and_then(|v| v.as_str());
1100            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1101        }
1102        _ => false,
1103    }
1104}
1105
1106/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
1107/// shared `compare_attribute_values` ordering.
1108fn key_cond_between(
1109    part: &str,
1110    between_pos: usize,
1111    item: &HashMap<String, AttributeValue>,
1112    expr_attr_names: &HashMap<String, String>,
1113    expr_attr_values: &HashMap<String, Value>,
1114) -> bool {
1115    let attr_part = part[..between_pos].trim();
1116    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
1117    let range_part = &part[between_pos + 7..];
1118    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
1119        return false;
1120    };
1121    let lo_ref = range_part[..and_pos].trim();
1122    let hi_ref = range_part[and_pos + 5..].trim();
1123    let lo = expr_attr_values.get(lo_ref);
1124    let hi = expr_attr_values.get(hi_ref);
1125    let actual = item.get(&attr_name);
1126    match (actual, lo, hi) {
1127        (Some(a), Some(l), Some(h)) => {
1128            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1129                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1130        }
1131        _ => false,
1132    }
1133}
1134
1135/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1136/// Multi-character operators come first in the search list so that `<=`
1137/// is not mistakenly matched as `<`.
1138fn key_cond_simple_comparison(
1139    part: &str,
1140    item: &HashMap<String, AttributeValue>,
1141    expr_attr_names: &HashMap<String, String>,
1142    expr_attr_values: &HashMap<String, Value>,
1143) -> bool {
1144    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1145        let Some(pos) = part.find(op) else {
1146            continue;
1147        };
1148        let left = part[..pos].trim();
1149        let right = part[pos + op.len()..].trim();
1150        let attr_name = resolve_attr_name(left, expr_attr_names);
1151        let expected = expr_attr_values.get(right);
1152        let actual = item.get(&attr_name);
1153
1154        return match *op {
1155            "=" => actual == expected,
1156            "<>" => actual != expected,
1157            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1158            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1159            "<=" => {
1160                let cmp = compare_attribute_values(actual, expected);
1161                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1162            }
1163            ">=" => {
1164                let cmp = compare_attribute_values(actual, expected);
1165                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1166            }
1167            _ => false,
1168        };
1169    }
1170    false
1171}
1172
1173/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1174/// S → character count, N → always 0 (AWS returns size of internal representation, we approximate),
1175/// B → byte count, SS/NS/BS → element count, L → element count, M → element count,
1176/// BOOL/NULL → 1.
1177fn attribute_size(val: &Value) -> Option<usize> {
1178    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1179        return Some(s.len());
1180    }
1181    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1182        // B is base64-encoded — return decoded byte count
1183        let decoded_len = base64::engine::general_purpose::STANDARD
1184            .decode(b)
1185            .map(|v| v.len())
1186            .unwrap_or(b.len());
1187        return Some(decoded_len);
1188    }
1189    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1190        return Some(arr.len());
1191    }
1192    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1193        return Some(arr.len());
1194    }
1195    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1196        return Some(arr.len());
1197    }
1198    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1199        return Some(arr.len());
1200    }
1201    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1202        return Some(obj.len());
1203    }
1204    if val.get("N").is_some() {
1205        // AWS returns numeric representation size; approximate with string length
1206        return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
1207    }
1208    if val.get("BOOL").is_some() || val.get("NULL").is_some() {
1209        return Some(1);
1210    }
1211    None
1212}
1213
1214/// Evaluate a `size(path) op :val` comparison expression.
1215fn evaluate_size_comparison(
1216    part: &str,
1217    item: &HashMap<String, AttributeValue>,
1218    expr_attr_names: &HashMap<String, String>,
1219    expr_attr_values: &HashMap<String, Value>,
1220) -> Option<bool> {
1221    // Find the closing paren of size(...)
1222    let open = part.find('(')?;
1223    let close = part[open..].find(')')? + open;
1224    let path = part[open + 1..close].trim();
1225    let remainder = part[close + 1..].trim();
1226
1227    // Parse operator and value ref
1228    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1229        ("<=", rest.trim())
1230    } else if let Some(rest) = remainder.strip_prefix(">=") {
1231        (">=", rest.trim())
1232    } else if let Some(rest) = remainder.strip_prefix("<>") {
1233        ("<>", rest.trim())
1234    } else if let Some(rest) = remainder.strip_prefix('<') {
1235        ("<", rest.trim())
1236    } else if let Some(rest) = remainder.strip_prefix('>') {
1237        (">", rest.trim())
1238    } else if let Some(rest) = remainder.strip_prefix('=') {
1239        ("=", rest.trim())
1240    } else {
1241        return None;
1242    };
1243
1244    let attr_name = resolve_attr_name(path, expr_attr_names);
1245    let actual = item.get(&attr_name)?;
1246    let size = attribute_size(actual)? as f64;
1247
1248    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1249
1250    Some(match op {
1251        "=" => (size - expected).abs() < f64::EPSILON,
1252        "<>" => (size - expected).abs() >= f64::EPSILON,
1253        "<" => size < expected,
1254        ">" => size > expected,
1255        "<=" => size <= expected,
1256        ">=" => size >= expected,
1257        _ => false,
1258    })
1259}
1260
1261fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1262    match (a, b) {
1263        (None, None) => std::cmp::Ordering::Equal,
1264        (None, Some(_)) => std::cmp::Ordering::Less,
1265        (Some(_), None) => std::cmp::Ordering::Greater,
1266        (Some(a), Some(b)) => {
1267            let a_type = attribute_type_and_value(a);
1268            let b_type = attribute_type_and_value(b);
1269            match (a_type, b_type) {
1270                (Some(("S", a_val)), Some(("S", b_val))) => {
1271                    let a_str = a_val.as_str().unwrap_or("");
1272                    let b_str = b_val.as_str().unwrap_or("");
1273                    a_str.cmp(b_str)
1274                }
1275                (Some(("N", a_val)), Some(("N", b_val))) => {
1276                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1277                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1278                    a_num
1279                        .partial_cmp(&b_num)
1280                        .unwrap_or(std::cmp::Ordering::Equal)
1281                }
1282                (Some(("B", a_val)), Some(("B", b_val))) => {
1283                    let a_str = a_val.as_str().unwrap_or("");
1284                    let b_str = b_val.as_str().unwrap_or("");
1285                    a_str.cmp(b_str)
1286                }
1287                _ => std::cmp::Ordering::Equal,
1288            }
1289        }
1290    }
1291}
1292
1293fn evaluate_filter_expression(
1294    expr: &str,
1295    item: &HashMap<String, AttributeValue>,
1296    expr_attr_names: &HashMap<String, String>,
1297    expr_attr_values: &HashMap<String, Value>,
1298) -> bool {
1299    let trimmed = expr.trim();
1300
1301    // Split on OR first (lower precedence), respecting parentheses
1302    let or_parts = split_on_or(trimmed);
1303    if or_parts.len() > 1 {
1304        return or_parts.iter().any(|part| {
1305            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1306        });
1307    }
1308
1309    // Then split on AND (higher precedence), respecting parentheses
1310    let and_parts = split_on_and(trimmed);
1311    if and_parts.len() > 1 {
1312        return and_parts.iter().all(|part| {
1313            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1314        });
1315    }
1316
1317    // Strip outer parentheses if present
1318    let stripped = strip_outer_parens(trimmed);
1319    if stripped != trimmed {
1320        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1321    }
1322
1323    // Handle NOT prefix (case-insensitive)
1324    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1325        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1326    }
1327
1328    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1329}
1330
/// Remove one layer of enclosing parentheses, if the first `(` and the last
/// `)` of the trimmed expression actually pair with each other.
///
/// `(a) AND (b)` is returned unchanged (trimmed) because its first `(`
/// closes before the end. Only a single layer is removed per call.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let Some(body) = candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return candidate;
    };

    // The body must be balanced on its own and never close more than it has
    // opened; otherwise the outer pair belongs to different groups.
    let mut open = 0usize;
    for byte in body.bytes() {
        if byte == b'(' {
            open += 1;
        } else if byte == b')' {
            if open == 0 {
                return candidate;
            }
            open -= 1;
        }
    }

    if open == 0 {
        body
    } else {
        candidate
    }
}
1358
1359fn evaluate_single_filter_condition(
1360    part: &str,
1361    item: &HashMap<String, AttributeValue>,
1362    expr_attr_names: &HashMap<String, String>,
1363    expr_attr_values: &HashMap<String, Value>,
1364) -> bool {
1365    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1366        let attr = resolve_attr_name(inner, expr_attr_names);
1367        return item.contains_key(&attr);
1368    }
1369
1370    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1371        let attr = resolve_attr_name(inner, expr_attr_names);
1372        return !item.contains_key(&attr);
1373    }
1374
1375    if let Some(rest) = part
1376        .strip_prefix("begins_with(")
1377        .or_else(|| part.strip_prefix("begins_with ("))
1378    {
1379        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1380    }
1381
1382    if let Some(rest) = part
1383        .strip_prefix("contains(")
1384        .or_else(|| part.strip_prefix("contains ("))
1385    {
1386        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1387    }
1388
1389    if part.starts_with("size(") || part.starts_with("size (") {
1390        if let Some(result) =
1391            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1392        {
1393            return result;
1394        }
1395    }
1396
1397    if let Some(rest) = part
1398        .strip_prefix("attribute_type(")
1399        .or_else(|| part.strip_prefix("attribute_type ("))
1400    {
1401        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1402    }
1403
1404    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1405        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1406        let actual = item.get(&attr_name);
1407        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1408    }
1409
1410    evaluate_single_key_condition(part, item, expr_attr_names, expr_attr_values)
1411}
1412
1413/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1414/// any parse failure or type mismatch (this is the same shape DynamoDB
1415/// returns: a malformed predicate is silently false rather than an error).
1416fn eval_begins_with(
1417    rest: &str,
1418    item: &HashMap<String, AttributeValue>,
1419    expr_attr_names: &HashMap<String, String>,
1420    expr_attr_values: &HashMap<String, Value>,
1421) -> bool {
1422    let Some(inner) = rest.strip_suffix(')') else {
1423        return false;
1424    };
1425    let mut split = inner.splitn(2, ',');
1426    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1427        return false;
1428    };
1429    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1430    let expected = expr_attr_values.get(val_ref.trim());
1431    let actual = item.get(&attr_name);
1432    match (actual, expected) {
1433        (Some(a), Some(e)) => {
1434            let a_str = a.get("S").and_then(|v| v.as_str());
1435            let e_str = e.get("S").and_then(|v| v.as_str());
1436            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1437        }
1438        _ => false,
1439    }
1440}
1441
1442/// `contains(path, :val)` — substring check on S, set membership on
1443/// SS/NS/BS, and element membership on L. Other type pairings return
1444/// false.
1445fn eval_contains(
1446    rest: &str,
1447    item: &HashMap<String, AttributeValue>,
1448    expr_attr_names: &HashMap<String, String>,
1449    expr_attr_values: &HashMap<String, Value>,
1450) -> bool {
1451    let Some(inner) = rest.strip_suffix(')') else {
1452        return false;
1453    };
1454    let mut split = inner.splitn(2, ',');
1455    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1456        return false;
1457    };
1458    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1459    let expected = expr_attr_values.get(val_ref.trim());
1460    let actual = item.get(&attr_name);
1461    let (Some(a), Some(e)) = (actual, expected) else {
1462        return false;
1463    };
1464
1465    if let (Some(a_s), Some(e_s)) = (
1466        a.get("S").and_then(|v| v.as_str()),
1467        e.get("S").and_then(|v| v.as_str()),
1468    ) {
1469        return a_s.contains(e_s);
1470    }
1471    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1472        if let Some(val) = e.get("S") {
1473            return set.contains(val);
1474        }
1475    }
1476    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1477        if let Some(val) = e.get("N") {
1478            return set.contains(val);
1479        }
1480    }
1481    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1482        if let Some(val) = e.get("B") {
1483            return set.contains(val);
1484        }
1485    }
1486    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1487        return list.contains(e);
1488    }
1489    false
1490}
1491
1492/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1493/// is stored under the wire type identified by `:type` (one of the
1494/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1495fn eval_attribute_type(
1496    rest: &str,
1497    item: &HashMap<String, AttributeValue>,
1498    expr_attr_names: &HashMap<String, String>,
1499    expr_attr_values: &HashMap<String, Value>,
1500) -> bool {
1501    let Some(inner) = rest.strip_suffix(')') else {
1502        return false;
1503    };
1504    let mut split = inner.splitn(2, ',');
1505    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1506        return false;
1507    };
1508    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1509    let expected_type = expr_attr_values
1510        .get(val_ref.trim())
1511        .and_then(|v| v.get("S"))
1512        .and_then(|v| v.as_str());
1513    let actual = item.get(&attr_name);
1514    let (Some(val), Some(t)) = (actual, expected_type) else {
1515        return false;
1516    };
1517    match t {
1518        "S" => val.get("S").is_some(),
1519        "N" => val.get("N").is_some(),
1520        "B" => val.get("B").is_some(),
1521        "BOOL" => val.get("BOOL").is_some(),
1522        "NULL" => val.get("NULL").is_some(),
1523        "SS" => val.get("SS").is_some(),
1524        "NS" => val.get("NS").is_some(),
1525        "BS" => val.get("BS").is_some(),
1526        "L" => val.get("L").is_some(),
1527        "M" => val.get("M").is_some(),
1528        _ => false,
1529    }
1530}
1531
/// Parse an `attr IN (:v1, :v2, ...)` expression into the left-hand operand
/// and the list of value refs.
///
/// The ` IN ` keyword is matched case-insensitively and must be surrounded
/// by spaces. Value refs are split on commas and trimmed, so both ", " and
/// "," separators work; empty entries are dropped. Returns `None` when the
/// keyword is absent, the left operand is empty, the right side is not a
/// parenthesised list ending the expression, or the list is empty — so
/// callers can fall through to their other grammar branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing keeps byte offsets, so the position is valid in `expr`.
    let keyword_pos = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (lhs, rhs) = expr.split_at(keyword_pos);

    let attr_ref = lhs.trim();
    if attr_ref.is_empty() {
        return None;
    }

    let list = rhs[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let values: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .collect();

    (!values.is_empty()).then_some((attr_ref, values))
}
1558
1559/// Return true iff `actual` equals any of the `value_refs` resolved through
1560/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1561/// evaluates `IN` against undefined attributes as false).
1562fn evaluate_in_match(
1563    actual: Option<&AttributeValue>,
1564    value_refs: &[&str],
1565    expr_attr_values: &HashMap<String, Value>,
1566) -> bool {
1567    value_refs.iter().any(|v_ref| {
1568        let expected = expr_attr_values.get(*v_ref);
1569        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1570    })
1571}
1572
/// The action keyword that introduces a clause of an `UpdateExpression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateAction {
    Set,
    Remove,
    Add,
    Delete,
}

impl UpdateAction {
    /// Every keyword in its upper-case wire form, paired with its action.
    /// `parse_update_clauses` uses these as its search terms.
    const KEYWORDS: &'static [(&'static str, Self)] = &[
        ("SET", Self::Set),
        ("REMOVE", Self::Remove),
        ("ADD", Self::Add),
        ("DELETE", Self::Delete),
    ];

    /// The upper-case keyword for this action.
    fn keyword(self) -> &'static str {
        match self {
            Self::Set => "SET",
            Self::Remove => "REMOVE",
            Self::Add => "ADD",
            Self::Delete => "DELETE",
        }
    }
}
1601
1602fn apply_update_expression(
1603    item: &mut HashMap<String, AttributeValue>,
1604    expr: &str,
1605    expr_attr_names: &HashMap<String, String>,
1606    expr_attr_values: &HashMap<String, Value>,
1607) -> Result<(), AwsServiceError> {
1608    let clauses = parse_update_clauses(expr);
1609    if clauses.is_empty() && !expr.trim().is_empty() {
1610        return Err(AwsServiceError::aws_error(
1611            StatusCode::BAD_REQUEST,
1612            "ValidationException",
1613            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1614        ));
1615    }
1616    for (action, assignments) in &clauses {
1617        match action {
1618            UpdateAction::Set => {
1619                for assignment in assignments {
1620                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1621                }
1622            }
1623            UpdateAction::Remove => {
1624                for attr_ref in assignments {
1625                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1626                    item.remove(&attr);
1627                }
1628            }
1629            UpdateAction::Add => {
1630                for assignment in assignments {
1631                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1632                }
1633            }
1634            UpdateAction::Delete => {
1635                for assignment in assignments {
1636                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1637                }
1638            }
1639        }
1640    }
1641    Ok(())
1642}
1643
1644fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1645    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1646    let upper = expr.to_ascii_uppercase();
1647    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1648
1649    for &(kw, action) in UpdateAction::KEYWORDS {
1650        let mut search_from = 0;
1651        while let Some(pos) = upper[search_from..].find(kw) {
1652            let abs_pos = search_from + pos;
1653            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1654            let after_pos = abs_pos + kw.len();
1655            let after_ok =
1656                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1657            if before_ok && after_ok {
1658                positions.push((abs_pos, action));
1659            }
1660            search_from = abs_pos + kw.len();
1661        }
1662    }
1663
1664    positions.sort_by_key(|(pos, _)| *pos);
1665
1666    for (i, &(pos, action)) in positions.iter().enumerate() {
1667        let start = pos + action.keyword().len();
1668        let end = if i + 1 < positions.len() {
1669            positions[i + 1].0
1670        } else {
1671            expr.len()
1672        };
1673        let content = expr[start..end].trim();
1674        // Use a paren-aware split so that function-call arguments such as
1675        // `list_append(#a, :b)` are kept as a single assignment rather than
1676        // being torn apart at the inner comma.
1677        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
1678            .into_iter()
1679            .map(|s| s.trim().to_string())
1680            .collect();
1681        clauses.push((action, assignments));
1682    }
1683
1684    clauses
1685}
1686
1687fn apply_set_assignment(
1688    item: &mut HashMap<String, AttributeValue>,
1689    assignment: &str,
1690    expr_attr_names: &HashMap<String, String>,
1691    expr_attr_values: &HashMap<String, Value>,
1692) -> Result<(), AwsServiceError> {
1693    let Some((left, right)) = assignment.split_once('=') else {
1694        return Ok(());
1695    };
1696
1697    let left_trimmed = left.trim();
1698    let right = right.trim();
1699
1700    // Dotted paths like `#web.#tab_id` or `profile.email` target a nested
1701    // key inside an M-typed attribute. Only plain-value RHS is supported for
1702    // now (no `if_not_exists` / `list_append` / arithmetic into a nested
1703    // path — those shapes aren't common and can be added when needed).
1704    // An unresolvable RHS is a ValidationException, not a silent no-op —
1705    // silent drops are exactly the class of bug this file already fights.
1706    if is_dotted_path(left_trimmed) {
1707        let v = resolve_value(right, item, expr_attr_names, expr_attr_values)
1708            .ok_or_else(invalid_document_path)?;
1709        return assign_nested_path(item, left_trimmed, expr_attr_names, v);
1710    }
1711
1712    // Split off a trailing `[N]` list-index suffix so we can resolve the
1713    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1714    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1715    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1716        Some((name, idx)) => (name, Some(idx)),
1717        None => (left_trimmed, None),
1718    };
1719    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1720
1721    if let Some(rest) = right
1722        .strip_prefix("if_not_exists(")
1723        .or_else(|| right.strip_prefix("if_not_exists ("))
1724    {
1725        apply_set_if_not_exists(item, &attr, rest, expr_attr_names, expr_attr_values);
1726        return Ok(());
1727    }
1728
1729    if let Some(rest) = right
1730        .strip_prefix("list_append(")
1731        .or_else(|| right.strip_prefix("list_append ("))
1732    {
1733        apply_set_list_append(item, &attr, rest, expr_attr_names, expr_attr_values);
1734        return Ok(());
1735    }
1736
1737    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1738        return apply_set_arithmetic(
1739            item,
1740            &attr,
1741            arith_left,
1742            arith_right,
1743            is_add,
1744            expr_attr_names,
1745            expr_attr_values,
1746        );
1747    }
1748
1749    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
1750    if let Some(v) = val {
1751        match list_index {
1752            Some(idx) => assign_list_index(item, &attr, idx, v)?,
1753            None => {
1754                item.insert(attr, v);
1755            }
1756        }
1757    }
1758
1759    Ok(())
1760}
1761
1762/// SET ... = if_not_exists(other_attr, :val) — write `:val` into `attr`
1763/// only when `other_attr` is missing from the item. The lookup uses
1764/// `other_attr`, not the SET target, which is what makes it useful as a
1765/// 'create-or-keep' primitive.
1766fn apply_set_if_not_exists(
1767    item: &mut HashMap<String, AttributeValue>,
1768    attr: &str,
1769    rest: &str,
1770    expr_attr_names: &HashMap<String, String>,
1771    expr_attr_values: &HashMap<String, Value>,
1772) {
1773    let Some(inner) = rest.strip_suffix(')') else {
1774        return;
1775    };
1776    let mut split = inner.splitn(2, ',');
1777    let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) else {
1778        return;
1779    };
1780    let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
1781    if item.contains_key(&check_name) {
1782        return;
1783    }
1784    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
1785        item.insert(attr.to_string(), val.clone());
1786    }
1787}
1788
1789/// SET ... = list_append(a, b) — concatenate the L arrays of two list
1790/// operands. Either operand may be missing or non-list, in which case
1791/// it contributes nothing.
1792fn apply_set_list_append(
1793    item: &mut HashMap<String, AttributeValue>,
1794    attr: &str,
1795    rest: &str,
1796    expr_attr_names: &HashMap<String, String>,
1797    expr_attr_values: &HashMap<String, Value>,
1798) {
1799    let Some(inner) = rest.strip_suffix(')') else {
1800        return;
1801    };
1802    let mut split = inner.splitn(2, ',');
1803    let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) else {
1804        return;
1805    };
1806    let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1807    let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1808
1809    let mut merged = Vec::new();
1810    if let Some(Value::Object(obj)) = &a_val {
1811        if let Some(Value::Array(arr)) = obj.get("L") {
1812            merged.extend(arr.clone());
1813        }
1814    }
1815    if let Some(Value::Object(obj)) = &b_val {
1816        if let Some(Value::Array(arr)) = obj.get("L") {
1817            merged.extend(arr.clone());
1818        }
1819    }
1820
1821    item.insert(attr.to_string(), json!({"L": merged}));
1822}
1823
1824/// SET ... = `<arith_left> +/- <arith_right>` — both operands must
1825/// resolve to N values (or the LHS may be missing, in which case it's
1826/// treated as 0). Anything else is rejected with the same
1827/// `ValidationException` AWS returns.
1828fn apply_set_arithmetic(
1829    item: &mut HashMap<String, AttributeValue>,
1830    attr: &str,
1831    arith_left: &str,
1832    arith_right: &str,
1833    is_add: bool,
1834    expr_attr_names: &HashMap<String, String>,
1835    expr_attr_values: &HashMap<String, Value>,
1836) -> Result<(), AwsServiceError> {
1837    let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1838    let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1839
1840    let left_num = match extract_number(&left_val) {
1841        Some(n) => n,
1842        None if left_val.is_some() => {
1843            return Err(AwsServiceError::aws_error(
1844                StatusCode::BAD_REQUEST,
1845                "ValidationException",
1846                "An operand in the update expression has an incorrect data type",
1847            ));
1848        }
1849        None => 0.0,
1850    };
1851    let right_num = extract_number(&right_val).ok_or_else(|| {
1852        AwsServiceError::aws_error(
1853            StatusCode::BAD_REQUEST,
1854            "ValidationException",
1855            "An operand in the update expression has an incorrect data type",
1856        )
1857    })?;
1858
1859    let result = if is_add {
1860        left_num + right_num
1861    } else {
1862        left_num - right_num
1863    };
1864
1865    let num_str = if result == result.trunc() {
1866        format!("{}", result as i64)
1867    } else {
1868        format!("{result}")
1869    };
1870
1871    item.insert(attr.to_string(), json!({"N": num_str}));
1872    Ok(())
1873}
1874
/// Split a SET target of the form `name[N]` into `name` and the index `N`.
///
/// Surrounding whitespace on `path` is ignored. Returns `None` for anything
/// that is not exactly one bracketed index on a plain name: no trailing `]`,
/// a non-numeric index, an empty name, or a name that itself contains `.`,
/// `[` or `]` (nested shapes such as `a.b[0]` or `a[0][1]`).
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    let (name, digits) = without_close.rsplit_once('[')?;
    let idx = digits.parse::<usize>().ok()?;

    // Only the single-index case on a bare attribute reference is handled.
    let is_plain_name =
        !name.is_empty() && !name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    is_plain_name.then_some((name, idx))
}
1895
1896/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
1897/// within the current list, replaces that slot; if it's at the end, appends.
1898/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
1899/// out-of-range indices and non-list attributes.
1900fn assign_list_index(
1901    item: &mut HashMap<String, AttributeValue>,
1902    attr: &str,
1903    idx: usize,
1904    value: Value,
1905) -> Result<(), AwsServiceError> {
1906    let Some(existing) = item.get_mut(attr) else {
1907        return Err(invalid_document_path());
1908    };
1909    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
1910        return Err(invalid_document_path());
1911    };
1912    if idx < list.len() {
1913        list[idx] = value;
1914    } else if idx == list.len() {
1915        list.push(value);
1916    } else {
1917        return Err(invalid_document_path());
1918    }
1919    Ok(())
1920}
1921
1922fn invalid_document_path() -> AwsServiceError {
1923    AwsServiceError::aws_error(
1924        StatusCode::BAD_REQUEST,
1925        "ValidationException",
1926        "The document path provided in the update expression is invalid for update",
1927    )
1928}
1929
1930fn resolve_value(
1931    reference: &str,
1932    item: &HashMap<String, AttributeValue>,
1933    expr_attr_names: &HashMap<String, String>,
1934    expr_attr_values: &HashMap<String, Value>,
1935) -> Option<Value> {
1936    let reference = reference.trim();
1937    if reference.starts_with(':') {
1938        expr_attr_values.get(reference).cloned()
1939    } else {
1940        let attr_name = resolve_attr_name(reference, expr_attr_names);
1941        item.get(&attr_name).cloned()
1942    }
1943}
1944
/// Report whether `path` is a dotted path without any bracketed list index,
/// i.e. it contains a `.` and no `[`. Paths with list indices (`a[0]`,
/// `a.b[0]`) are excluded because the nested-SET writer does not handle them.
fn is_dotted_path(path: &str) -> bool {
    let has_index = path.contains('[');
    let has_dot = path.contains('.');
    has_dot && !has_index
}
1950
1951/// Write `value` at a dotted path inside an M-typed attribute.
1952///
1953/// Resolves each `#name` segment through `expr_attr_names`. The top-level
1954/// attribute and every intermediate segment must already exist as a Map —
1955/// DynamoDB rejects writes through missing parents with ValidationException.
1956fn assign_nested_path(
1957    item: &mut HashMap<String, AttributeValue>,
1958    path: &str,
1959    expr_attr_names: &HashMap<String, String>,
1960    value: Value,
1961) -> Result<(), AwsServiceError> {
1962    let mut segments: Vec<String> = path
1963        .split('.')
1964        .map(|seg| resolve_attr_name(seg.trim(), expr_attr_names))
1965        .collect();
1966    if segments.len() < 2 {
1967        return Err(invalid_document_path());
1968    }
1969
1970    let leaf = segments.pop().expect("len >= 2");
1971    let top = segments.remove(0);
1972
1973    let top_attr = item.get_mut(&top).ok_or_else(invalid_document_path)?;
1974    let mut current = top_attr
1975        .get_mut("M")
1976        .and_then(|m| m.as_object_mut())
1977        .ok_or_else(invalid_document_path)?;
1978
1979    for seg in &segments {
1980        current = current
1981            .get_mut(seg)
1982            .and_then(|v| v.get_mut("M"))
1983            .and_then(|m| m.as_object_mut())
1984            .ok_or_else(invalid_document_path)?;
1985    }
1986
1987    current.insert(leaf, value);
1988    Ok(())
1989}
1990
1991fn extract_number(val: &Option<Value>) -> Option<f64> {
1992    val.as_ref()
1993        .and_then(|v| v.get("N"))
1994        .and_then(|n| n.as_str())
1995        .and_then(|s| s.parse().ok())
1996}
1997
/// Split `expr` at its first `+` or `-` that sits outside any parentheses and
/// is not the very first character.
///
/// Returns the untrimmed text on each side of the operator plus `true` for
/// `+` and `false` for `-`, or `None` when no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    expr.char_indices().find_map(|(at, ch)| {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            // A sign at offset 0 has no left operand, so it is not an operator.
            '+' | '-' if nesting == 0 && at > 0 => {
                return Some((&expr[..at], &expr[at + 1..], ch == '+'));
            }
            _ => {}
        }
        None
    })
}
2015
2016fn apply_add_assignment(
2017    item: &mut HashMap<String, AttributeValue>,
2018    assignment: &str,
2019    expr_attr_names: &HashMap<String, String>,
2020    expr_attr_values: &HashMap<String, Value>,
2021) -> Result<(), AwsServiceError> {
2022    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2023    if parts.len() != 2 {
2024        return Ok(());
2025    }
2026
2027    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2028    let val_ref = parts[1].trim();
2029    let add_val = expr_attr_values.get(val_ref);
2030
2031    if let Some(add_val) = add_val {
2032        if let Some(existing) = item.get(&attr) {
2033            if let (Some(existing_num), Some(add_num)) = (
2034                extract_number(&Some(existing.clone())),
2035                extract_number(&Some(add_val.clone())),
2036            ) {
2037                let result = existing_num + add_num;
2038                let num_str = if result == result.trunc() {
2039                    format!("{}", result as i64)
2040                } else {
2041                    format!("{result}")
2042                };
2043                item.insert(attr, json!({"N": num_str}));
2044            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
2045                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
2046                    let mut merged: Vec<Value> = existing_set.clone();
2047                    for v in add_set {
2048                        if !merged.contains(v) {
2049                            merged.push(v.clone());
2050                        }
2051                    }
2052                    item.insert(attr, json!({"SS": merged}));
2053                }
2054            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
2055                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
2056                    let mut merged: Vec<Value> = existing_set.clone();
2057                    for v in add_set {
2058                        if !merged.contains(v) {
2059                            merged.push(v.clone());
2060                        }
2061                    }
2062                    item.insert(attr, json!({"NS": merged}));
2063                }
2064            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
2065                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
2066                    let mut merged: Vec<Value> = existing_set.clone();
2067                    for v in add_set {
2068                        if !merged.contains(v) {
2069                            merged.push(v.clone());
2070                        }
2071                    }
2072                    item.insert(attr, json!({"BS": merged}));
2073                }
2074            }
2075        } else {
2076            item.insert(attr, add_val.clone());
2077        }
2078    }
2079
2080    Ok(())
2081}
2082
2083fn apply_delete_assignment(
2084    item: &mut HashMap<String, AttributeValue>,
2085    assignment: &str,
2086    expr_attr_names: &HashMap<String, String>,
2087    expr_attr_values: &HashMap<String, Value>,
2088) -> Result<(), AwsServiceError> {
2089    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2090    if parts.len() != 2 {
2091        return Ok(());
2092    }
2093
2094    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2095    let val_ref = parts[1].trim();
2096    let del_val = expr_attr_values.get(val_ref);
2097
2098    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
2099        if let (Some(existing_set), Some(del_set)) = (
2100            existing.get("SS").and_then(|v| v.as_array()),
2101            del_val.get("SS").and_then(|v| v.as_array()),
2102        ) {
2103            let filtered: Vec<Value> = existing_set
2104                .iter()
2105                .filter(|v| !del_set.contains(v))
2106                .cloned()
2107                .collect();
2108            if filtered.is_empty() {
2109                item.remove(&attr);
2110            } else {
2111                item.insert(attr, json!({"SS": filtered}));
2112            }
2113        } else if let (Some(existing_set), Some(del_set)) = (
2114            existing.get("NS").and_then(|v| v.as_array()),
2115            del_val.get("NS").and_then(|v| v.as_array()),
2116        ) {
2117            let filtered: Vec<Value> = existing_set
2118                .iter()
2119                .filter(|v| !del_set.contains(v))
2120                .cloned()
2121                .collect();
2122            if filtered.is_empty() {
2123                item.remove(&attr);
2124            } else {
2125                item.insert(attr, json!({"NS": filtered}));
2126            }
2127        } else if let (Some(existing_set), Some(del_set)) = (
2128            existing.get("BS").and_then(|v| v.as_array()),
2129            del_val.get("BS").and_then(|v| v.as_array()),
2130        ) {
2131            let filtered: Vec<Value> = existing_set
2132                .iter()
2133                .filter(|v| !del_set.contains(v))
2134                .cloned()
2135                .collect();
2136            if filtered.is_empty() {
2137                item.remove(&attr);
2138            } else {
2139                item.insert(attr, json!({"BS": filtered}));
2140            }
2141        }
2142    }
2143
2144    Ok(())
2145}
2146
2147pub(super) struct TableDescriptionInput<'a> {
2148    pub arn: &'a str,
2149    pub table_id: &'a str,
2150    pub key_schema: &'a [KeySchemaElement],
2151    pub attribute_definitions: &'a [AttributeDefinition],
2152    pub provisioned_throughput: &'a ProvisionedThroughput,
2153    pub gsi: &'a [GlobalSecondaryIndex],
2154    pub lsi: &'a [LocalSecondaryIndex],
2155    pub billing_mode: &'a str,
2156    pub created_at: chrono::DateTime<chrono::Utc>,
2157    pub item_count: i64,
2158    pub size_bytes: i64,
2159    pub status: &'a str,
2160    pub deletion_protection_enabled: bool,
2161    pub on_demand_throughput: Option<&'a crate::state::OnDemandThroughput>,
2162}
2163
2164fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
2165    let TableDescriptionInput {
2166        arn,
2167        table_id,
2168        key_schema,
2169        attribute_definitions,
2170        provisioned_throughput,
2171        gsi,
2172        lsi,
2173        billing_mode,
2174        created_at,
2175        item_count,
2176        size_bytes,
2177        status,
2178        deletion_protection_enabled,
2179        on_demand_throughput,
2180    } = *input;
2181    let table_name = arn.rsplit('/').next().unwrap_or("");
2182    let creation_timestamp =
2183        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
2184
2185    let ks: Vec<Value> = key_schema
2186        .iter()
2187        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2188        .collect();
2189
2190    let ad: Vec<Value> = attribute_definitions
2191        .iter()
2192        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2193        .collect();
2194
2195    let mut desc = json!({
2196        "TableName": table_name,
2197        "TableArn": arn,
2198        "TableId": table_id,
2199        "TableStatus": status,
2200        "KeySchema": ks,
2201        "AttributeDefinitions": ad,
2202        "CreationDateTime": creation_timestamp,
2203        "ItemCount": item_count,
2204        "TableSizeBytes": size_bytes,
2205        "BillingModeSummary": { "BillingMode": billing_mode },
2206        "DeletionProtectionEnabled": deletion_protection_enabled,
2207    });
2208
2209    if billing_mode != "PAY_PER_REQUEST" {
2210        desc["ProvisionedThroughput"] = json!({
2211            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2212            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2213            "NumberOfDecreasesToday": 0,
2214        });
2215    } else {
2216        desc["ProvisionedThroughput"] = json!({
2217            "ReadCapacityUnits": 0,
2218            "WriteCapacityUnits": 0,
2219            "NumberOfDecreasesToday": 0,
2220        });
2221    }
2222
2223    if let Some(odt) = on_demand_throughput {
2224        desc["OnDemandThroughput"] = json!({
2225            "MaxReadRequestUnits": odt.max_read_request_units,
2226            "MaxWriteRequestUnits": odt.max_write_request_units,
2227        });
2228    }
2229
2230    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2231    // Real AWS returns an ACTIVE warm throughput object for active tables,
2232    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2233    // perpetual "still creating" loop.
2234    if status == "ACTIVE" {
2235        desc["WarmThroughput"] = json!({
2236            "ReadUnitsPerSecond": 0,
2237            "WriteUnitsPerSecond": 0,
2238            "Status": "ACTIVE",
2239        });
2240    }
2241
2242    if !gsi.is_empty() {
2243        let gsi_json: Vec<Value> = gsi
2244            .iter()
2245            .map(|g| {
2246                let gks: Vec<Value> = g
2247                    .key_schema
2248                    .iter()
2249                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2250                    .collect();
2251                let mut idx = json!({
2252                    "IndexName": g.index_name,
2253                    "KeySchema": gks,
2254                    "Projection": { "ProjectionType": g.projection.projection_type },
2255                    "IndexStatus": "ACTIVE",
2256                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2257                    "ItemCount": 0,
2258                    "IndexSizeBytes": 0,
2259                });
2260                if !g.projection.non_key_attributes.is_empty() {
2261                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2262                }
2263                if let Some(ref pt) = g.provisioned_throughput {
2264                    idx["ProvisionedThroughput"] = json!({
2265                        "ReadCapacityUnits": pt.read_capacity_units,
2266                        "WriteCapacityUnits": pt.write_capacity_units,
2267                        "NumberOfDecreasesToday": 0,
2268                    });
2269                }
2270                if let Some(ref odt) = g.on_demand_throughput {
2271                    idx["OnDemandThroughput"] = json!({
2272                        "MaxReadRequestUnits": odt.max_read_request_units,
2273                        "MaxWriteRequestUnits": odt.max_write_request_units,
2274                    });
2275                }
2276                idx
2277            })
2278            .collect();
2279        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2280    }
2281
2282    if !lsi.is_empty() {
2283        let lsi_json: Vec<Value> = lsi
2284            .iter()
2285            .map(|l| {
2286                let lks: Vec<Value> = l
2287                    .key_schema
2288                    .iter()
2289                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2290                    .collect();
2291                let mut idx = json!({
2292                    "IndexName": l.index_name,
2293                    "KeySchema": lks,
2294                    "Projection": { "ProjectionType": l.projection.projection_type },
2295                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2296                    "ItemCount": 0,
2297                    "IndexSizeBytes": 0,
2298                });
2299                if !l.projection.non_key_attributes.is_empty() {
2300                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2301                }
2302                idx
2303            })
2304            .collect();
2305        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2306    }
2307
2308    desc
2309}
2310
2311fn build_table_description(table: &DynamoTable) -> Value {
2312    let mut desc = build_table_description_json(&TableDescriptionInput {
2313        arn: &table.arn,
2314        table_id: &table.table_id,
2315        key_schema: &table.key_schema,
2316        attribute_definitions: &table.attribute_definitions,
2317        provisioned_throughput: &table.provisioned_throughput,
2318        gsi: &table.gsi,
2319        lsi: &table.lsi,
2320        billing_mode: &table.billing_mode,
2321        created_at: table.created_at,
2322        item_count: table.item_count,
2323        size_bytes: table.size_bytes,
2324        status: &table.status,
2325        deletion_protection_enabled: table.deletion_protection_enabled,
2326        on_demand_throughput: table.on_demand_throughput.as_ref(),
2327    });
2328
2329    // `LatestStreamArn` / `LatestStreamLabel` persist after a stream has
2330    // been created, even if streams are currently disabled — real AWS
2331    // keeps them for ~24h post-disable so DescribeTable callers can still
2332    // observe the last active stream. fakecloud keeps them for the
2333    // table's lifetime, which is sufficient for any single test run.
2334    if let Some(ref stream_arn) = table.stream_arn {
2335        desc["LatestStreamArn"] = json!(stream_arn);
2336        desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2337    }
2338    // The `StreamSpecification` block is only present while streams are
2339    // actively enabled. When absent, the Terraform provider Read falls
2340    // through to the prior `stream_view_type` from its own state rather
2341    // than clearing it, which matches the diff behaviour the upstream
2342    // acceptance tests assert on.
2343    if table.stream_enabled {
2344        if let Some(ref view_type) = table.stream_view_type {
2345            desc["StreamSpecification"] = json!({
2346                "StreamEnabled": true,
2347                "StreamViewType": view_type,
2348            });
2349        }
2350    }
2351
2352    // SSEDescription is only returned when the customer explicitly enabled
2353    // a KMS-backed SSE. Real AWS tables using the default AWS-owned key omit
2354    // this field entirely, and the Terraform provider's Read asserts
2355    // `server_side_encryption.#` == 0 in that case.
2356    if let Some(ref sse_type) = table.sse_type {
2357        let mut sse_desc = json!({
2358            "Status": "ENABLED",
2359            "SSEType": sse_type,
2360        });
2361        if let Some(ref key_arn) = table.sse_kms_key_arn {
2362            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2363        }
2364        desc["SSEDescription"] = sse_desc;
2365    }
2366
2367    desc
2368}
2369
2370fn execute_partiql_statement(
2371    state: &SharedDynamoDbState,
2372    statement: &str,
2373    parameters: &[Value],
2374) -> Result<AwsResponse, AwsServiceError> {
2375    let trimmed = statement.trim();
2376    let upper = trimmed.to_ascii_uppercase();
2377
2378    if upper.starts_with("SELECT") {
2379        execute_partiql_select(state, trimmed, parameters)
2380    } else if upper.starts_with("INSERT") {
2381        execute_partiql_insert(state, trimmed, parameters)
2382    } else if upper.starts_with("UPDATE") {
2383        execute_partiql_update(state, trimmed, parameters)
2384    } else if upper.starts_with("DELETE") {
2385        execute_partiql_delete(state, trimmed, parameters)
2386    } else {
2387        Err(AwsServiceError::aws_error(
2388            StatusCode::BAD_REQUEST,
2389            "ValidationException",
2390            format!("Unsupported PartiQL statement: {trimmed}"),
2391        ))
2392    }
2393}
2394
2395/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2396fn execute_partiql_select(
2397    state: &SharedDynamoDbState,
2398    statement: &str,
2399    parameters: &[Value],
2400) -> Result<AwsResponse, AwsServiceError> {
2401    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2402    let upper = statement.to_ascii_uppercase();
2403    let from_pos = upper.find("FROM").ok_or_else(|| {
2404        AwsServiceError::aws_error(
2405            StatusCode::BAD_REQUEST,
2406            "ValidationException",
2407            "Invalid SELECT statement: missing FROM",
2408        )
2409    })?;
2410
2411    let after_from = statement[from_pos + 4..].trim();
2412    let (table_name, rest) = parse_partiql_table_name(after_from);
2413
2414    let __mas = state.read();
2415    let state = __mas.default_ref();
2416    let table = get_table(&state.tables, &table_name)?;
2417
2418    let rest_upper = rest.trim().to_ascii_uppercase();
2419    if rest_upper.starts_with("WHERE") {
2420        let where_clause = rest.trim()[5..].trim();
2421        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2422        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2423        DynamoDbService::ok_json(json!({ "Items": items }))
2424    } else {
2425        // No WHERE, return all items
2426        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2427        DynamoDbService::ok_json(json!({ "Items": items }))
2428    }
2429}
2430
2431fn execute_partiql_insert(
2432    state: &SharedDynamoDbState,
2433    statement: &str,
2434    parameters: &[Value],
2435) -> Result<AwsResponse, AwsServiceError> {
2436    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2437    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2438    let upper = statement.to_ascii_uppercase();
2439    let into_pos = upper.find("INTO").ok_or_else(|| {
2440        AwsServiceError::aws_error(
2441            StatusCode::BAD_REQUEST,
2442            "ValidationException",
2443            "Invalid INSERT statement: missing INTO",
2444        )
2445    })?;
2446
2447    let after_into = statement[into_pos + 4..].trim();
2448    let (table_name, rest) = parse_partiql_table_name(after_into);
2449
2450    let rest_upper = rest.trim().to_ascii_uppercase();
2451    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2452        AwsServiceError::aws_error(
2453            StatusCode::BAD_REQUEST,
2454            "ValidationException",
2455            "Invalid INSERT statement: missing VALUE",
2456        )
2457    })?;
2458
2459    let value_str = rest.trim()[value_pos + 5..].trim();
2460    let item = parse_partiql_value_object(value_str, parameters)?;
2461
2462    let mut __mas = state.write();
2463    let state = __mas.default_mut();
2464    let table = get_table_mut(&mut state.tables, &table_name)?;
2465    let key = extract_key(table, &item);
2466    if table.find_item_index(&key).is_some() {
2467        // DynamoDB PartiQL INSERT fails if item exists
2468        return Err(AwsServiceError::aws_error(
2469            StatusCode::BAD_REQUEST,
2470            "DuplicateItemException",
2471            "Duplicate primary key exists in table",
2472        ));
2473    } else {
2474        table.items.push(item);
2475    }
2476    table.recalculate_stats();
2477
2478    DynamoDbService::ok_json(json!({}))
2479}
2480
2481fn execute_partiql_update(
2482    state: &SharedDynamoDbState,
2483    statement: &str,
2484    parameters: &[Value],
2485) -> Result<AwsResponse, AwsServiceError> {
2486    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2487    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2488    let after_update = statement[6..].trim(); // skip "UPDATE"
2489    let (table_name, rest) = parse_partiql_table_name(after_update);
2490
2491    let rest_upper = rest.trim().to_ascii_uppercase();
2492    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2493        AwsServiceError::aws_error(
2494            StatusCode::BAD_REQUEST,
2495            "ValidationException",
2496            "Invalid UPDATE statement: missing SET",
2497        )
2498    })?;
2499
2500    let after_set = rest.trim()[set_pos + 3..].trim();
2501
2502    // Split on WHERE
2503    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2504    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2505        (&after_set[..wp], after_set[wp + 5..].trim())
2506    } else {
2507        (after_set, "")
2508    };
2509
2510    let mut __mas = state.write();
2511    let state = __mas.default_mut();
2512    let table = get_table_mut(&mut state.tables, &table_name)?;
2513
2514    let matched_indices = if !where_clause.is_empty() {
2515        find_partiql_where_indices(table, where_clause, parameters)?
2516    } else {
2517        (0..table.items.len()).collect()
2518    };
2519
2520    // Parse SET assignments: attr=value, attr2=value2
2521    let param_offset = count_params_in_str(where_clause);
2522    let assignments: Vec<&str> = set_clause.split(',').collect();
2523    for idx in &matched_indices {
2524        let mut local_offset = param_offset;
2525        for assignment in &assignments {
2526            let assignment = assignment.trim();
2527            if let Some((attr, val_str)) = assignment.split_once('=') {
2528                let attr = attr.trim().trim_matches('"');
2529                let val_str = val_str.trim();
2530                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2531                if let Some(v) = value {
2532                    table.items[*idx].insert(attr.to_string(), v);
2533                }
2534            }
2535        }
2536    }
2537    table.recalculate_stats();
2538
2539    DynamoDbService::ok_json(json!({}))
2540}
2541
2542fn execute_partiql_delete(
2543    state: &SharedDynamoDbState,
2544    statement: &str,
2545    parameters: &[Value],
2546) -> Result<AwsResponse, AwsServiceError> {
2547    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2548    let upper = statement.to_ascii_uppercase();
2549    let from_pos = upper.find("FROM").ok_or_else(|| {
2550        AwsServiceError::aws_error(
2551            StatusCode::BAD_REQUEST,
2552            "ValidationException",
2553            "Invalid DELETE statement: missing FROM",
2554        )
2555    })?;
2556
2557    let after_from = statement[from_pos + 4..].trim();
2558    let (table_name, rest) = parse_partiql_table_name(after_from);
2559
2560    let rest_upper = rest.trim().to_ascii_uppercase();
2561    if !rest_upper.starts_with("WHERE") {
2562        return Err(AwsServiceError::aws_error(
2563            StatusCode::BAD_REQUEST,
2564            "ValidationException",
2565            "DELETE requires a WHERE clause",
2566        ));
2567    }
2568    let where_clause = rest.trim()[5..].trim();
2569
2570    let mut __mas = state.write();
2571    let state = __mas.default_mut();
2572    let table = get_table_mut(&mut state.tables, &table_name)?;
2573
2574    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2575    // Remove from highest index first to avoid invalidating lower indices
2576    indices.sort_unstable();
2577    indices.reverse();
2578    for idx in indices {
2579        table.items.remove(idx);
2580    }
2581    table.recalculate_stats();
2582
2583    DynamoDbService::ok_json(json!({}))
2584}
2585
/// Parse a table name that may be wrapped in double quotes.
///
/// Returns `(table_name, rest_of_string)`, where `rest_of_string` is the
/// untrimmed remainder of the (trimmed) input after the name.
///
/// * `"name" rest` — the name is everything between the two quotes and may
///   contain whitespace.
/// * `"name rest` (no closing quote) and `name rest` — the name runs up to
///   the first whitespace character; for the unterminated-quote form the
///   stray quote characters are stripped from the name.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    let quoted_body = s.strip_prefix('"');

    // Properly quoted name: split at the closing quote.
    if let Some((name, rest)) = quoted_body.and_then(|body| body.split_once('"')) {
        return (name.to_string(), rest);
    }

    // Both fallback forms end the name at the first whitespace of any kind
    // (space, tab, newline), so they tokenize multi-line statements alike.
    let end = s.find(char::is_whitespace).unwrap_or(s.len());
    let name = if quoted_body.is_some() {
        s[..end].trim_matches('"')
    } else {
        &s[..end]
    };
    (name.to_string(), &s[end..])
}
2605
2606/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2607/// Returns matching items.
2608fn evaluate_partiql_where<'a>(
2609    table: &'a DynamoTable,
2610    where_clause: &str,
2611    parameters: &[Value],
2612) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2613    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2614    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2615}
2616
2617fn find_partiql_where_indices(
2618    table: &DynamoTable,
2619    where_clause: &str,
2620    parameters: &[Value],
2621) -> Result<Vec<usize>, AwsServiceError> {
2622    let conditions = split_partiql_and_clauses(where_clause);
2623    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2624
2625    let mut indices = Vec::new();
2626    for (i, item) in table.items.iter().enumerate() {
2627        let all_match = parsed_conditions
2628            .iter()
2629            .all(|(attr, expected)| item.get(attr) == Some(expected));
2630        if all_match {
2631            indices.push(i);
2632        }
2633    }
2634
2635    Ok(indices)
2636}
2637
/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
///
/// Each returned part is trimmed. A clause without any separator yields a
/// single-element vector. Separators inside single-quoted string literals
/// (e.g. `title = 'Rock AND Roll'`) are not treated as boundaries.
fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";

    // Scan the original bytes with an ASCII-only case-insensitive compare.
    // Upper-casing a copy with full Unicode rules can change byte lengths,
    // which would make match offsets invalid for slicing `where_clause`.
    let bytes = where_clause.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut in_quote = false;
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\'' {
            // A doubled quote ('') toggles twice and leaves the state as is.
            in_quote = !in_quote;
            i += 1;
        } else if !in_quote
            && bytes.len() - i >= SEPARATOR.len()
            && bytes[i..i + SEPARATOR.len()].eq_ignore_ascii_case(SEPARATOR)
        {
            // `i` and `i + 5` both sit next to ASCII spaces, so they are
            // valid char boundaries for slicing.
            parts.push(where_clause[start..i].trim());
            i += SEPARATOR.len();
            start = i;
        } else {
            i += 1;
        }
    }

    parts.push(where_clause[start..].trim());
    parts
}
2653
2654/// Parse each `col = literal` (or `col = ?`) condition into an
2655/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2656/// don't parse as equality, or whose RHS literal can't be resolved, are
2657/// silently dropped — that mirrors the prior inline behavior.
2658fn parse_partiql_equality_conditions(
2659    conditions: &[&str],
2660    parameters: &[Value],
2661) -> Vec<(String, Value)> {
2662    let mut param_idx = 0usize;
2663    let mut parsed = Vec::new();
2664    for cond in conditions {
2665        let cond = cond.trim();
2666        if let Some((left, right)) = cond.split_once('=') {
2667            let attr = left.trim().trim_matches('"').to_string();
2668            let val_str = right.trim();
2669            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2670                parsed.push((attr, value));
2671            }
2672        }
2673    }
2674    parsed
2675}
2676
2677/// Parse a PartiQL literal value. Supports:
2678/// - 'string' -> {"S": "string"}
2679/// - 123 -> {"N": "123"}
2680/// - ? -> parameter from list
2681fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2682    let s = s.trim();
2683    if s == "?" {
2684        let idx = *param_idx;
2685        *param_idx += 1;
2686        parameters.get(idx).cloned()
2687    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2688        let inner = &s[1..s.len() - 1];
2689        Some(json!({"S": inner}))
2690    } else if let Ok(n) = s.parse::<f64>() {
2691        let num_str = if n == n.trunc() {
2692            format!("{}", n as i64)
2693        } else {
2694            format!("{n}")
2695        };
2696        Some(json!({"N": num_str}))
2697    } else {
2698        None
2699    }
2700}
2701
2702/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2703fn parse_partiql_value_object(
2704    s: &str,
2705    parameters: &[Value],
2706) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2707    let s = s.trim();
2708    let inner = s
2709        .strip_prefix('{')
2710        .and_then(|s| s.strip_suffix('}'))
2711        .ok_or_else(|| {
2712            AwsServiceError::aws_error(
2713                StatusCode::BAD_REQUEST,
2714                "ValidationException",
2715                "Invalid VALUE: expected object literal",
2716            )
2717        })?;
2718
2719    let mut item = HashMap::new();
2720    let mut param_idx = 0usize;
2721
2722    // Simple comma-separated key:value parsing
2723    for pair in split_partiql_pairs(inner) {
2724        let pair = pair.trim();
2725        if pair.is_empty() {
2726            continue;
2727        }
2728        if let Some((key_part, val_part)) = pair.split_once(':') {
2729            let key = key_part
2730                .trim()
2731                .trim_matches('\'')
2732                .trim_matches('"')
2733                .to_string();
2734            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2735                item.insert(key, val);
2736            }
2737        }
2738    }
2739
2740    Ok(item)
2741}
2742
/// Split PartiQL object pairs on commas, respecting nesting and quotes.
///
/// A comma only separates pairs when it is outside single-quoted strings and
/// outside any nested `{...}` map or `[...]` list literal. The returned
/// slices are not trimmed; an input without a top-level comma is returned as
/// a single element.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0i32;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            '\'' => in_quote = !in_quote,
            // Everything else inside a string literal is plain data.
            _ if in_quote => {}
            '{' | '[' => depth += 1,
            '}' | ']' => depth -= 1,
            ',' if depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }

    parts.push(&s[start..]);
    parts
}
2766
/// Count `?` parameter placeholders in a string.
///
/// Question marks inside single-quoted string literals are text, not
/// placeholders, and are not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    s.chars()
        .filter(|&c| {
            if c == '\'' {
                // A doubled quote ('') toggles twice and leaves the state as is.
                in_quote = !in_quote;
            }
            c == '?' && !in_quote
        })
        .count()
}
2771
2772#[cfg(test)]
2773mod tests {
2774    use super::*;
2775    use serde_json::json;
2776
2777    #[test]
2778    fn test_parse_update_clauses_set() {
2779        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2780        assert_eq!(clauses.len(), 1);
2781        assert_eq!(clauses[0].0, UpdateAction::Set);
2782        assert_eq!(clauses[0].1.len(), 2);
2783    }
2784
2785    #[test]
2786    fn test_parse_update_clauses_set_and_remove() {
2787        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2788        assert_eq!(clauses.len(), 2);
2789        assert_eq!(clauses[0].0, UpdateAction::Set);
2790        assert_eq!(clauses[1].0, UpdateAction::Remove);
2791    }
2792
2793    #[test]
2794    fn test_parse_update_clauses_list_append_single_assignment() {
2795        // Before fix: naive comma split tore list_append(#0, :0) at the
2796        // inner comma, producing two bogus assignments instead of one.
2797        let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
2798        assert_eq!(clauses.len(), 1);
2799        assert_eq!(clauses[0].0, UpdateAction::Set);
2800        assert_eq!(
2801            clauses[0].1.len(),
2802            1,
2803            "list_append(a, b) must be kept as a single assignment, not split at the inner comma"
2804        );
2805    }
2806
2807    #[test]
2808    fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
2809        // list_append assignment followed by a plain SET — the comma between
2810        // the two assignments must still split them, while the comma inside
2811        // the list_append call must not.
2812        let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
2813        assert_eq!(clauses.len(), 1);
2814        assert_eq!(clauses[0].0, UpdateAction::Set);
2815        assert_eq!(
2816            clauses[0].1.len(),
2817            2,
2818            "two SET assignments: one list_append and one plain"
2819        );
2820    }
2821
2822    #[test]
2823    fn test_evaluate_key_condition_simple() {
2824        let mut item = HashMap::new();
2825        item.insert("pk".to_string(), json!({"S": "user1"}));
2826        item.insert("sk".to_string(), json!({"S": "order1"}));
2827
2828        let mut expr_values = HashMap::new();
2829        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2830
2831        assert!(evaluate_key_condition(
2832            "pk = :pk",
2833            &item,
2834            &HashMap::new(),
2835            &expr_values,
2836        ));
2837    }
2838
2839    #[test]
2840    fn test_compare_attribute_values_numbers() {
2841        let a = json!({"N": "10"});
2842        let b = json!({"N": "20"});
2843        assert_eq!(
2844            compare_attribute_values(Some(&a), Some(&b)),
2845            std::cmp::Ordering::Less
2846        );
2847    }
2848
2849    #[test]
2850    fn test_compare_attribute_values_strings() {
2851        let a = json!({"S": "apple"});
2852        let b = json!({"S": "banana"});
2853        assert_eq!(
2854            compare_attribute_values(Some(&a), Some(&b)),
2855            std::cmp::Ordering::Less
2856        );
2857    }
2858
2859    #[test]
2860    fn test_split_on_and() {
2861        let parts = split_on_and("pk = :pk AND sk > :sk");
2862        assert_eq!(parts.len(), 2);
2863        assert_eq!(parts[0].trim(), "pk = :pk");
2864        assert_eq!(parts[1].trim(), "sk > :sk");
2865    }
2866
2867    #[test]
2868    fn test_split_on_and_respects_parentheses() {
2869        // Before fix: split_on_and would split inside the parens
2870        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
2871        // Should NOT split on the AND inside parentheses
2872        assert_eq!(parts.len(), 1);
2873        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
2874    }
2875
2876    #[test]
2877    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
2878        // (a AND b) OR c — should match when c is true but a is false
2879        let mut item = HashMap::new();
2880        item.insert("x".to_string(), json!({"S": "no"}));
2881        item.insert("y".to_string(), json!({"S": "no"}));
2882        item.insert("z".to_string(), json!({"S": "yes"}));
2883
2884        let mut expr_values = HashMap::new();
2885        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
2886
2887        // x=yes AND y=yes => false, but z=yes => true => overall true
2888        let result = evaluate_filter_expression(
2889            "(x = :yes AND y = :yes) OR z = :yes",
2890            &item,
2891            &HashMap::new(),
2892            &expr_values,
2893        );
2894        assert!(result, "should match because z = :yes is true");
2895
2896        // x=yes AND y=yes => false, z=yes => false => overall false
2897        let mut item2 = HashMap::new();
2898        item2.insert("x".to_string(), json!({"S": "no"}));
2899        item2.insert("y".to_string(), json!({"S": "no"}));
2900        item2.insert("z".to_string(), json!({"S": "no"}));
2901
2902        let result2 = evaluate_filter_expression(
2903            "(x = :yes AND y = :yes) OR z = :yes",
2904            &item2,
2905            &HashMap::new(),
2906            &expr_values,
2907        );
2908        assert!(!result2, "should not match because nothing is true");
2909    }
2910
2911    #[test]
2912    fn test_project_item_nested_path() {
2913        // Item with a list attribute containing maps
2914        let mut item = HashMap::new();
2915        item.insert("pk".to_string(), json!({"S": "key1"}));
2916        item.insert(
2917            "data".to_string(),
2918            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
2919        );
2920
2921        let body = json!({
2922            "ProjectionExpression": "data[0].name"
2923        });
2924
2925        let projected = project_item(&item, &body);
2926        // Should contain data[0].name = "Alice", not the entire data[0] element
2927        let name = projected
2928            .get("data")
2929            .and_then(|v| v.get("L"))
2930            .and_then(|v| v.get(0))
2931            .and_then(|v| v.get("M"))
2932            .and_then(|v| v.get("name"))
2933            .and_then(|v| v.get("S"))
2934            .and_then(|v| v.as_str());
2935        assert_eq!(name, Some("Alice"));
2936
2937        // Should NOT contain the "age" field
2938        let age = projected
2939            .get("data")
2940            .and_then(|v| v.get("L"))
2941            .and_then(|v| v.get(0))
2942            .and_then(|v| v.get("M"))
2943            .and_then(|v| v.get("age"));
2944        assert!(age.is_none(), "age should not be present in projection");
2945    }
2946
2947    #[test]
2948    fn test_resolve_nested_path_map() {
2949        let mut item = HashMap::new();
2950        item.insert(
2951            "info".to_string(),
2952            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
2953        );
2954
2955        let result = resolve_nested_path(&item, "info.address.city");
2956        assert_eq!(result, Some(json!({"S": "NYC"})));
2957    }
2958
2959    #[test]
2960    fn test_resolve_nested_path_list_then_map() {
2961        let mut item = HashMap::new();
2962        item.insert(
2963            "items".to_string(),
2964            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
2965        );
2966
2967        let result = resolve_nested_path(&item, "items[0].sku");
2968        assert_eq!(result, Some(json!({"S": "ABC"})));
2969    }
2970
2971    // -- Integration-style tests using DynamoDbService --
2972
2973    use crate::state::SharedDynamoDbState;
2974    use parking_lot::RwLock;
2975    use std::sync::Arc;
2976
2977    fn make_service() -> DynamoDbService {
2978        let state: SharedDynamoDbState = Arc::new(RwLock::new(
2979            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2980        ));
2981        DynamoDbService::new(state)
2982    }
2983
2984    fn make_request(action: &str, body: Value) -> AwsRequest {
2985        AwsRequest {
2986            service: "dynamodb".to_string(),
2987            action: action.to_string(),
2988            region: "us-east-1".to_string(),
2989            account_id: "123456789012".to_string(),
2990            request_id: "test-id".to_string(),
2991            headers: http::HeaderMap::new(),
2992            query_params: HashMap::new(),
2993            body: serde_json::to_vec(&body).unwrap().into(),
2994            path_segments: vec![],
2995            raw_path: "/".to_string(),
2996            raw_query: String::new(),
2997            method: http::Method::POST,
2998            is_query_protocol: false,
2999            access_key_id: None,
3000            principal: None,
3001        }
3002    }
3003
3004    fn create_test_table(svc: &DynamoDbService) {
3005        let req = make_request(
3006            "CreateTable",
3007            json!({
3008                "TableName": "test-table",
3009                "KeySchema": [
3010                    { "AttributeName": "pk", "KeyType": "HASH" }
3011                ],
3012                "AttributeDefinitions": [
3013                    { "AttributeName": "pk", "AttributeType": "S" }
3014                ],
3015                "BillingMode": "PAY_PER_REQUEST"
3016            }),
3017        );
3018        svc.create_table(&req).unwrap();
3019    }
3020
3021    #[test]
3022    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
3023        let svc = make_service();
3024        let req = make_request(
3025            "CreateTable",
3026            json!({
3027                "TableName": "warm-throughput-table",
3028                "KeySchema": [
3029                    { "AttributeName": "pk", "KeyType": "HASH" }
3030                ],
3031                "AttributeDefinitions": [
3032                    { "AttributeName": "pk", "AttributeType": "S" }
3033                ],
3034                "BillingMode": "PAY_PER_REQUEST"
3035            }),
3036        );
3037        let create_resp = svc.create_table(&req).unwrap();
3038        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
3039        let create_table = &create_body["TableDescription"];
3040
3041        assert_eq!(create_table["TableStatus"], "ACTIVE");
3042        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
3043        let table_id = create_table["TableId"].as_str().unwrap().to_string();
3044        assert!(!table_id.is_empty());
3045
3046        let describe_req = make_request(
3047            "DescribeTable",
3048            json!({ "TableName": "warm-throughput-table" }),
3049        );
3050        let describe_resp = svc.describe_table(&describe_req).unwrap();
3051        let describe_body: Value =
3052            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
3053        let described_table = &describe_body["Table"];
3054
3055        assert_eq!(described_table["TableStatus"], "ACTIVE");
3056        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
3057        assert_eq!(described_table["TableId"], table_id);
3058
3059        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
3060        let describe_body_again: Value =
3061            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
3062        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
3063    }
3064
3065    #[test]
3066    fn delete_item_return_values_all_old() {
3067        let svc = make_service();
3068        create_test_table(&svc);
3069
3070        // Put an item
3071        let req = make_request(
3072            "PutItem",
3073            json!({
3074                "TableName": "test-table",
3075                "Item": {
3076                    "pk": { "S": "key1" },
3077                    "name": { "S": "Alice" },
3078                    "age": { "N": "30" }
3079                }
3080            }),
3081        );
3082        svc.put_item(&req).unwrap();
3083
3084        // Delete with ReturnValues=ALL_OLD
3085        let req = make_request(
3086            "DeleteItem",
3087            json!({
3088                "TableName": "test-table",
3089                "Key": { "pk": { "S": "key1" } },
3090                "ReturnValues": "ALL_OLD"
3091            }),
3092        );
3093        let resp = svc.delete_item(&req).unwrap();
3094        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3095
3096        // Verify the old item is returned
3097        let attrs = &body["Attributes"];
3098        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
3099        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
3100        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
3101
3102        // Verify the item is actually deleted
3103        let req = make_request(
3104            "GetItem",
3105            json!({
3106                "TableName": "test-table",
3107                "Key": { "pk": { "S": "key1" } }
3108            }),
3109        );
3110        let resp = svc.get_item(&req).unwrap();
3111        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3112        assert!(body.get("Item").is_none(), "item should be deleted");
3113    }
3114
3115    #[test]
3116    fn transact_get_items_returns_existing_and_missing() {
3117        let svc = make_service();
3118        create_test_table(&svc);
3119
3120        // Put one item
3121        let req = make_request(
3122            "PutItem",
3123            json!({
3124                "TableName": "test-table",
3125                "Item": {
3126                    "pk": { "S": "exists" },
3127                    "val": { "S": "hello" }
3128                }
3129            }),
3130        );
3131        svc.put_item(&req).unwrap();
3132
3133        let req = make_request(
3134            "TransactGetItems",
3135            json!({
3136                "TransactItems": [
3137                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
3138                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
3139                ]
3140            }),
3141        );
3142        let resp = svc.transact_get_items(&req).unwrap();
3143        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3144        let responses = body["Responses"].as_array().unwrap();
3145        assert_eq!(responses.len(), 2);
3146        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
3147        assert!(responses[1].get("Item").is_none());
3148    }
3149
3150    #[test]
3151    fn transact_write_items_put_and_delete() {
3152        let svc = make_service();
3153        create_test_table(&svc);
3154
3155        // Put initial item
3156        let req = make_request(
3157            "PutItem",
3158            json!({
3159                "TableName": "test-table",
3160                "Item": {
3161                    "pk": { "S": "to-delete" },
3162                    "val": { "S": "bye" }
3163                }
3164            }),
3165        );
3166        svc.put_item(&req).unwrap();
3167
3168        // TransactWrite: put new + delete existing
3169        let req = make_request(
3170            "TransactWriteItems",
3171            json!({
3172                "TransactItems": [
3173                    {
3174                        "Put": {
3175                            "TableName": "test-table",
3176                            "Item": {
3177                                "pk": { "S": "new-item" },
3178                                "val": { "S": "hi" }
3179                            }
3180                        }
3181                    },
3182                    {
3183                        "Delete": {
3184                            "TableName": "test-table",
3185                            "Key": { "pk": { "S": "to-delete" } }
3186                        }
3187                    }
3188                ]
3189            }),
3190        );
3191        let resp = svc.transact_write_items(&req).unwrap();
3192        assert_eq!(resp.status, StatusCode::OK);
3193
3194        // Verify new item exists
3195        let req = make_request(
3196            "GetItem",
3197            json!({
3198                "TableName": "test-table",
3199                "Key": { "pk": { "S": "new-item" } }
3200            }),
3201        );
3202        let resp = svc.get_item(&req).unwrap();
3203        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3204        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
3205
3206        // Verify deleted item is gone
3207        let req = make_request(
3208            "GetItem",
3209            json!({
3210                "TableName": "test-table",
3211                "Key": { "pk": { "S": "to-delete" } }
3212            }),
3213        );
3214        let resp = svc.get_item(&req).unwrap();
3215        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3216        assert!(body.get("Item").is_none());
3217    }
3218
3219    #[test]
3220    fn transact_write_items_condition_check_failure() {
3221        let svc = make_service();
3222        create_test_table(&svc);
3223
3224        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
3225        let req = make_request(
3226            "TransactWriteItems",
3227            json!({
3228                "TransactItems": [
3229                    {
3230                        "ConditionCheck": {
3231                            "TableName": "test-table",
3232                            "Key": { "pk": { "S": "nonexistent" } },
3233                            "ConditionExpression": "attribute_exists(pk)"
3234                        }
3235                    }
3236                ]
3237            }),
3238        );
3239        let resp = svc.transact_write_items(&req).unwrap();
3240        // Should be a 400 error response
3241        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3242        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3243        assert_eq!(
3244            body["__type"].as_str().unwrap(),
3245            "TransactionCanceledException"
3246        );
3247        assert!(body["CancellationReasons"].as_array().is_some());
3248    }
3249
3250    #[test]
3251    fn update_and_describe_time_to_live() {
3252        let svc = make_service();
3253        create_test_table(&svc);
3254
3255        // Enable TTL
3256        let req = make_request(
3257            "UpdateTimeToLive",
3258            json!({
3259                "TableName": "test-table",
3260                "TimeToLiveSpecification": {
3261                    "AttributeName": "ttl",
3262                    "Enabled": true
3263                }
3264            }),
3265        );
3266        let resp = svc.update_time_to_live(&req).unwrap();
3267        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3268        assert_eq!(
3269            body["TimeToLiveSpecification"]["AttributeName"]
3270                .as_str()
3271                .unwrap(),
3272            "ttl"
3273        );
3274        assert!(body["TimeToLiveSpecification"]["Enabled"]
3275            .as_bool()
3276            .unwrap());
3277
3278        // Describe TTL
3279        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3280        let resp = svc.describe_time_to_live(&req).unwrap();
3281        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3282        assert_eq!(
3283            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3284                .as_str()
3285                .unwrap(),
3286            "ENABLED"
3287        );
3288        assert_eq!(
3289            body["TimeToLiveDescription"]["AttributeName"]
3290                .as_str()
3291                .unwrap(),
3292            "ttl"
3293        );
3294
3295        // Disable TTL
3296        let req = make_request(
3297            "UpdateTimeToLive",
3298            json!({
3299                "TableName": "test-table",
3300                "TimeToLiveSpecification": {
3301                    "AttributeName": "ttl",
3302                    "Enabled": false
3303                }
3304            }),
3305        );
3306        svc.update_time_to_live(&req).unwrap();
3307
3308        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3309        let resp = svc.describe_time_to_live(&req).unwrap();
3310        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3311        assert_eq!(
3312            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3313                .as_str()
3314                .unwrap(),
3315            "DISABLED"
3316        );
3317    }
3318
3319    #[test]
3320    fn resource_policy_lifecycle() {
3321        let svc = make_service();
3322        create_test_table(&svc);
3323
3324        let table_arn = {
3325            let __mas = svc.state.read();
3326            let state = __mas.default_ref();
3327            state.tables.get("test-table").unwrap().arn.clone()
3328        };
3329
3330        // Put policy
3331        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3332        let req = make_request(
3333            "PutResourcePolicy",
3334            json!({
3335                "ResourceArn": table_arn,
3336                "Policy": policy_doc
3337            }),
3338        );
3339        let resp = svc.put_resource_policy(&req).unwrap();
3340        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3341        assert!(body["RevisionId"].as_str().is_some());
3342
3343        // Get policy
3344        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3345        let resp = svc.get_resource_policy(&req).unwrap();
3346        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3347        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3348
3349        // Delete policy
3350        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3351        svc.delete_resource_policy(&req).unwrap();
3352
3353        // Get should return null now
3354        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3355        let resp = svc.get_resource_policy(&req).unwrap();
3356        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3357        assert!(body["Policy"].is_null());
3358    }
3359
3360    #[test]
3361    fn describe_endpoints() {
3362        let svc = make_service();
3363        let req = make_request("DescribeEndpoints", json!({}));
3364        let resp = svc.describe_endpoints(&req).unwrap();
3365        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3366        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3367    }
3368
3369    #[test]
3370    fn describe_limits() {
3371        let svc = make_service();
3372        let req = make_request("DescribeLimits", json!({}));
3373        let resp = svc.describe_limits(&req).unwrap();
3374        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3375        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3376    }
3377
3378    #[test]
3379    fn backup_lifecycle() {
3380        let svc = make_service();
3381        create_test_table(&svc);
3382
3383        // Create backup
3384        let req = make_request(
3385            "CreateBackup",
3386            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3387        );
3388        let resp = svc.create_backup(&req).unwrap();
3389        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3390        let backup_arn = body["BackupDetails"]["BackupArn"]
3391            .as_str()
3392            .unwrap()
3393            .to_string();
3394        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3395
3396        // Describe backup
3397        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3398        let resp = svc.describe_backup(&req).unwrap();
3399        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3400        assert_eq!(
3401            body["BackupDescription"]["BackupDetails"]["BackupName"],
3402            "my-backup"
3403        );
3404
3405        // List backups
3406        let req = make_request("ListBackups", json!({}));
3407        let resp = svc.list_backups(&req).unwrap();
3408        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3409        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3410
3411        // Restore from backup
3412        let req = make_request(
3413            "RestoreTableFromBackup",
3414            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3415        );
3416        svc.restore_table_from_backup(&req).unwrap();
3417
3418        // Verify restored table exists
3419        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3420        let resp = svc.describe_table(&req).unwrap();
3421        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3422        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3423
3424        // Delete backup
3425        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3426        svc.delete_backup(&req).unwrap();
3427
3428        // List should be empty
3429        let req = make_request("ListBackups", json!({}));
3430        let resp = svc.list_backups(&req).unwrap();
3431        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3432        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3433    }
3434
3435    #[test]
3436    fn continuous_backups() {
3437        let svc = make_service();
3438        create_test_table(&svc);
3439
3440        // Initially disabled
3441        let req = make_request(
3442            "DescribeContinuousBackups",
3443            json!({ "TableName": "test-table" }),
3444        );
3445        let resp = svc.describe_continuous_backups(&req).unwrap();
3446        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3447        assert_eq!(
3448            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3449                ["PointInTimeRecoveryStatus"],
3450            "DISABLED"
3451        );
3452
3453        // Enable
3454        let req = make_request(
3455            "UpdateContinuousBackups",
3456            json!({
3457                "TableName": "test-table",
3458                "PointInTimeRecoverySpecification": {
3459                    "PointInTimeRecoveryEnabled": true
3460                }
3461            }),
3462        );
3463        svc.update_continuous_backups(&req).unwrap();
3464
3465        // Verify
3466        let req = make_request(
3467            "DescribeContinuousBackups",
3468            json!({ "TableName": "test-table" }),
3469        );
3470        let resp = svc.describe_continuous_backups(&req).unwrap();
3471        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3472        assert_eq!(
3473            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3474                ["PointInTimeRecoveryStatus"],
3475            "ENABLED"
3476        );
3477    }
3478
3479    #[test]
3480    fn restore_table_to_point_in_time() {
3481        let svc = make_service();
3482        create_test_table(&svc);
3483
3484        let req = make_request(
3485            "RestoreTableToPointInTime",
3486            json!({
3487                "SourceTableName": "test-table",
3488                "TargetTableName": "pitr-restored"
3489            }),
3490        );
3491        svc.restore_table_to_point_in_time(&req).unwrap();
3492
3493        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3494        let resp = svc.describe_table(&req).unwrap();
3495        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3496        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3497    }
3498
3499    #[test]
3500    fn global_table_lifecycle() {
3501        let svc = make_service();
3502
3503        // Create global table
3504        let req = make_request(
3505            "CreateGlobalTable",
3506            json!({
3507                "GlobalTableName": "my-global",
3508                "ReplicationGroup": [
3509                    { "RegionName": "us-east-1" },
3510                    { "RegionName": "eu-west-1" }
3511                ]
3512            }),
3513        );
3514        let resp = svc.create_global_table(&req).unwrap();
3515        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3516        assert_eq!(
3517            body["GlobalTableDescription"]["GlobalTableStatus"],
3518            "ACTIVE"
3519        );
3520
3521        // Describe
3522        let req = make_request(
3523            "DescribeGlobalTable",
3524            json!({ "GlobalTableName": "my-global" }),
3525        );
3526        let resp = svc.describe_global_table(&req).unwrap();
3527        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3528        assert_eq!(
3529            body["GlobalTableDescription"]["ReplicationGroup"]
3530                .as_array()
3531                .unwrap()
3532                .len(),
3533            2
3534        );
3535
3536        // List
3537        let req = make_request("ListGlobalTables", json!({}));
3538        let resp = svc.list_global_tables(&req).unwrap();
3539        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3540        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3541
3542        // Update - add a region
3543        let req = make_request(
3544            "UpdateGlobalTable",
3545            json!({
3546                "GlobalTableName": "my-global",
3547                "ReplicaUpdates": [
3548                    { "Create": { "RegionName": "ap-southeast-1" } }
3549                ]
3550            }),
3551        );
3552        let resp = svc.update_global_table(&req).unwrap();
3553        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3554        assert_eq!(
3555            body["GlobalTableDescription"]["ReplicationGroup"]
3556                .as_array()
3557                .unwrap()
3558                .len(),
3559            3
3560        );
3561
3562        // Describe settings
3563        let req = make_request(
3564            "DescribeGlobalTableSettings",
3565            json!({ "GlobalTableName": "my-global" }),
3566        );
3567        let resp = svc.describe_global_table_settings(&req).unwrap();
3568        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3569        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3570
3571        // Update settings (no-op, just verify no error)
3572        let req = make_request(
3573            "UpdateGlobalTableSettings",
3574            json!({ "GlobalTableName": "my-global" }),
3575        );
3576        svc.update_global_table_settings(&req).unwrap();
3577    }
3578
3579    #[test]
3580    fn table_replica_auto_scaling() {
3581        let svc = make_service();
3582        create_test_table(&svc);
3583
3584        let req = make_request(
3585            "DescribeTableReplicaAutoScaling",
3586            json!({ "TableName": "test-table" }),
3587        );
3588        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3589        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3590        assert_eq!(
3591            body["TableAutoScalingDescription"]["TableName"],
3592            "test-table"
3593        );
3594
3595        let req = make_request(
3596            "UpdateTableReplicaAutoScaling",
3597            json!({ "TableName": "test-table" }),
3598        );
3599        svc.update_table_replica_auto_scaling(&req).unwrap();
3600    }
3601
3602    #[test]
3603    fn kinesis_streaming_lifecycle() {
3604        let svc = make_service();
3605        create_test_table(&svc);
3606
3607        // Enable
3608        let req = make_request(
3609            "EnableKinesisStreamingDestination",
3610            json!({
3611                "TableName": "test-table",
3612                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3613            }),
3614        );
3615        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3616        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3617        assert_eq!(body["DestinationStatus"], "ACTIVE");
3618
3619        // Describe
3620        let req = make_request(
3621            "DescribeKinesisStreamingDestination",
3622            json!({ "TableName": "test-table" }),
3623        );
3624        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3625        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3626        assert_eq!(
3627            body["KinesisDataStreamDestinations"]
3628                .as_array()
3629                .unwrap()
3630                .len(),
3631            1
3632        );
3633
3634        // Update
3635        let req = make_request(
3636            "UpdateKinesisStreamingDestination",
3637            json!({
3638                "TableName": "test-table",
3639                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3640                "UpdateKinesisStreamingConfiguration": {
3641                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
3642                }
3643            }),
3644        );
3645        svc.update_kinesis_streaming_destination(&req).unwrap();
3646
3647        // Disable
3648        let req = make_request(
3649            "DisableKinesisStreamingDestination",
3650            json!({
3651                "TableName": "test-table",
3652                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3653            }),
3654        );
3655        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3656        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3657        assert_eq!(body["DestinationStatus"], "DISABLED");
3658    }
3659
3660    #[test]
3661    fn contributor_insights_lifecycle() {
3662        let svc = make_service();
3663        create_test_table(&svc);
3664
3665        // Initially disabled
3666        let req = make_request(
3667            "DescribeContributorInsights",
3668            json!({ "TableName": "test-table" }),
3669        );
3670        let resp = svc.describe_contributor_insights(&req).unwrap();
3671        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3672        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3673
3674        // Enable
3675        let req = make_request(
3676            "UpdateContributorInsights",
3677            json!({
3678                "TableName": "test-table",
3679                "ContributorInsightsAction": "ENABLE"
3680            }),
3681        );
3682        let resp = svc.update_contributor_insights(&req).unwrap();
3683        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3684        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3685
3686        // List
3687        let req = make_request("ListContributorInsights", json!({}));
3688        let resp = svc.list_contributor_insights(&req).unwrap();
3689        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3690        assert_eq!(
3691            body["ContributorInsightsSummaries"]
3692                .as_array()
3693                .unwrap()
3694                .len(),
3695            1
3696        );
3697    }
3698
3699    #[test]
3700    fn export_lifecycle() {
3701        let svc = make_service();
3702        create_test_table(&svc);
3703
3704        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3705
3706        // Export
3707        let req = make_request(
3708            "ExportTableToPointInTime",
3709            json!({
3710                "TableArn": table_arn,
3711                "S3Bucket": "my-bucket"
3712            }),
3713        );
3714        let resp = svc.export_table_to_point_in_time(&req).unwrap();
3715        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3716        let export_arn = body["ExportDescription"]["ExportArn"]
3717            .as_str()
3718            .unwrap()
3719            .to_string();
3720        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3721
3722        // Describe
3723        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3724        let resp = svc.describe_export(&req).unwrap();
3725        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3726        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3727
3728        // List
3729        let req = make_request("ListExports", json!({}));
3730        let resp = svc.list_exports(&req).unwrap();
3731        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3732        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3733    }
3734
3735    #[test]
3736    fn import_lifecycle() {
3737        let svc = make_service();
3738
3739        let req = make_request(
3740            "ImportTable",
3741            json!({
3742                "InputFormat": "DYNAMODB_JSON",
3743                "S3BucketSource": { "S3Bucket": "import-bucket" },
3744                "TableCreationParameters": {
3745                    "TableName": "imported-table",
3746                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3747                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3748                }
3749            }),
3750        );
3751        let resp = svc.import_table(&req).unwrap();
3752        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3753        let import_arn = body["ImportTableDescription"]["ImportArn"]
3754            .as_str()
3755            .unwrap()
3756            .to_string();
3757        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3758
3759        // Describe import
3760        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3761        let resp = svc.describe_import(&req).unwrap();
3762        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3763        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3764
3765        // List imports
3766        let req = make_request("ListImports", json!({}));
3767        let resp = svc.list_imports(&req).unwrap();
3768        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3769        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3770
3771        // Verify the table was created
3772        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3773        let resp = svc.describe_table(&req).unwrap();
3774        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3775        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3776    }
3777
3778    #[test]
3779    fn backup_restore_preserves_items() {
3780        let svc = make_service();
3781        create_test_table(&svc);
3782
3783        // Put 3 items
3784        for i in 1..=3 {
3785            let req = make_request(
3786                "PutItem",
3787                json!({
3788                    "TableName": "test-table",
3789                    "Item": {
3790                        "pk": { "S": format!("key{i}") },
3791                        "data": { "S": format!("value{i}") }
3792                    }
3793                }),
3794            );
3795            svc.put_item(&req).unwrap();
3796        }
3797
3798        // Create backup
3799        let req = make_request(
3800            "CreateBackup",
3801            json!({
3802                "TableName": "test-table",
3803                "BackupName": "my-backup"
3804            }),
3805        );
3806        let resp = svc.create_backup(&req).unwrap();
3807        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3808        let backup_arn = body["BackupDetails"]["BackupArn"]
3809            .as_str()
3810            .unwrap()
3811            .to_string();
3812
3813        // Delete all items from the original table
3814        for i in 1..=3 {
3815            let req = make_request(
3816                "DeleteItem",
3817                json!({
3818                    "TableName": "test-table",
3819                    "Key": { "pk": { "S": format!("key{i}") } }
3820                }),
3821            );
3822            svc.delete_item(&req).unwrap();
3823        }
3824
3825        // Verify original table is empty
3826        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3827        let resp = svc.scan(&req).unwrap();
3828        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3829        assert_eq!(body["Count"], 0);
3830
3831        // Restore from backup
3832        let req = make_request(
3833            "RestoreTableFromBackup",
3834            json!({
3835                "BackupArn": backup_arn,
3836                "TargetTableName": "restored-table"
3837            }),
3838        );
3839        svc.restore_table_from_backup(&req).unwrap();
3840
3841        // Scan restored table — should have 3 items
3842        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
3843        let resp = svc.scan(&req).unwrap();
3844        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3845        assert_eq!(body["Count"], 3);
3846        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
3847    }
3848
3849    #[test]
3850    fn global_table_replicates_writes() {
3851        let svc = make_service();
3852        create_test_table(&svc);
3853
3854        // Create global table with replicas
3855        let req = make_request(
3856            "CreateGlobalTable",
3857            json!({
3858                "GlobalTableName": "test-table",
3859                "ReplicationGroup": [
3860                    { "RegionName": "us-east-1" },
3861                    { "RegionName": "eu-west-1" }
3862                ]
3863            }),
3864        );
3865        let resp = svc.create_global_table(&req).unwrap();
3866        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3867        assert_eq!(
3868            body["GlobalTableDescription"]["GlobalTableStatus"],
3869            "ACTIVE"
3870        );
3871
3872        // Put an item
3873        let req = make_request(
3874            "PutItem",
3875            json!({
3876                "TableName": "test-table",
3877                "Item": {
3878                    "pk": { "S": "replicated-key" },
3879                    "data": { "S": "replicated-value" }
3880                }
3881            }),
3882        );
3883        svc.put_item(&req).unwrap();
3884
3885        // Verify the item is readable (since all replicas share the same table)
3886        let req = make_request(
3887            "GetItem",
3888            json!({
3889                "TableName": "test-table",
3890                "Key": { "pk": { "S": "replicated-key" } }
3891            }),
3892        );
3893        let resp = svc.get_item(&req).unwrap();
3894        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3895        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
3896        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
3897    }
3898
3899    #[test]
3900    fn contributor_insights_tracks_access() {
3901        let svc = make_service();
3902        create_test_table(&svc);
3903
3904        // Enable contributor insights
3905        let req = make_request(
3906            "UpdateContributorInsights",
3907            json!({
3908                "TableName": "test-table",
3909                "ContributorInsightsAction": "ENABLE"
3910            }),
3911        );
3912        svc.update_contributor_insights(&req).unwrap();
3913
3914        // Put items with different partition keys
3915        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
3916            let req = make_request(
3917                "PutItem",
3918                json!({
3919                    "TableName": "test-table",
3920                    "Item": {
3921                        "pk": { "S": key },
3922                        "data": { "S": "value" }
3923                    }
3924                }),
3925            );
3926            svc.put_item(&req).unwrap();
3927        }
3928
3929        // Get items (to also track read access)
3930        for _ in 0..3 {
3931            let req = make_request(
3932                "GetItem",
3933                json!({
3934                    "TableName": "test-table",
3935                    "Key": { "pk": { "S": "alpha" } }
3936                }),
3937            );
3938            svc.get_item(&req).unwrap();
3939        }
3940
3941        // Describe contributor insights — should show top contributors
3942        let req = make_request(
3943            "DescribeContributorInsights",
3944            json!({ "TableName": "test-table" }),
3945        );
3946        let resp = svc.describe_contributor_insights(&req).unwrap();
3947        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3948        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3949
3950        let contributors = body["TopContributors"].as_array().unwrap();
3951        assert!(
3952            !contributors.is_empty(),
3953            "TopContributors should not be empty"
3954        );
3955
3956        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
3957        // alpha should be the top contributor
3958        let top = &contributors[0];
3959        assert!(top["Count"].as_u64().unwrap() > 0);
3960
3961        // Verify the rule list is populated
3962        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
3963        assert!(!rules.is_empty());
3964    }
3965
3966    #[test]
3967    fn contributor_insights_not_tracked_when_disabled() {
3968        let svc = make_service();
3969        create_test_table(&svc);
3970
3971        // Put items without enabling insights
3972        let req = make_request(
3973            "PutItem",
3974            json!({
3975                "TableName": "test-table",
3976                "Item": {
3977                    "pk": { "S": "key1" },
3978                    "data": { "S": "value" }
3979                }
3980            }),
3981        );
3982        svc.put_item(&req).unwrap();
3983
3984        // Describe — should show empty contributors
3985        let req = make_request(
3986            "DescribeContributorInsights",
3987            json!({ "TableName": "test-table" }),
3988        );
3989        let resp = svc.describe_contributor_insights(&req).unwrap();
3990        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3991        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3992
3993        let contributors = body["TopContributors"].as_array().unwrap();
3994        assert!(contributors.is_empty());
3995    }
3996
3997    #[test]
3998    fn contributor_insights_disabled_table_no_counters_after_scan() {
3999        let svc = make_service();
4000        create_test_table(&svc);
4001
4002        // Put items
4003        for key in &["alpha", "beta"] {
4004            let req = make_request(
4005                "PutItem",
4006                json!({
4007                    "TableName": "test-table",
4008                    "Item": { "pk": { "S": key } }
4009                }),
4010            );
4011            svc.put_item(&req).unwrap();
4012        }
4013
4014        // Enable insights, then scan, then disable, then check counters are cleared
4015        let req = make_request(
4016            "UpdateContributorInsights",
4017            json!({
4018                "TableName": "test-table",
4019                "ContributorInsightsAction": "ENABLE"
4020            }),
4021        );
4022        svc.update_contributor_insights(&req).unwrap();
4023
4024        // Scan to trigger counter collection
4025        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4026        svc.scan(&req).unwrap();
4027
4028        // Verify counters were collected
4029        let req = make_request(
4030            "DescribeContributorInsights",
4031            json!({ "TableName": "test-table" }),
4032        );
4033        let resp = svc.describe_contributor_insights(&req).unwrap();
4034        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4035        let contributors = body["TopContributors"].as_array().unwrap();
4036        assert!(
4037            !contributors.is_empty(),
4038            "counters should be non-empty while enabled"
4039        );
4040
4041        // Disable insights (this clears counters)
4042        let req = make_request(
4043            "UpdateContributorInsights",
4044            json!({
4045                "TableName": "test-table",
4046                "ContributorInsightsAction": "DISABLE"
4047            }),
4048        );
4049        svc.update_contributor_insights(&req).unwrap();
4050
4051        // Scan again -- should NOT accumulate counters since insights is disabled
4052        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4053        svc.scan(&req).unwrap();
4054
4055        // Verify counters are still empty
4056        let req = make_request(
4057            "DescribeContributorInsights",
4058            json!({ "TableName": "test-table" }),
4059        );
4060        let resp = svc.describe_contributor_insights(&req).unwrap();
4061        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4062        let contributors = body["TopContributors"].as_array().unwrap();
4063        assert!(
4064            contributors.is_empty(),
4065            "counters should be empty after disabling insights"
4066        );
4067    }
4068
4069    #[test]
4070    fn scan_pagination_with_limit() {
4071        let svc = make_service();
4072        create_test_table(&svc);
4073
4074        // Insert 5 items
4075        for i in 0..5 {
4076            let req = make_request(
4077                "PutItem",
4078                json!({
4079                    "TableName": "test-table",
4080                    "Item": {
4081                        "pk": { "S": format!("item{i}") },
4082                        "data": { "S": format!("value{i}") }
4083                    }
4084                }),
4085            );
4086            svc.put_item(&req).unwrap();
4087        }
4088
4089        // Scan with limit=2
4090        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
4091        let resp = svc.scan(&req).unwrap();
4092        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4093        assert_eq!(body["Count"], 2);
4094        assert!(
4095            body["LastEvaluatedKey"].is_object(),
4096            "should have LastEvaluatedKey when limit < total items"
4097        );
4098        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4099
4100        // Page through all items
4101        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4102        let mut lek = body["LastEvaluatedKey"].clone();
4103
4104        while lek.is_object() {
4105            let req = make_request(
4106                "Scan",
4107                json!({
4108                    "TableName": "test-table",
4109                    "Limit": 2,
4110                    "ExclusiveStartKey": lek
4111                }),
4112            );
4113            let resp = svc.scan(&req).unwrap();
4114            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4115            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4116            lek = body["LastEvaluatedKey"].clone();
4117        }
4118
4119        assert_eq!(
4120            all_items.len(),
4121            5,
4122            "should retrieve all 5 items via pagination"
4123        );
4124    }
4125
4126    #[test]
4127    fn scan_no_pagination_when_all_fit() {
4128        let svc = make_service();
4129        create_test_table(&svc);
4130
4131        for i in 0..3 {
4132            let req = make_request(
4133                "PutItem",
4134                json!({
4135                    "TableName": "test-table",
4136                    "Item": {
4137                        "pk": { "S": format!("item{i}") }
4138                    }
4139                }),
4140            );
4141            svc.put_item(&req).unwrap();
4142        }
4143
4144        // Scan with limit > item count
4145        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
4146        let resp = svc.scan(&req).unwrap();
4147        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4148        assert_eq!(body["Count"], 3);
4149        assert!(
4150            body["LastEvaluatedKey"].is_null(),
4151            "should not have LastEvaluatedKey when all items fit"
4152        );
4153
4154        // Scan without limit
4155        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4156        let resp = svc.scan(&req).unwrap();
4157        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4158        assert_eq!(body["Count"], 3);
4159        assert!(body["LastEvaluatedKey"].is_null());
4160    }
4161
4162    fn create_composite_table(svc: &DynamoDbService) {
4163        let req = make_request(
4164            "CreateTable",
4165            json!({
4166                "TableName": "composite-table",
4167                "KeySchema": [
4168                    { "AttributeName": "pk", "KeyType": "HASH" },
4169                    { "AttributeName": "sk", "KeyType": "RANGE" }
4170                ],
4171                "AttributeDefinitions": [
4172                    { "AttributeName": "pk", "AttributeType": "S" },
4173                    { "AttributeName": "sk", "AttributeType": "S" }
4174                ],
4175                "BillingMode": "PAY_PER_REQUEST"
4176            }),
4177        );
4178        svc.create_table(&req).unwrap();
4179    }
4180
4181    #[test]
4182    fn query_pagination_with_composite_key() {
4183        let svc = make_service();
4184        create_composite_table(&svc);
4185
4186        // Insert 5 items under the same partition key
4187        for i in 0..5 {
4188            let req = make_request(
4189                "PutItem",
4190                json!({
4191                    "TableName": "composite-table",
4192                    "Item": {
4193                        "pk": { "S": "user1" },
4194                        "sk": { "S": format!("item{i:03}") },
4195                        "data": { "S": format!("value{i}") }
4196                    }
4197                }),
4198            );
4199            svc.put_item(&req).unwrap();
4200        }
4201
4202        // Query with limit=2
4203        let req = make_request(
4204            "Query",
4205            json!({
4206                "TableName": "composite-table",
4207                "KeyConditionExpression": "pk = :pk",
4208                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4209                "Limit": 2
4210            }),
4211        );
4212        let resp = svc.query(&req).unwrap();
4213        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4214        assert_eq!(body["Count"], 2);
4215        assert!(body["LastEvaluatedKey"].is_object());
4216        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4217        assert!(body["LastEvaluatedKey"]["sk"].is_object());
4218
4219        // Page through all items
4220        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4221        let mut lek = body["LastEvaluatedKey"].clone();
4222
4223        while lek.is_object() {
4224            let req = make_request(
4225                "Query",
4226                json!({
4227                    "TableName": "composite-table",
4228                    "KeyConditionExpression": "pk = :pk",
4229                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4230                    "Limit": 2,
4231                    "ExclusiveStartKey": lek
4232                }),
4233            );
4234            let resp = svc.query(&req).unwrap();
4235            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4236            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4237            lek = body["LastEvaluatedKey"].clone();
4238        }
4239
4240        assert_eq!(
4241            all_items.len(),
4242            5,
4243            "should retrieve all 5 items via pagination"
4244        );
4245
4246        // Verify items came back sorted by sort key
4247        let sks: Vec<String> = all_items
4248            .iter()
4249            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
4250            .collect();
4251        let mut sorted = sks.clone();
4252        sorted.sort();
4253        assert_eq!(sks, sorted, "items should be sorted by sort key");
4254    }
4255
4256    #[test]
4257    fn query_no_pagination_when_all_fit() {
4258        let svc = make_service();
4259        create_composite_table(&svc);
4260
4261        for i in 0..2 {
4262            let req = make_request(
4263                "PutItem",
4264                json!({
4265                    "TableName": "composite-table",
4266                    "Item": {
4267                        "pk": { "S": "user1" },
4268                        "sk": { "S": format!("item{i}") }
4269                    }
4270                }),
4271            );
4272            svc.put_item(&req).unwrap();
4273        }
4274
4275        let req = make_request(
4276            "Query",
4277            json!({
4278                "TableName": "composite-table",
4279                "KeyConditionExpression": "pk = :pk",
4280                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4281                "Limit": 10
4282            }),
4283        );
4284        let resp = svc.query(&req).unwrap();
4285        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4286        assert_eq!(body["Count"], 2);
4287        assert!(
4288            body["LastEvaluatedKey"].is_null(),
4289            "should not have LastEvaluatedKey when all items fit"
4290        );
4291    }
4292
4293    fn create_gsi_table(svc: &DynamoDbService) {
4294        let req = make_request(
4295            "CreateTable",
4296            json!({
4297                "TableName": "gsi-table",
4298                "KeySchema": [
4299                    { "AttributeName": "pk", "KeyType": "HASH" }
4300                ],
4301                "AttributeDefinitions": [
4302                    { "AttributeName": "pk", "AttributeType": "S" },
4303                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4304                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4305                ],
4306                "BillingMode": "PAY_PER_REQUEST",
4307                "GlobalSecondaryIndexes": [
4308                    {
4309                        "IndexName": "gsi-index",
4310                        "KeySchema": [
4311                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4312                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4313                        ],
4314                        "Projection": { "ProjectionType": "ALL" }
4315                    }
4316                ]
4317            }),
4318        );
4319        svc.create_table(&req).unwrap();
4320    }
4321
4322    #[test]
4323    fn gsi_query_last_evaluated_key_includes_table_pk() {
4324        let svc = make_service();
4325        create_gsi_table(&svc);
4326
4327        // Insert 3 items with the SAME GSI key but different table PKs
4328        for i in 0..3 {
4329            let req = make_request(
4330                "PutItem",
4331                json!({
4332                    "TableName": "gsi-table",
4333                    "Item": {
4334                        "pk": { "S": format!("item{i}") },
4335                        "gsi_pk": { "S": "shared" },
4336                        "gsi_sk": { "S": "sort" }
4337                    }
4338                }),
4339            );
4340            svc.put_item(&req).unwrap();
4341        }
4342
4343        // Query GSI with Limit=1 to trigger pagination
4344        let req = make_request(
4345            "Query",
4346            json!({
4347                "TableName": "gsi-table",
4348                "IndexName": "gsi-index",
4349                "KeyConditionExpression": "gsi_pk = :v",
4350                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4351                "Limit": 1
4352            }),
4353        );
4354        let resp = svc.query(&req).unwrap();
4355        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4356        assert_eq!(body["Count"], 1);
4357        let lek = &body["LastEvaluatedKey"];
4358        assert!(lek.is_object(), "should have LastEvaluatedKey");
4359        // Must contain the index keys
4360        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
4361        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
4362        // Must also contain the table PK
4363        assert!(
4364            lek["pk"].is_object(),
4365            "LEK must contain table PK for GSI queries"
4366        );
4367    }
4368
4369    #[test]
4370    fn gsi_query_pagination_returns_all_items() {
4371        let svc = make_service();
4372        create_gsi_table(&svc);
4373
4374        // Insert 4 items with the SAME GSI key but different table PKs
4375        for i in 0..4 {
4376            let req = make_request(
4377                "PutItem",
4378                json!({
4379                    "TableName": "gsi-table",
4380                    "Item": {
4381                        "pk": { "S": format!("item{i:03}") },
4382                        "gsi_pk": { "S": "shared" },
4383                        "gsi_sk": { "S": "sort" }
4384                    }
4385                }),
4386            );
4387            svc.put_item(&req).unwrap();
4388        }
4389
4390        // Paginate through all items with Limit=2
4391        let mut all_pks = Vec::new();
4392        let mut lek: Option<Value> = None;
4393
4394        loop {
4395            let mut query = json!({
4396                "TableName": "gsi-table",
4397                "IndexName": "gsi-index",
4398                "KeyConditionExpression": "gsi_pk = :v",
4399                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4400                "Limit": 2
4401            });
4402            if let Some(ref start_key) = lek {
4403                query["ExclusiveStartKey"] = start_key.clone();
4404            }
4405
4406            let req = make_request("Query", query);
4407            let resp = svc.query(&req).unwrap();
4408            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4409
4410            for item in body["Items"].as_array().unwrap() {
4411                let pk = item["pk"]["S"].as_str().unwrap().to_string();
4412                all_pks.push(pk);
4413            }
4414
4415            if body["LastEvaluatedKey"].is_object() {
4416                lek = Some(body["LastEvaluatedKey"].clone());
4417            } else {
4418                break;
4419            }
4420        }
4421
4422        all_pks.sort();
4423        assert_eq!(
4424            all_pks,
4425            vec!["item000", "item001", "item002", "item003"],
4426            "pagination should return all items without duplicates"
4427        );
4428    }
4429
4430    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4431        pairs
4432            .iter()
4433            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4434            .collect()
4435    }
4436
4437    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
4438        pairs
4439            .iter()
4440            .map(|(k, v)| (k.to_string(), v.to_string()))
4441            .collect()
4442    }
4443
4444    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4445        pairs
4446            .iter()
4447            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4448            .collect()
4449    }
4450
4451    #[test]
4452    fn test_evaluate_condition_bare_not_equal() {
4453        let item = cond_item(&[("state", "active")]);
4454        let names = cond_names(&[("#s", "state")]);
4455        let values = cond_values(&[(":c", "complete")]);
4456
4457        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4458
4459        let item2 = cond_item(&[("state", "complete")]);
4460        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4461    }
4462
4463    #[test]
4464    fn test_evaluate_condition_parenthesized_not_equal() {
4465        let item = cond_item(&[("state", "active")]);
4466        let names = cond_names(&[("#s", "state")]);
4467        let values = cond_values(&[(":c", "complete")]);
4468
4469        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4470    }
4471
4472    #[test]
4473    fn test_evaluate_condition_parenthesized_equal_mismatch() {
4474        let item = cond_item(&[("state", "active")]);
4475        let names = cond_names(&[("#s", "state")]);
4476        let values = cond_values(&[(":c", "complete")]);
4477
4478        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4479    }
4480
4481    #[test]
4482    fn test_evaluate_condition_compound_and() {
4483        let item = cond_item(&[("state", "active")]);
4484        let names = cond_names(&[("#s", "state")]);
4485        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4486
4487        // active <> complete AND active <> failed => true
4488        assert!(
4489            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4490        );
4491    }
4492
4493    #[test]
4494    fn test_evaluate_condition_compound_and_mismatch() {
4495        let item = cond_item(&[("state", "inactive")]);
4496        let names = cond_names(&[("#s", "state")]);
4497        let values = cond_values(&[(":a", "active"), (":b", "active")]);
4498
4499        // inactive = active AND inactive = active => false
4500        assert!(
4501            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4502        );
4503    }
4504
4505    #[test]
4506    fn test_evaluate_condition_compound_or() {
4507        let item = cond_item(&[("state", "running")]);
4508        let names = cond_names(&[("#s", "state")]);
4509        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4510
4511        // running = active OR running = idle => false
4512        assert!(
4513            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4514        );
4515
4516        // running = active OR running = running => true
4517        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4518        assert!(
4519            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4520        );
4521    }
4522
4523    #[test]
4524    fn test_evaluate_condition_not_operator() {
4525        let item = cond_item(&[("state", "active")]);
4526        let names = cond_names(&[("#s", "state")]);
4527        let values = cond_values(&[(":c", "complete")]);
4528
4529        // NOT (active = complete) => NOT false => true
4530        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4531
4532        // NOT (active <> complete) => NOT true => false
4533        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4534
4535        // NOT attribute_exists(#s) on existing item => NOT true => false
4536        assert!(
4537            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4538        );
4539
4540        // NOT attribute_exists(#s) on missing item => NOT false => true
4541        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4542    }
4543
4544    #[test]
4545    fn test_evaluate_condition_begins_with() {
4546        // After unification, conditions support begins_with via
4547        // evaluate_single_filter_condition (previously only filters had it).
4548        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4549        let names = cond_names(&[("#n", "name")]);
4550        let values = cond_values(&[(":p", "fakecloud")]);
4551
4552        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4553
4554        let values2 = cond_values(&[(":p", "realcloud")]);
4555        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4556    }
4557
4558    #[test]
4559    fn test_evaluate_condition_contains() {
4560        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4561        let names = cond_names(&[("#t", "tags")]);
4562        let values = cond_values(&[(":v", "beta")]);
4563
4564        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4565
4566        let values2 = cond_values(&[(":v", "delta")]);
4567        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4568    }
4569
4570    #[test]
4571    fn test_evaluate_condition_no_existing_item() {
4572        // When no item exists (PutItem with condition), attribute_not_exists
4573        // should succeed and attribute_exists should fail.
4574        let names = cond_names(&[("#s", "state")]);
4575        let values = cond_values(&[(":v", "active")]);
4576
4577        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4578        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4579        // Comparison against missing item: None != Some(val) => true for <>
4580        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4581        // None == Some(val) => false for =
4582        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4583    }
4584
4585    #[test]
4586    fn test_evaluate_filter_not_operator() {
4587        let item = cond_item(&[("status", "pending")]);
4588        let names = cond_names(&[("#s", "status")]);
4589        let values = cond_values(&[(":v", "pending")]);
4590
4591        assert!(!evaluate_filter_expression(
4592            "NOT (#s = :v)",
4593            &item,
4594            &names,
4595            &values
4596        ));
4597        assert!(evaluate_filter_expression(
4598            "NOT (#s <> :v)",
4599            &item,
4600            &names,
4601            &values
4602        ));
4603    }
4604
4605    #[test]
4606    fn test_evaluate_filter_expression_in_match() {
4607        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
4608        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
4609        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
4610        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
4611        // every item matched every IN filter regardless of value.
4612        let item = cond_item(&[("state", "active")]);
4613        let names = cond_names(&[("#s", "state")]);
4614        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4615
4616        assert!(
4617            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4618            "state=active should match IN (active, pending)"
4619        );
4620    }
4621
4622    #[test]
4623    fn test_evaluate_filter_expression_in_no_match() {
4624        let item = cond_item(&[("state", "complete")]);
4625        let names = cond_names(&[("#s", "state")]);
4626        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4627
4628        assert!(
4629            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4630            "state=complete should not match IN (active, pending)"
4631        );
4632    }
4633
4634    #[test]
4635    fn test_evaluate_filter_expression_in_no_spaces() {
4636        // orderbot emits the raw form
4637        //     "#status IN (" + strings.Join(keys, ",") + ")"
4638        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
4639        let item = cond_item(&[("status", "shipped")]);
4640        let names = cond_names(&[("#s", "status")]);
4641        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4642
4643        assert!(
4644            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4645            "no-space IN list should still parse"
4646        );
4647    }
4648
4649    #[test]
4650    fn test_evaluate_filter_expression_in_missing_attribute() {
4651        // A missing attribute must not match any IN list — the silent-true
4652        // fallthrough would wrongly accept these items.
4653        let item: HashMap<String, AttributeValue> = HashMap::new();
4654        let names = cond_names(&[("#s", "state")]);
4655        let values = cond_values(&[(":a", "active")]);
4656
4657        assert!(
4658            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4659            "missing attribute should not match any IN list"
4660        );
4661    }
4662
4663    #[test]
4664    fn test_evaluate_filter_expression_compound_in_and_eq() {
4665        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
4666        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
4667        // split_on_and handles the outer parens, but the IN leaf had the
4668        // silent-true fallthrough, so any item with priority=high would match
4669        // regardless of state.
4670        let item = cond_item(&[("state", "active"), ("priority", "high")]);
4671        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4672        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4673
4674        assert!(
4675            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4676            "(active IN (active, pending)) AND (high = high) should match"
4677        );
4678
4679        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4680        assert!(
4681            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4682            "(complete IN (active, pending)) AND (high = high) should not match"
4683        );
4684    }
4685
4686    #[test]
4687    fn test_evaluate_condition_attribute_exists_with_space() {
4688        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
4689        // space between the name and the opening paren:
4690        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
4691        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
4692        // with no space, so these fell through the filter leaf entirely and
4693        // hit evaluate_single_key_condition's silent-true fallthrough —
4694        // every conditional write was silently accepted.
4695        let item = cond_item(&[("store_id", "s-1")]);
4696        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4697        let values = cond_values(&[(":0", "tab-A")]);
4698
4699        // On an existing item without active_viewer_tab_id: exists(store_id)
4700        // is true, not_exists(active_viewer_tab_id) is true → OK.
4701        assert!(
4702            evaluate_condition(
4703                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4704                Some(&item),
4705                &names,
4706                &values,
4707            )
4708            .is_ok(),
4709            "claim-lease compound on free item should succeed"
4710        );
4711
4712        // On a missing item: exists(store_id) is false → whole AND false → Err.
4713        assert!(
4714            evaluate_condition(
4715                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4716                None,
4717                &names,
4718                &values,
4719            )
4720            .is_err(),
4721            "claim-lease compound on missing item must fail attribute_exists branch"
4722        );
4723
4724        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
4725        // → (✓) AND ((✗) OR (✗)) → false → Err.
4726        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4727        assert!(
4728            evaluate_condition(
4729                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4730                Some(&held),
4731                &names,
4732                &values,
4733            )
4734            .is_err(),
4735            "claim-lease compound on item held by another tab must fail"
4736        );
4737
4738        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
4739        // → (✓) AND ((✗) OR (✓)) → true → Ok.
4740        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4741        assert!(
4742            evaluate_condition(
4743                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4744                Some(&self_held),
4745                &names,
4746                &values,
4747            )
4748            .is_ok(),
4749            "same-tab re-claim must succeed"
4750        );
4751    }
4752
4753    #[test]
4754    fn test_evaluate_condition_in_match() {
4755        // evaluate_condition delegates to evaluate_filter_expression, so this
4756        // also proves the ConditionExpression path. Before fix: silently Ok.
4757        let item = cond_item(&[("state", "active")]);
4758        let names = cond_names(&[("#s", "state")]);
4759        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4760
4761        assert!(
4762            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4763            "IN should succeed when actual value is in the list"
4764        );
4765    }
4766
4767    #[test]
4768    fn test_evaluate_condition_in_no_match() {
4769        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
4770        // conditional write was accepted regardless of actual state, the
4771        // opposite of what the caller asked for.
4772        let item = cond_item(&[("state", "complete")]);
4773        let names = cond_names(&[("#s", "state")]);
4774        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4775
4776        assert!(
4777            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4778            "IN should fail when actual value is not in the list"
4779        );
4780    }
4781
4782    #[test]
4783    fn test_apply_update_set_list_index_replaces_existing() {
4784        // Shape emitted by orderbot's order-item update retry loop:
4785        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
4786        // Before fix: apply_set_assignment called resolve_attr_name on the
4787        // whole "#items[0]" token, which misses the name map, and then
4788        // item.insert("#items[0]", :item), producing a top-level key
4789        // literally named "#items[0]" rather than mutating the list.
4790        let mut item = HashMap::new();
4791        item.insert(
4792            "items".to_string(),
4793            json!({"L": [
4794                {"M": {"sku": {"S": "OLD-A"}}},
4795                {"M": {"sku": {"S": "OLD-B"}}},
4796            ]}),
4797        );
4798
4799        let names = cond_names(&[("#items", "items")]);
4800        let mut values = HashMap::new();
4801        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
4802
4803        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
4804
4805        let items_list = item
4806            .get("items")
4807            .and_then(|v| v.get("L"))
4808            .and_then(|v| v.as_array())
4809            .expect("items should still be a list");
4810        assert_eq!(items_list.len(), 2, "list length should be unchanged");
4811        let sku0 = items_list[0]
4812            .get("M")
4813            .and_then(|m| m.get("sku"))
4814            .and_then(|s| s.get("S"))
4815            .and_then(|s| s.as_str());
4816        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
4817        let sku1 = items_list[1]
4818            .get("M")
4819            .and_then(|m| m.get("sku"))
4820            .and_then(|s| s.get("S"))
4821            .and_then(|s| s.as_str());
4822        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
4823
4824        assert!(!item.contains_key("items[0]"));
4825        assert!(!item.contains_key("#items[0]"));
4826    }
4827
4828    #[test]
4829    fn test_apply_update_set_list_index_second_slot() {
4830        let mut item = HashMap::new();
4831        item.insert(
4832            "items".to_string(),
4833            json!({"L": [
4834                {"M": {"sku": {"S": "A"}}},
4835                {"M": {"sku": {"S": "B"}}},
4836                {"M": {"sku": {"S": "C"}}},
4837            ]}),
4838        );
4839
4840        let names = cond_names(&[("#items", "items")]);
4841        let mut values = HashMap::new();
4842        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
4843
4844        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
4845
4846        let items_list = item
4847            .get("items")
4848            .and_then(|v| v.get("L"))
4849            .and_then(|v| v.as_array())
4850            .unwrap();
4851        let skus: Vec<&str> = items_list
4852            .iter()
4853            .map(|v| {
4854                v.get("M")
4855                    .and_then(|m| m.get("sku"))
4856                    .and_then(|s| s.get("S"))
4857                    .and_then(|s| s.as_str())
4858                    .unwrap()
4859            })
4860            .collect();
4861        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
4862    }
4863
4864    #[test]
4865    fn test_apply_update_set_list_index_without_name_ref() {
4866        // Same fix must also work when the LHS is a literal attribute name,
4867        // not an expression attribute name ref.
4868        let mut item = HashMap::new();
4869        item.insert(
4870            "tags".to_string(),
4871            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
4872        );
4873
4874        let names: HashMap<String, String> = HashMap::new();
4875        let mut values = HashMap::new();
4876        values.insert(":t".to_string(), json!({"S": "green"}));
4877
4878        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
4879
4880        let tags = item
4881            .get("tags")
4882            .and_then(|v| v.get("L"))
4883            .and_then(|v| v.as_array())
4884            .unwrap();
4885        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
4886        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
4887    }
4888
4889    #[test]
4890    fn test_list_append_into_empty_list() {
4891        // Regression: UpdateItem with `SET #0 = list_append(#0, :0)` where
4892        // the attribute already exists as an empty list silently no-oped.
4893        // Root cause: parse_update_clauses split `list_append(#0, :0)` at
4894        // the inner comma, so apply_set_list_append received a truncated
4895        // `rest` with no closing ')' and returned early without writing.
4896        let mut item = HashMap::new();
4897        item.insert("files".to_string(), json!({"L": []}));
4898
4899        let names = cond_names(&[("#0", "files")]);
4900        let mut values = HashMap::new();
4901        values.insert(
4902            ":0".to_string(),
4903            json!({"L": [{"M": {"field": {"S": "value"}}}]}),
4904        );
4905
4906        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
4907            .unwrap();
4908
4909        let list = item
4910            .get("files")
4911            .and_then(|v| v.get("L"))
4912            .and_then(|v| v.as_array())
4913            .expect("files should be an L-typed attribute");
4914        assert_eq!(list.len(), 1, "one element should have been appended");
4915    }
4916
4917    #[test]
4918    fn test_list_append_into_nonempty_list() {
4919        // Verifies the same fix works when the existing list already has elements.
4920        let mut item = HashMap::new();
4921        item.insert(
4922            "files".to_string(),
4923            json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
4924        );
4925
4926        let names = cond_names(&[("#0", "files")]);
4927        let mut values = HashMap::new();
4928        values.insert(
4929            ":0".to_string(),
4930            json!({"L": [{"M": {"field": {"S": "new"}}}]}),
4931        );
4932
4933        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
4934            .unwrap();
4935
4936        let list = item
4937            .get("files")
4938            .and_then(|v| v.get("L"))
4939            .and_then(|v| v.as_array())
4940            .expect("files should be an L-typed attribute");
4941        assert_eq!(list.len(), 2, "existing element plus one new element");
4942    }
4943
4944    #[test]
4945    fn test_list_append_combined_with_plain_set() {
4946        // Verifies that a mixed expression like
4947        // `SET #a = list_append(#a, :v), #b = :other` correctly applies
4948        // both assignments after the paren-aware comma split fix.
4949        let mut item = HashMap::new();
4950        item.insert("logs".to_string(), json!({"L": []}));
4951        item.insert("count".to_string(), json!({"N": "0"}));
4952
4953        let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
4954        let mut values = HashMap::new();
4955        values.insert(":v".to_string(), json!({"L": [{"S": "entry"}]}));
4956        values.insert(":other".to_string(), json!({"N": "1"}));
4957
4958        apply_update_expression(
4959            &mut item,
4960            "SET #a = list_append(#a, :v), #b = :other",
4961            &names,
4962            &values,
4963        )
4964        .unwrap();
4965
4966        let list = item
4967            .get("logs")
4968            .and_then(|v| v.get("L"))
4969            .and_then(|v| v.as_array())
4970            .expect("logs should be an L-typed attribute");
4971        assert_eq!(list.len(), 1, "one log entry appended");
4972
4973        let count = item
4974            .get("count")
4975            .and_then(|v| v.get("N"))
4976            .and_then(|v| v.as_str())
4977            .expect("count should be an N-typed attribute");
4978        assert_eq!(count, "1", "count updated to 1");
4979    }
4980
4981    #[test]
4982    fn test_unrecognized_expression_returns_false() {
4983        // evaluate_single_key_condition must fail-closed: an expression shape
4984        // it doesn't recognize should return false (reject), not true (accept).
4985        let item = cond_item(&[("x", "1")]);
4986        let names: HashMap<String, String> = HashMap::new();
4987        let values: HashMap<String, Value> = HashMap::new();
4988
4989        assert!(
4990            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, &names, &values),
4991            "unrecognized expression must return false"
4992        );
4993    }
4994
4995    #[test]
4996    fn test_set_list_index_out_of_range_returns_error() {
4997        // SET list[N] where N > len must return a ValidationException,
4998        // not silently no-op.
4999        let mut item = HashMap::new();
5000        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
5001
5002        let names: HashMap<String, String> = HashMap::new();
5003        let mut values = HashMap::new();
5004        values.insert(":v".to_string(), json!({"S": "z"}));
5005
5006        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
5007        assert!(
5008            result.is_err(),
5009            "out-of-range list index must return an error"
5010        );
5011
5012        // List should be unchanged
5013        let list = item
5014            .get("items")
5015            .and_then(|v| v.get("L"))
5016            .and_then(|v| v.as_array())
5017            .unwrap();
5018        assert_eq!(list.len(), 2);
5019    }
5020
5021    #[test]
5022    fn test_set_list_index_on_non_list_returns_error() {
5023        // SET attr[0] = :v where attr is a string (not a list) must return
5024        // a ValidationException.
5025        let mut item = HashMap::new();
5026        item.insert("name".to_string(), json!({"S": "hello"}));
5027
5028        let names: HashMap<String, String> = HashMap::new();
5029        let mut values = HashMap::new();
5030        values.insert(":v".to_string(), json!({"S": "z"}));
5031
5032        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
5033        assert!(
5034            result.is_err(),
5035            "list index on non-list attribute must return an error"
5036        );
5037    }
5038
5039    #[test]
5040    fn test_unrecognized_update_action_returns_error() {
5041        let mut item = HashMap::new();
5042        item.insert("name".to_string(), json!({"S": "hello"}));
5043
5044        let names: HashMap<String, String> = HashMap::new();
5045        let mut values = HashMap::new();
5046        values.insert(":bar".to_string(), json!({"S": "baz"}));
5047
5048        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
5049        assert!(
5050            result.is_err(),
5051            "unrecognized UpdateExpression action must return an error"
5052        );
5053        let err_msg = format!("{}", result.unwrap_err());
5054        assert!(
5055            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
5056            "error should mention Invalid UpdateExpression, got: {err_msg}"
5057        );
5058    }
5059
5060    // ── size() function tests ──────────────────────────────────────────
5061
5062    #[test]
5063    fn test_size_string() {
5064        let mut item = HashMap::new();
5065        item.insert("name".to_string(), json!({"S": "hello"}));
5066        let names = HashMap::new();
5067        let mut values = HashMap::new();
5068        values.insert(":limit".to_string(), json!({"N": "5"}));
5069
5070        assert!(evaluate_single_filter_condition(
5071            "size(name) = :limit",
5072            &item,
5073            &names,
5074            &values,
5075        ));
5076        values.insert(":limit".to_string(), json!({"N": "4"}));
5077        assert!(evaluate_single_filter_condition(
5078            "size(name) > :limit",
5079            &item,
5080            &names,
5081            &values,
5082        ));
5083    }
5084
5085    #[test]
5086    fn test_size_list() {
5087        let mut item = HashMap::new();
5088        item.insert(
5089            "items".to_string(),
5090            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
5091        );
5092        let names = HashMap::new();
5093        let mut values = HashMap::new();
5094        values.insert(":limit".to_string(), json!({"N": "3"}));
5095
5096        assert!(evaluate_single_filter_condition(
5097            "size(items) = :limit",
5098            &item,
5099            &names,
5100            &values,
5101        ));
5102    }
5103
5104    #[test]
5105    fn test_size_map() {
5106        let mut item = HashMap::new();
5107        item.insert(
5108            "data".to_string(),
5109            json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
5110        );
5111        let names = HashMap::new();
5112        let mut values = HashMap::new();
5113        values.insert(":limit".to_string(), json!({"N": "2"}));
5114
5115        assert!(evaluate_single_filter_condition(
5116            "size(data) = :limit",
5117            &item,
5118            &names,
5119            &values,
5120        ));
5121    }
5122
5123    #[test]
5124    fn test_size_set() {
5125        let mut item = HashMap::new();
5126        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
5127        let names = HashMap::new();
5128        let mut values = HashMap::new();
5129        values.insert(":limit".to_string(), json!({"N": "3"}));
5130
5131        assert!(evaluate_single_filter_condition(
5132            "size(tags) > :limit",
5133            &item,
5134            &names,
5135            &values,
5136        ));
5137    }
5138
5139    // ── attribute_type() function tests ────────────────────────────────
5140
5141    #[test]
5142    fn test_attribute_type_string() {
5143        let mut item = HashMap::new();
5144        item.insert("name".to_string(), json!({"S": "hello"}));
5145        let names = HashMap::new();
5146        let mut values = HashMap::new();
5147        values.insert(":t".to_string(), json!({"S": "S"}));
5148
5149        assert!(evaluate_single_filter_condition(
5150            "attribute_type(name, :t)",
5151            &item,
5152            &names,
5153            &values,
5154        ));
5155
5156        values.insert(":t".to_string(), json!({"S": "N"}));
5157        assert!(!evaluate_single_filter_condition(
5158            "attribute_type(name, :t)",
5159            &item,
5160            &names,
5161            &values,
5162        ));
5163    }
5164
5165    #[test]
5166    fn test_attribute_type_number() {
5167        let mut item = HashMap::new();
5168        item.insert("age".to_string(), json!({"N": "42"}));
5169        let names = HashMap::new();
5170        let mut values = HashMap::new();
5171        values.insert(":t".to_string(), json!({"S": "N"}));
5172
5173        assert!(evaluate_single_filter_condition(
5174            "attribute_type(age, :t)",
5175            &item,
5176            &names,
5177            &values,
5178        ));
5179    }
5180
5181    #[test]
5182    fn test_attribute_type_list() {
5183        let mut item = HashMap::new();
5184        item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
5185        let names = HashMap::new();
5186        let mut values = HashMap::new();
5187        values.insert(":t".to_string(), json!({"S": "L"}));
5188
5189        assert!(evaluate_single_filter_condition(
5190            "attribute_type(items, :t)",
5191            &item,
5192            &names,
5193            &values,
5194        ));
5195    }
5196
5197    #[test]
5198    fn test_attribute_type_map() {
5199        let mut item = HashMap::new();
5200        item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
5201        let names = HashMap::new();
5202        let mut values = HashMap::new();
5203        values.insert(":t".to_string(), json!({"S": "M"}));
5204
5205        assert!(evaluate_single_filter_condition(
5206            "attribute_type(data, :t)",
5207            &item,
5208            &names,
5209            &values,
5210        ));
5211    }
5212
5213    #[test]
5214    fn test_attribute_type_bool() {
5215        let mut item = HashMap::new();
5216        item.insert("active".to_string(), json!({"BOOL": true}));
5217        let names = HashMap::new();
5218        let mut values = HashMap::new();
5219        values.insert(":t".to_string(), json!({"S": "BOOL"}));
5220
5221        assert!(evaluate_single_filter_condition(
5222            "attribute_type(active, :t)",
5223            &item,
5224            &names,
5225            &values,
5226        ));
5227    }
5228
5229    // ── begins_with rejects non-string types ───────────────────────────
5230
5231    #[test]
5232    fn test_begins_with_rejects_number_type() {
5233        let mut item = HashMap::new();
5234        item.insert("code".to_string(), json!({"N": "12345"}));
5235        let names = HashMap::new();
5236        let mut values = HashMap::new();
5237        values.insert(":prefix".to_string(), json!({"S": "123"}));
5238
5239        assert!(
5240            !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
5241            "begins_with must return false for N-type attributes"
5242        );
5243    }
5244
5245    #[test]
5246    fn test_begins_with_works_on_string_type() {
5247        let mut item = HashMap::new();
5248        item.insert("code".to_string(), json!({"S": "abc123"}));
5249        let names = HashMap::new();
5250        let mut values = HashMap::new();
5251        values.insert(":prefix".to_string(), json!({"S": "abc"}));
5252
5253        assert!(evaluate_single_filter_condition(
5254            "begins_with(code, :prefix)",
5255            &item,
5256            &names,
5257            &values,
5258        ));
5259    }
5260
5261    // ── contains on sets ───────────────────────────────────────────────
5262
5263    #[test]
5264    fn test_contains_string_set() {
5265        let mut item = HashMap::new();
5266        item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
5267        let names = HashMap::new();
5268        let mut values = HashMap::new();
5269        values.insert(":val".to_string(), json!({"S": "blue"}));
5270
5271        assert!(evaluate_single_filter_condition(
5272            "contains(tags, :val)",
5273            &item,
5274            &names,
5275            &values,
5276        ));
5277
5278        values.insert(":val".to_string(), json!({"S": "yellow"}));
5279        assert!(!evaluate_single_filter_condition(
5280            "contains(tags, :val)",
5281            &item,
5282            &names,
5283            &values,
5284        ));
5285    }
5286
5287    #[test]
5288    fn test_contains_number_set() {
5289        let mut item = HashMap::new();
5290        item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
5291        let names = HashMap::new();
5292        let mut values = HashMap::new();
5293        values.insert(":val".to_string(), json!({"N": "2"}));
5294
5295        assert!(evaluate_single_filter_condition(
5296            "contains(scores, :val)",
5297            &item,
5298            &names,
5299            &values,
5300        ));
5301    }
5302
5303    // ── SET arithmetic type validation ─────────────────────────────────
5304
5305    #[test]
5306    fn test_set_arithmetic_rejects_string_operand() {
5307        let mut item = HashMap::new();
5308        item.insert("name".to_string(), json!({"S": "hello"}));
5309        let names = HashMap::new();
5310        let mut values = HashMap::new();
5311        values.insert(":val".to_string(), json!({"N": "1"}));
5312
5313        let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
5314        assert!(
5315            result.is_err(),
5316            "arithmetic on S-type attribute must return a ValidationException"
5317        );
5318    }
5319
5320    #[test]
5321    fn test_set_arithmetic_rejects_string_value() {
5322        let mut item = HashMap::new();
5323        item.insert("count".to_string(), json!({"N": "5"}));
5324        let names = HashMap::new();
5325        let mut values = HashMap::new();
5326        values.insert(":val".to_string(), json!({"S": "notanumber"}));
5327
5328        let result =
5329            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5330        assert!(
5331            result.is_err(),
5332            "arithmetic with S-type value must return a ValidationException"
5333        );
5334    }
5335
5336    #[test]
5337    fn test_set_arithmetic_valid_numbers() {
5338        let mut item = HashMap::new();
5339        item.insert("count".to_string(), json!({"N": "10"}));
5340        let names = HashMap::new();
5341        let mut values = HashMap::new();
5342        values.insert(":val".to_string(), json!({"N": "3"}));
5343
5344        let result =
5345            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5346        assert!(result.is_ok());
5347        assert_eq!(item["count"], json!({"N": "13"}));
5348    }
5349
5350    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5351
5352    #[test]
5353    fn test_add_binary_set() {
5354        let mut item = HashMap::new();
5355        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
5356        let names = HashMap::new();
5357        let mut values = HashMap::new();
5358        values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
5359
5360        let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
5361        assert!(result.is_ok());
5362        let bs = item["data"]["BS"].as_array().unwrap();
5363        assert_eq!(bs.len(), 3, "should merge sets without duplicates");
5364        assert!(bs.contains(&json!("YQ==")));
5365        assert!(bs.contains(&json!("Yg==")));
5366        assert!(bs.contains(&json!("Yw==")));
5367    }
5368
5369    #[test]
5370    fn test_delete_binary_set() {
5371        let mut item = HashMap::new();
5372        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
5373        let names = HashMap::new();
5374        let mut values = HashMap::new();
5375        values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
5376
5377        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5378        assert!(result.is_ok());
5379        let bs = item["data"]["BS"].as_array().unwrap();
5380        assert_eq!(bs.len(), 2);
5381        assert!(!bs.contains(&json!("Yg==")));
5382    }
5383
5384    #[test]
5385    fn test_delete_binary_set_removes_attr_when_empty() {
5386        let mut item = HashMap::new();
5387        item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
5388        let names = HashMap::new();
5389        let mut values = HashMap::new();
5390        values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
5391
5392        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5393        assert!(result.is_ok());
5394        assert!(
5395            !item.contains_key("data"),
5396            "attribute should be removed when set becomes empty"
5397        );
5398    }
5399
5400    fn body_json(resp: &AwsResponse) -> Value {
5401        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
5402    }
5403
5404    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
5405        match result {
5406            Err(e) => e,
5407            Ok(_) => panic!("expected error, got Ok"),
5408        }
5409    }
5410
5411    // ── CreateTable ──
5412
5413    #[test]
5414    fn create_table_basic() {
5415        let svc = make_service();
5416        let req = make_request(
5417            "CreateTable",
5418            json!({
5419                "TableName": "my-table",
5420                "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
5421                "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
5422                "BillingMode": "PAY_PER_REQUEST",
5423            }),
5424        );
5425        let resp = svc.create_table(&req).unwrap();
5426        let b = body_json(&resp);
5427        assert_eq!(b["TableDescription"]["TableName"], "my-table");
5428        assert_eq!(b["TableDescription"]["TableStatus"], "ACTIVE");
5429        assert!(b["TableDescription"]["TableArn"].as_str().is_some());
5430    }
5431
5432    #[test]
5433    fn create_table_with_sort_key_and_gsi() {
5434        let svc = make_service();
5435        let req = make_request(
5436            "CreateTable",
5437            json!({
5438                "TableName": "gsi-table",
5439                "KeySchema": [
5440                    {"AttributeName": "pk", "KeyType": "HASH"},
5441                    {"AttributeName": "sk", "KeyType": "RANGE"},
5442                ],
5443                "AttributeDefinitions": [
5444                    {"AttributeName": "pk", "AttributeType": "S"},
5445                    {"AttributeName": "sk", "AttributeType": "S"},
5446                    {"AttributeName": "gsi_key", "AttributeType": "N"},
5447                ],
5448                "GlobalSecondaryIndexes": [{
5449                    "IndexName": "gsi1",
5450                    "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
5451                    "Projection": {"ProjectionType": "ALL"},
5452                }],
5453                "BillingMode": "PAY_PER_REQUEST",
5454            }),
5455        );
5456        let resp = svc.create_table(&req).unwrap();
5457        let b = body_json(&resp);
5458        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
5459            .as_array()
5460            .unwrap();
5461        assert_eq!(gsi.len(), 1);
5462        assert_eq!(gsi[0]["IndexName"], "gsi1");
5463    }
5464
5465    #[test]
5466    fn create_table_duplicate_fails() {
5467        let svc = make_service();
5468        create_test_table(&svc);
5469
5470        let req = make_request(
5471            "CreateTable",
5472            json!({
5473                "TableName": "test-table",
5474                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5475                "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
5476                "BillingMode": "PAY_PER_REQUEST",
5477            }),
5478        );
5479        let err = expect_err(svc.create_table(&req));
5480        assert!(err.to_string().contains("ResourceInUseException"));
5481    }
5482
5483    #[test]
5484    fn create_table_missing_key_attr_in_definitions() {
5485        let svc = make_service();
5486        let req = make_request(
5487            "CreateTable",
5488            json!({
5489                "TableName": "bad",
5490                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5491                "AttributeDefinitions": [{"AttributeName": "other", "AttributeType": "S"}],
5492                "BillingMode": "PAY_PER_REQUEST",
5493            }),
5494        );
5495        let err = expect_err(svc.create_table(&req));
5496        assert!(err.to_string().contains("ValidationException"));
5497    }
5498
5499    // ── DescribeTable ──
5500
5501    #[test]
5502    fn describe_table_found() {
5503        let svc = make_service();
5504        create_test_table(&svc);
5505
5506        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5507        let resp = svc.describe_table(&req).unwrap();
5508        let b = body_json(&resp);
5509        assert_eq!(b["Table"]["TableName"], "test-table");
5510        assert_eq!(b["Table"]["TableStatus"], "ACTIVE");
5511    }
5512
5513    #[test]
5514    fn describe_table_not_found() {
5515        let svc = make_service();
5516        let req = make_request("DescribeTable", json!({"TableName": "nope"}));
5517        let err = expect_err(svc.describe_table(&req));
5518        assert!(err.to_string().contains("ResourceNotFoundException"));
5519    }
5520
5521    // ── DeleteTable ──
5522
5523    #[test]
5524    fn delete_table_removes_table() {
5525        let svc = make_service();
5526        create_test_table(&svc);
5527
5528        let req = make_request("DeleteTable", json!({"TableName": "test-table"}));
5529        let resp = svc.delete_table(&req).unwrap();
5530        let b = body_json(&resp);
5531        assert_eq!(b["TableDescription"]["TableName"], "test-table");
5532
5533        // Should be gone
5534        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5535        assert!(svc.describe_table(&req).is_err());
5536    }
5537
5538    // ── ListTables ──
5539
5540    #[test]
5541    fn list_tables_returns_names() {
5542        let svc = make_service();
5543        create_test_table(&svc);
5544
5545        let req = make_request("ListTables", json!({}));
5546        let resp = svc.list_tables(&req).unwrap();
5547        let b = body_json(&resp);
5548        let names = b["TableNames"].as_array().unwrap();
5549        assert!(names.iter().any(|n| n == "test-table"));
5550    }
5551
5552    // ── PutItem / GetItem / DeleteItem ──
5553
5554    #[test]
5555    fn put_and_get_item() {
5556        let svc = make_service();
5557        create_test_table(&svc);
5558
5559        let req = make_request(
5560            "PutItem",
5561            json!({
5562                "TableName": "test-table",
5563                "Item": {
5564                    "pk": {"S": "key1"},
5565                    "name": {"S": "Alice"},
5566                    "age": {"N": "30"},
5567                },
5568            }),
5569        );
5570        svc.put_item(&req).unwrap();
5571
5572        let req = make_request(
5573            "GetItem",
5574            json!({
5575                "TableName": "test-table",
5576                "Key": {"pk": {"S": "key1"}},
5577            }),
5578        );
5579        let resp = svc.get_item(&req).unwrap();
5580        let b = body_json(&resp);
5581        assert_eq!(b["Item"]["name"]["S"], "Alice");
5582        assert_eq!(b["Item"]["age"]["N"], "30");
5583    }
5584
5585    #[test]
5586    fn get_item_not_found() {
5587        let svc = make_service();
5588        create_test_table(&svc);
5589
5590        let req = make_request(
5591            "GetItem",
5592            json!({
5593                "TableName": "test-table",
5594                "Key": {"pk": {"S": "nonexistent"}},
5595            }),
5596        );
5597        let resp = svc.get_item(&req).unwrap();
5598        let b = body_json(&resp);
5599        assert!(b.get("Item").is_none() || b["Item"].is_null());
5600    }
5601
5602    #[test]
5603    fn delete_item_removes_item() {
5604        let svc = make_service();
5605        create_test_table(&svc);
5606
5607        let req = make_request(
5608            "PutItem",
5609            json!({
5610                "TableName": "test-table",
5611                "Item": {"pk": {"S": "del-me"}},
5612            }),
5613        );
5614        svc.put_item(&req).unwrap();
5615
5616        let req = make_request(
5617            "DeleteItem",
5618            json!({
5619                "TableName": "test-table",
5620                "Key": {"pk": {"S": "del-me"}},
5621            }),
5622        );
5623        svc.delete_item(&req).unwrap();
5624
5625        let req = make_request(
5626            "GetItem",
5627            json!({
5628                "TableName": "test-table",
5629                "Key": {"pk": {"S": "del-me"}},
5630            }),
5631        );
5632        let resp = svc.get_item(&req).unwrap();
5633        let b = body_json(&resp);
5634        assert!(b.get("Item").is_none() || b["Item"].is_null());
5635    }
5636
5637    #[test]
5638    fn put_item_returns_old_item() {
5639        let svc = make_service();
5640        create_test_table(&svc);
5641
5642        let req = make_request(
5643            "PutItem",
5644            json!({
5645                "TableName": "test-table",
5646                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "1"}},
5647            }),
5648        );
5649        svc.put_item(&req).unwrap();
5650
5651        let req = make_request(
5652            "PutItem",
5653            json!({
5654                "TableName": "test-table",
5655                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "2"}},
5656                "ReturnValues": "ALL_OLD",
5657            }),
5658        );
5659        let resp = svc.put_item(&req).unwrap();
5660        let b = body_json(&resp);
5661        assert_eq!(b["Attributes"]["v"]["N"], "1");
5662    }
5663
5664    // ── UpdateItem ──
5665
5666    #[test]
5667    fn update_item_set_attribute() {
5668        let svc = make_service();
5669        create_test_table(&svc);
5670
5671        let req = make_request(
5672            "PutItem",
5673            json!({
5674                "TableName": "test-table",
5675                "Item": {"pk": {"S": "upd"}, "count": {"N": "0"}},
5676            }),
5677        );
5678        svc.put_item(&req).unwrap();
5679
5680        let req = make_request(
5681            "UpdateItem",
5682            json!({
5683                "TableName": "test-table",
5684                "Key": {"pk": {"S": "upd"}},
5685                "UpdateExpression": "SET #c = :val",
5686                "ExpressionAttributeNames": {"#c": "count"},
5687                "ExpressionAttributeValues": {":val": {"N": "42"}},
5688                "ReturnValues": "ALL_NEW",
5689            }),
5690        );
5691        let resp = svc.update_item(&req).unwrap();
5692        let b = body_json(&resp);
5693        assert_eq!(b["Attributes"]["count"]["N"], "42");
5694    }
5695
5696    // ── Query ──
5697
5698    #[test]
5699    fn query_returns_matching_items() {
5700        let svc = make_service();
5701        // Table with hash+range
5702        let req = make_request(
5703            "CreateTable",
5704            json!({
5705                "TableName": "query-table",
5706                "KeySchema": [
5707                    {"AttributeName": "pk", "KeyType": "HASH"},
5708                    {"AttributeName": "sk", "KeyType": "RANGE"},
5709                ],
5710                "AttributeDefinitions": [
5711                    {"AttributeName": "pk", "AttributeType": "S"},
5712                    {"AttributeName": "sk", "AttributeType": "S"},
5713                ],
5714                "BillingMode": "PAY_PER_REQUEST",
5715            }),
5716        );
5717        svc.create_table(&req).unwrap();
5718
5719        for i in 0..3 {
5720            let req = make_request(
5721                "PutItem",
5722                json!({
5723                    "TableName": "query-table",
5724                    "Item": {
5725                        "pk": {"S": "user1"},
5726                        "sk": {"S": format!("item-{i}")},
5727                    },
5728                }),
5729            );
5730            svc.put_item(&req).unwrap();
5731        }
5732        // Different partition
5733        let req = make_request(
5734            "PutItem",
5735            json!({
5736                "TableName": "query-table",
5737                "Item": {"pk": {"S": "user2"}, "sk": {"S": "item-0"}},
5738            }),
5739        );
5740        svc.put_item(&req).unwrap();
5741
5742        let req = make_request(
5743            "Query",
5744            json!({
5745                "TableName": "query-table",
5746                "KeyConditionExpression": "pk = :pk",
5747                "ExpressionAttributeValues": {":pk": {"S": "user1"}},
5748            }),
5749        );
5750        let resp = svc.query(&req).unwrap();
5751        let b = body_json(&resp);
5752        assert_eq!(b["Count"], 3);
5753        assert_eq!(b["Items"].as_array().unwrap().len(), 3);
5754    }
5755
5756    // ── Scan ──
5757
5758    #[test]
5759    fn scan_returns_all_items() {
5760        let svc = make_service();
5761        create_test_table(&svc);
5762
5763        for i in 0..5 {
5764            let req = make_request(
5765                "PutItem",
5766                json!({
5767                    "TableName": "test-table",
5768                    "Item": {"pk": {"S": format!("scan-{i}")}},
5769                }),
5770            );
5771            svc.put_item(&req).unwrap();
5772        }
5773
5774        let req = make_request("Scan", json!({"TableName": "test-table"}));
5775        let resp = svc.scan(&req).unwrap();
5776        let b = body_json(&resp);
5777        assert_eq!(b["Count"], 5);
5778    }
5779
5780    // ── BatchWriteItem / BatchGetItem ──
5781
5782    #[test]
5783    fn batch_write_and_get_items() {
5784        let svc = make_service();
5785        create_test_table(&svc);
5786
5787        let req = make_request(
5788            "BatchWriteItem",
5789            json!({
5790                "RequestItems": {
5791                    "test-table": [
5792                        {"PutRequest": {"Item": {"pk": {"S": "b1"}, "val": {"S": "v1"}}}},
5793                        {"PutRequest": {"Item": {"pk": {"S": "b2"}, "val": {"S": "v2"}}}},
5794                        {"PutRequest": {"Item": {"pk": {"S": "b3"}, "val": {"S": "v3"}}}},
5795                    ]
5796                }
5797            }),
5798        );
5799        let resp = svc.batch_write_item(&req).unwrap();
5800        let b = body_json(&resp);
5801        // Unprocessed should be empty
5802        assert!(
5803            b["UnprocessedItems"].as_object().unwrap().is_empty()
5804                || b["UnprocessedItems"]["test-table"]
5805                    .as_array()
5806                    .is_none_or(|a| a.is_empty())
5807        );
5808
5809        // BatchGetItem
5810        let req = make_request(
5811            "BatchGetItem",
5812            json!({
5813                "RequestItems": {
5814                    "test-table": {
5815                        "Keys": [
5816                            {"pk": {"S": "b1"}},
5817                            {"pk": {"S": "b2"}},
5818                            {"pk": {"S": "b3"}},
5819                        ]
5820                    }
5821                }
5822            }),
5823        );
5824        let resp = svc.batch_get_item(&req).unwrap();
5825        let b = body_json(&resp);
5826        let items = b["Responses"]["test-table"].as_array().unwrap();
5827        assert_eq!(items.len(), 3);
5828    }
5829
5830    // ── TransactWriteItems / TransactGetItems ──
5831
5832    #[test]
5833    fn transact_write_and_get() {
5834        let svc = make_service();
5835        create_test_table(&svc);
5836
5837        let req = make_request(
5838            "TransactWriteItems",
5839            json!({
5840                "TransactItems": [
5841                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx1"}}}},
5842                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx2"}}}},
5843                ]
5844            }),
5845        );
5846        svc.transact_write_items(&req).unwrap();
5847
5848        let req = make_request(
5849            "TransactGetItems",
5850            json!({
5851                "TransactItems": [
5852                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}},
5853                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx2"}}}},
5854                ]
5855            }),
5856        );
5857        let resp = svc.transact_get_items(&req).unwrap();
5858        let b = body_json(&resp);
5859        let responses = b["Responses"].as_array().unwrap();
5860        assert_eq!(responses.len(), 2);
5861    }
5862
5863    // ── TagResource / UntagResource / ListTagsOfResource ──
5864
5865    #[test]
5866    fn tag_operations() {
5867        let svc = make_service();
5868        create_test_table(&svc);
5869        let arn = {
5870            let s = svc.state.read();
5871            s.default_ref()
5872                .tables
5873                .get("test-table")
5874                .unwrap()
5875                .arn
5876                .clone()
5877        };
5878
5879        let req = make_request(
5880            "TagResource",
5881            json!({
5882                "ResourceArn": arn,
5883                "Tags": [{"Key": "env", "Value": "test"}],
5884            }),
5885        );
5886        svc.tag_resource(&req).unwrap();
5887
5888        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
5889        let resp = svc.list_tags_of_resource(&req).unwrap();
5890        let b = body_json(&resp);
5891        let tags = b["Tags"].as_array().unwrap();
5892        assert_eq!(tags.len(), 1);
5893        assert_eq!(tags[0]["Key"], "env");
5894
5895        let req = make_request(
5896            "UntagResource",
5897            json!({
5898                "ResourceArn": arn,
5899                "TagKeys": ["env"],
5900            }),
5901        );
5902        svc.untag_resource(&req).unwrap();
5903
5904        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
5905        let resp = svc.list_tags_of_resource(&req).unwrap();
5906        let b = body_json(&resp);
5907        assert!(b["Tags"].as_array().unwrap().is_empty());
5908    }
5909
5910    // ── UpdateTable ──
5911
5912    #[test]
5913    fn update_table_add_gsi() {
5914        let svc = make_service();
5915        let req = make_request(
5916            "CreateTable",
5917            json!({
5918                "TableName": "upd-table",
5919                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5920                "AttributeDefinitions": [
5921                    {"AttributeName": "pk", "AttributeType": "S"},
5922                    {"AttributeName": "gk", "AttributeType": "S"},
5923                ],
5924                "BillingMode": "PAY_PER_REQUEST",
5925            }),
5926        );
5927        svc.create_table(&req).unwrap();
5928
5929        let req = make_request(
5930            "UpdateTable",
5931            json!({
5932                "TableName": "upd-table",
5933                "GlobalSecondaryIndexUpdates": [{
5934                    "Create": {
5935                        "IndexName": "new-gsi",
5936                        "KeySchema": [{"AttributeName": "gk", "KeyType": "HASH"}],
5937                        "Projection": {"ProjectionType": "ALL"},
5938                    }
5939                }],
5940            }),
5941        );
5942        let resp = svc.update_table(&req).unwrap();
5943        let b = body_json(&resp);
5944        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
5945            .as_array()
5946            .unwrap();
5947        assert_eq!(gsi.len(), 1);
5948        assert_eq!(gsi[0]["IndexName"], "new-gsi");
5949    }
5950
5951    // ── Scan with FilterExpression ──
5952
5953    #[test]
5954    fn scan_with_filter_expression() {
5955        let svc = make_service();
5956        create_test_table(&svc);
5957
5958        for i in 0..5 {
5959            let req = make_request(
5960                "PutItem",
5961                json!({
5962                    "TableName": "test-table",
5963                    "Item": {
5964                        "pk": {"S": format!("f-{i}")},
5965                        "status": {"S": if i % 2 == 0 { "active" } else { "inactive" }},
5966                    },
5967                }),
5968            );
5969            svc.put_item(&req).unwrap();
5970        }
5971
5972        let req = make_request(
5973            "Scan",
5974            json!({
5975                "TableName": "test-table",
5976                "FilterExpression": "#s = :val",
5977                "ExpressionAttributeNames": {"#s": "status"},
5978                "ExpressionAttributeValues": {":val": {"S": "active"}},
5979            }),
5980        );
5981        let resp = svc.scan(&req).unwrap();
5982        let b = body_json(&resp);
5983        assert_eq!(b["Count"], 3);
5984    }
5985
5986    // ── PartiQL operations (batch.rs coverage) ──
5987
5988    #[test]
5989    fn execute_statement_select() {
5990        let svc = make_service();
5991        create_test_table(&svc);
5992
5993        let req = make_request(
5994            "PutItem",
5995            json!({"TableName": "test-table", "Item": {"pk": {"S": "qs1"}, "val": {"S": "hello"}}}),
5996        );
5997        svc.put_item(&req).unwrap();
5998
5999        let req = make_request(
6000            "ExecuteStatement",
6001            json!({"Statement": "SELECT * FROM \"test-table\" WHERE pk='qs1'"}),
6002        );
6003        let resp = svc.execute_statement(&req).unwrap();
6004        let b = body_json(&resp);
6005        assert!(!b["Items"].as_array().unwrap().is_empty());
6006    }
6007
6008    #[test]
6009    fn execute_statement_insert() {
6010        let svc = make_service();
6011        create_test_table(&svc);
6012
6013        let req = make_request(
6014            "ExecuteStatement",
6015            json!({"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'ins1', 'data': 'val'}"}),
6016        );
6017        svc.execute_statement(&req).unwrap();
6018
6019        let req = make_request(
6020            "GetItem",
6021            json!({"TableName": "test-table", "Key": {"pk": {"S": "ins1"}}}),
6022        );
6023        let resp = svc.get_item(&req).unwrap();
6024        let b = body_json(&resp);
6025        assert_eq!(b["Item"]["data"]["S"], "val");
6026    }
6027
6028    #[test]
6029    fn batch_execute_statement() {
6030        let svc = make_service();
6031        create_test_table(&svc);
6032
6033        let req = make_request(
6034            "PutItem",
6035            json!({"TableName": "test-table", "Item": {"pk": {"S": "be1"}}}),
6036        );
6037        svc.put_item(&req).unwrap();
6038
6039        let req = make_request(
6040            "BatchExecuteStatement",
6041            json!({
6042                "Statements": [
6043                    {"Statement": "SELECT * FROM \"test-table\" WHERE pk='be1'"},
6044                ]
6045            }),
6046        );
6047        let resp = svc.batch_execute_statement(&req).unwrap();
6048        let b = body_json(&resp);
6049        assert!(b["Responses"].as_array().is_some());
6050    }
6051
6052    #[test]
6053    fn execute_transaction() {
6054        let svc = make_service();
6055        create_test_table(&svc);
6056
6057        let req = make_request(
6058            "ExecuteTransaction",
6059            json!({
6060                "TransactStatements": [
6061                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx1'}"},
6062                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx2'}"},
6063                ]
6064            }),
6065        );
6066        svc.execute_transaction(&req).unwrap();
6067
6068        let req = make_request(
6069            "GetItem",
6070            json!({"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}),
6071        );
6072        let resp = svc.get_item(&req).unwrap();
6073        let b = body_json(&resp);
6074        assert!(b["Item"].is_object());
6075    }
6076
6077    // ── Batch write with delete ──
6078
6079    #[test]
6080    fn batch_write_with_delete_requests() {
6081        let svc = make_service();
6082        create_test_table(&svc);
6083
6084        // Put items first
6085        for key in &["bwd1", "bwd2", "bwd3"] {
6086            let req = make_request(
6087                "PutItem",
6088                json!({"TableName": "test-table", "Item": {"pk": {"S": key}}}),
6089            );
6090            svc.put_item(&req).unwrap();
6091        }
6092
6093        // Batch delete two
6094        let req = make_request(
6095            "BatchWriteItem",
6096            json!({
6097                "RequestItems": {
6098                    "test-table": [
6099                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd1"}}}},
6100                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd2"}}}},
6101                    ]
6102                }
6103            }),
6104        );
6105        svc.batch_write_item(&req).unwrap();
6106
6107        // bwd3 should still exist
6108        let req = make_request(
6109            "GetItem",
6110            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd3"}}}),
6111        );
6112        let resp = svc.get_item(&req).unwrap();
6113        let b = body_json(&resp);
6114        assert!(b["Item"].is_object());
6115
6116        // bwd1 should be gone
6117        let req = make_request(
6118            "GetItem",
6119            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd1"}}}),
6120        );
6121        let resp = svc.get_item(&req).unwrap();
6122        let b = body_json(&resp);
6123        assert!(b.get("Item").is_none() || b["Item"].is_null());
6124    }
6125
6126    // ── Query with sort key condition ──
6127
6128    #[test]
6129    fn query_with_sort_key_begins_with() {
6130        let svc = make_service();
6131        // Table with hash+range
6132        let req = make_request(
6133            "CreateTable",
6134            json!({
6135                "TableName": "sk-table",
6136                "KeySchema": [
6137                    {"AttributeName": "pk", "KeyType": "HASH"},
6138                    {"AttributeName": "sk", "KeyType": "RANGE"},
6139                ],
6140                "AttributeDefinitions": [
6141                    {"AttributeName": "pk", "AttributeType": "S"},
6142                    {"AttributeName": "sk", "AttributeType": "S"},
6143                ],
6144                "BillingMode": "PAY_PER_REQUEST",
6145            }),
6146        );
6147        svc.create_table(&req).unwrap();
6148
6149        for sk in &["order#001", "order#002", "profile#main"] {
6150            let req = make_request(
6151                "PutItem",
6152                json!({"TableName": "sk-table", "Item": {"pk": {"S": "u1"}, "sk": {"S": sk}}}),
6153            );
6154            svc.put_item(&req).unwrap();
6155        }
6156
6157        let req = make_request(
6158            "Query",
6159            json!({
6160                "TableName": "sk-table",
6161                "KeyConditionExpression": "pk = :pk AND begins_with(sk, :prefix)",
6162                "ExpressionAttributeValues": {":pk": {"S": "u1"}, ":prefix": {"S": "order#"}},
6163            }),
6164        );
6165        let resp = svc.query(&req).unwrap();
6166        let b = body_json(&resp);
6167        assert_eq!(b["Count"], 2);
6168    }
6169
6170    // ── Scan with limit ──
6171
6172    #[test]
6173    fn scan_with_limit() {
6174        let svc = make_service();
6175        create_test_table(&svc);
6176
6177        for i in 0..10 {
6178            let req = make_request(
6179                "PutItem",
6180                json!({"TableName": "test-table", "Item": {"pk": {"S": format!("lim{i}")}}}),
6181            );
6182            svc.put_item(&req).unwrap();
6183        }
6184
6185        let req = make_request("Scan", json!({"TableName": "test-table", "Limit": 3}));
6186        let resp = svc.scan(&req).unwrap();
6187        let b = body_json(&resp);
6188        assert_eq!(b["Count"], 3);
6189        assert!(b["LastEvaluatedKey"].is_object());
6190    }
6191
6192    // ── Error branches ──
6193
6194    #[test]
6195    fn batch_get_item_table_not_found() {
6196        let svc = make_service();
6197        let req = make_request(
6198            "BatchGetItem",
6199            json!({"RequestItems": {"ghost": {"Keys": [{"pk": {"S": "k"}}]}}}),
6200        );
6201        assert!(svc.batch_get_item(&req).is_err());
6202    }
6203
6204    #[test]
6205    fn batch_write_item_table_not_found() {
6206        let svc = make_service();
6207        let req = make_request(
6208            "BatchWriteItem",
6209            json!({"RequestItems": {"ghost": [{"PutRequest": {"Item": {"pk": {"S": "k"}}}}]}}),
6210        );
6211        assert!(svc.batch_write_item(&req).is_err());
6212    }
6213
6214    // ── Global tables ──
6215
6216    #[test]
6217    fn create_and_describe_global_table() {
6218        let svc = make_service();
6219        create_test_table(&svc);
6220
6221        let req = make_request(
6222            "CreateGlobalTable",
6223            json!({
6224                "GlobalTableName": "test-table",
6225                "ReplicationGroup": [{"RegionName": "us-east-1"}, {"RegionName": "eu-west-1"}],
6226            }),
6227        );
6228        svc.create_global_table(&req).unwrap();
6229
6230        let req = make_request(
6231            "DescribeGlobalTable",
6232            json!({"GlobalTableName": "test-table"}),
6233        );
6234        let resp = svc.describe_global_table(&req).unwrap();
6235        let b = body_json(&resp);
6236        assert!(b["GlobalTableDescription"].is_object());
6237    }
6238
6239    #[test]
6240    fn list_global_tables() {
6241        let svc = make_service();
6242        let req = make_request("ListGlobalTables", json!({}));
6243        let resp = svc.list_global_tables(&req).unwrap();
6244        let b = body_json(&resp);
6245        assert!(b["GlobalTables"].as_array().is_some());
6246    }
6247
6248    // ── Backup operations ──
6249
6250    #[test]
6251    fn create_and_list_backups() {
6252        let svc = make_service();
6253        create_test_table(&svc);
6254
6255        let req = make_request(
6256            "CreateBackup",
6257            json!({"TableName": "test-table", "BackupName": "bak1"}),
6258        );
6259        let resp = svc.create_backup(&req).unwrap();
6260        let b = body_json(&resp);
6261        assert!(b["BackupDetails"]["BackupArn"].as_str().is_some());
6262
6263        let req = make_request("ListBackups", json!({}));
6264        let resp = svc.list_backups(&req).unwrap();
6265        let b = body_json(&resp);
6266        assert!(!b["BackupSummaries"].as_array().unwrap().is_empty());
6267    }
6268
6269    // ── Import/Export ──
6270
6271    #[test]
6272    fn describe_import_not_found() {
6273        let svc = make_service();
6274        let req = make_request(
6275            "DescribeImport",
6276            json!({"ImportArn": "arn:aws:dynamodb:us-east-1:123:table/t/import/ghost"}),
6277        );
6278        assert!(svc.describe_import(&req).is_err());
6279    }
6280
6281    #[test]
6282    fn describe_export_not_found() {
6283        let svc = make_service();
6284        let req = make_request(
6285            "DescribeExport",
6286            json!({"ExportArn": "arn:aws:dynamodb:us-east-1:123:table/t/export/ghost"}),
6287        );
6288        assert!(svc.describe_export(&req).is_err());
6289    }
6290
6291    // ── tables.rs error branches ──
6292
6293    #[test]
6294    fn create_table_missing_name_errors() {
6295        let svc = make_service();
6296        let req = make_request(
6297            "CreateTable",
6298            json!({
6299                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6300                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6301                "BillingMode": "PAY_PER_REQUEST"
6302            }),
6303        );
6304        assert!(svc.create_table(&req).is_err());
6305    }
6306
6307    #[test]
6308    fn create_table_duplicate_errors() {
6309        let svc = make_service();
6310        let req = make_request(
6311            "CreateTable",
6312            json!({
6313                "TableName": "dup",
6314                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6315                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6316                "BillingMode": "PAY_PER_REQUEST"
6317            }),
6318        );
6319        svc.create_table(&req).unwrap();
6320        assert!(svc.create_table(&req).is_err());
6321    }
6322
6323    #[test]
6324    fn delete_table_missing_name_errors() {
6325        let svc = make_service();
6326        let req = make_request("DeleteTable", json!({}));
6327        assert!(svc.delete_table(&req).is_err());
6328    }
6329
6330    #[test]
6331    fn delete_table_not_found_errors() {
6332        let svc = make_service();
6333        let req = make_request("DeleteTable", json!({"TableName": "ghost"}));
6334        assert!(svc.delete_table(&req).is_err());
6335    }
6336
6337    #[test]
6338    fn describe_table_missing_name_errors() {
6339        let svc = make_service();
6340        let req = make_request("DescribeTable", json!({}));
6341        assert!(svc.describe_table(&req).is_err());
6342    }
6343
6344    #[test]
6345    fn describe_table_not_found_errors() {
6346        let svc = make_service();
6347        let req = make_request("DescribeTable", json!({"TableName": "ghost"}));
6348        assert!(svc.describe_table(&req).is_err());
6349    }
6350
6351    #[test]
6352    fn update_table_missing_name_errors() {
6353        let svc = make_service();
6354        let req = make_request("UpdateTable", json!({}));
6355        assert!(svc.update_table(&req).is_err());
6356    }
6357
6358    #[test]
6359    fn update_table_not_found_errors() {
6360        let svc = make_service();
6361        let req = make_request("UpdateTable", json!({"TableName": "ghost"}));
6362        assert!(svc.update_table(&req).is_err());
6363    }
6364
6365    #[test]
6366    fn list_tables_pagination() {
6367        let svc = make_service();
6368        for i in 0..5 {
6369            let req = make_request(
6370                "CreateTable",
6371                json!({
6372                    "TableName": format!("pt{i}"),
6373                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6374                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6375                    "BillingMode": "PAY_PER_REQUEST"
6376                }),
6377            );
6378            svc.create_table(&req).unwrap();
6379        }
6380        let req = make_request("ListTables", json!({"Limit": 2}));
6381        let resp = svc.list_tables(&req).unwrap();
6382        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6383        assert_eq!(body["TableNames"].as_array().unwrap().len(), 2);
6384        assert!(body["LastEvaluatedTableName"].is_string());
6385    }
6386
6387    #[test]
6388    fn list_tables_start_exclusive() {
6389        let svc = make_service();
6390        for i in 0..3 {
6391            let req = make_request(
6392                "CreateTable",
6393                json!({
6394                    "TableName": format!("pt{i}"),
6395                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6396                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6397                    "BillingMode": "PAY_PER_REQUEST"
6398                }),
6399            );
6400            svc.create_table(&req).unwrap();
6401        }
6402        let req = make_request("ListTables", json!({"ExclusiveStartTableName": "pt0"}));
6403        let resp = svc.list_tables(&req).unwrap();
6404        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6405        let names = body["TableNames"].as_array().unwrap();
6406        assert!(!names.iter().any(|n| n == "pt0"));
6407    }
6408
6409    #[test]
6410    fn update_time_to_live_unknown_table_errors() {
6411        let svc = make_service();
6412        let req = make_request(
6413            "UpdateTimeToLive",
6414            json!({
6415                "TableName": "ghost",
6416                "TimeToLiveSpecification": {"Enabled": true, "AttributeName": "ttl"}
6417            }),
6418        );
6419        assert!(svc.update_time_to_live(&req).is_err());
6420    }
6421
6422    #[test]
6423    fn describe_time_to_live_unknown_table_errors() {
6424        let svc = make_service();
6425        let req = make_request("DescribeTimeToLive", json!({"TableName": "ghost"}));
6426        assert!(svc.describe_time_to_live(&req).is_err());
6427    }
6428
6429    // ── resource policy ──
6430
6431    #[test]
6432    fn put_resource_policy_missing_policy_errors() {
6433        let svc = make_service();
6434        let req = make_request(
6435            "CreateTable",
6436            json!({
6437                "TableName": "rp",
6438                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6439                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6440                "BillingMode": "PAY_PER_REQUEST"
6441            }),
6442        );
6443        svc.create_table(&req).unwrap();
6444        let req = make_request(
6445            "PutResourcePolicy",
6446            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/rp"}),
6447        );
6448        assert!(svc.put_resource_policy(&req).is_err());
6449    }
6450
6451    #[test]
6452    fn get_resource_policy_unknown_table_errors() {
6453        let svc = make_service();
6454        let req = make_request(
6455            "GetResourcePolicy",
6456            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6457        );
6458        assert!(svc.get_resource_policy(&req).is_err());
6459    }
6460
6461    // ── tags ──
6462
6463    #[test]
6464    fn tag_resource_unknown_table_errors() {
6465        let svc = make_service();
6466        let req = make_request(
6467            "TagResource",
6468            json!({
6469                "ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost",
6470                "Tags": [{"Key": "k", "Value": "v"}]
6471            }),
6472        );
6473        assert!(svc.tag_resource(&req).is_err());
6474    }
6475
6476    #[test]
6477    fn list_tags_unknown_table_errors() {
6478        let svc = make_service();
6479        let req = make_request(
6480            "ListTagsOfResource",
6481            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6482        );
6483        assert!(svc.list_tags_of_resource(&req).is_err());
6484    }
6485
6486    // ── backups ──
6487
6488    #[test]
6489    fn create_backup_unknown_table_errors() {
6490        let svc = make_service();
6491        let req = make_request(
6492            "CreateBackup",
6493            json!({"TableName": "ghost", "BackupName": "b1"}),
6494        );
6495        assert!(svc.create_backup(&req).is_err());
6496    }
6497
6498    #[test]
6499    fn delete_backup_not_found_errors() {
6500        let svc = make_service();
6501        let req = make_request(
6502            "DeleteBackup",
6503            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6504        );
6505        assert!(svc.delete_backup(&req).is_err());
6506    }
6507
6508    #[test]
6509    fn describe_backup_not_found_errors() {
6510        let svc = make_service();
6511        let req = make_request(
6512            "DescribeBackup",
6513            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6514        );
6515        assert!(svc.describe_backup(&req).is_err());
6516    }
6517
6518    #[test]
6519    fn restore_table_from_backup_not_found_errors() {
6520        let svc = make_service();
6521        let req = make_request(
6522            "RestoreTableFromBackup",
6523            json!({
6524                "TargetTableName": "restored",
6525                "BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"
6526            }),
6527        );
6528        assert!(svc.restore_table_from_backup(&req).is_err());
6529    }
6530
6531    #[test]
6532    fn update_continuous_backups_unknown_table_errors() {
6533        let svc = make_service();
6534        let req = make_request(
6535            "UpdateContinuousBackups",
6536            json!({
6537                "TableName": "ghost",
6538                "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": true}
6539            }),
6540        );
6541        assert!(svc.update_continuous_backups(&req).is_err());
6542    }
6543
6544    // ── items.rs: put_item error branches ──
6545
6546    #[test]
6547    fn put_item_unknown_table_errors() {
6548        let svc = make_service();
6549        let req = make_request(
6550            "PutItem",
6551            json!({
6552                "TableName": "ghost",
6553                "Item": {"k": {"S": "v"}}
6554            }),
6555        );
6556        assert!(svc.put_item(&req).is_err());
6557    }
6558
6559    #[test]
6560    fn put_item_missing_key_attribute_errors() {
6561        let svc = make_service();
6562        svc.create_table(&make_request(
6563            "CreateTable",
6564            json!({
6565                "TableName": "pmk",
6566                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6567                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6568                "BillingMode": "PAY_PER_REQUEST"
6569            }),
6570        ))
6571        .unwrap();
6572        let req = make_request(
6573            "PutItem",
6574            json!({
6575                "TableName": "pmk",
6576                "Item": {"other": {"S": "v"}}
6577            }),
6578        );
6579        assert!(svc.put_item(&req).is_err());
6580    }
6581
6582    #[test]
6583    fn get_item_unknown_table_errors() {
6584        let svc = make_service();
6585        let req = make_request(
6586            "GetItem",
6587            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6588        );
6589        assert!(svc.get_item(&req).is_err());
6590    }
6591
6592    #[test]
6593    fn delete_item_unknown_table_errors() {
6594        let svc = make_service();
6595        let req = make_request(
6596            "DeleteItem",
6597            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6598        );
6599        assert!(svc.delete_item(&req).is_err());
6600    }
6601
6602    #[test]
6603    fn update_item_unknown_table_errors() {
6604        let svc = make_service();
6605        let req = make_request(
6606            "UpdateItem",
6607            json!({
6608                "TableName": "ghost",
6609                "Key": {"k": {"S": "1"}},
6610                "UpdateExpression": "SET x = :v",
6611                "ExpressionAttributeValues": {":v": {"S": "val"}}
6612            }),
6613        );
6614        assert!(svc.update_item(&req).is_err());
6615    }
6616
6617    #[test]
6618    fn query_unknown_table_errors() {
6619        let svc = make_service();
6620        let req = make_request(
6621            "Query",
6622            json!({
6623                "TableName": "ghost",
6624                "KeyConditionExpression": "k = :v",
6625                "ExpressionAttributeValues": {":v": {"S": "x"}}
6626            }),
6627        );
6628        assert!(svc.query(&req).is_err());
6629    }
6630
6631    #[test]
6632    fn scan_unknown_table_errors() {
6633        let svc = make_service();
6634        let req = make_request("Scan", json!({"TableName": "ghost"}));
6635        assert!(svc.scan(&req).is_err());
6636    }
6637
6638    #[test]
6639    fn scan_with_limit_returns_ok() {
6640        let svc = make_service();
6641        svc.create_table(&make_request(
6642            "CreateTable",
6643            json!({
6644                "TableName": "slt",
6645                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6646                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6647                "BillingMode": "PAY_PER_REQUEST"
6648            }),
6649        ))
6650        .unwrap();
6651        for i in 0..5 {
6652            svc.put_item(&make_request(
6653                "PutItem",
6654                json!({
6655                    "TableName": "slt",
6656                    "Item": {"k": {"S": format!("key-{i}")}}
6657                }),
6658            ))
6659            .unwrap();
6660        }
6661        let req = make_request("Scan", json!({"TableName": "slt", "Limit": 2}));
6662        let resp = svc.scan(&req).unwrap();
6663        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6664        assert_eq!(body["Count"], 2);
6665    }
6666
6667    #[test]
6668    fn batch_get_item_unknown_table_errors() {
6669        let svc = make_service();
6670        let req = make_request(
6671            "BatchGetItem",
6672            json!({
6673                "RequestItems": {
6674                    "ghost": {"Keys": [{"k": {"S": "1"}}]}
6675                }
6676            }),
6677        );
6678        assert!(svc.batch_get_item(&req).is_err());
6679    }
6680
6681    #[test]
6682    fn batch_write_item_unknown_table_errors() {
6683        let svc = make_service();
6684        let req = make_request(
6685            "BatchWriteItem",
6686            json!({
6687                "RequestItems": {
6688                    "ghost": [{"PutRequest": {"Item": {"k": {"S": "1"}}}}]
6689                }
6690            }),
6691        );
6692        assert!(svc.batch_write_item(&req).is_err());
6693    }
6694
6695    #[test]
6696    fn transact_write_items_unknown_table_errors() {
6697        let svc = make_service();
6698        let req = make_request(
6699            "TransactWriteItems",
6700            json!({
6701                "TransactItems": [{
6702                    "Put": {"TableName": "ghost", "Item": {"k": {"S": "1"}}}
6703                }]
6704            }),
6705        );
6706        assert!(svc.transact_write_items(&req).is_err());
6707    }
6708
6709    #[test]
6710    fn transact_get_items_unknown_table_errors() {
6711        let svc = make_service();
6712        let req = make_request(
6713            "TransactGetItems",
6714            json!({
6715                "TransactItems": [{
6716                    "Get": {"TableName": "ghost", "Key": {"k": {"S": "1"}}}
6717                }]
6718            }),
6719        );
6720        assert!(svc.transact_get_items(&req).is_err());
6721    }
6722
6723    #[test]
6724    fn describe_global_table_not_found_b() {
6725        let svc = make_service();
6726        let req = make_request("DescribeGlobalTable", json!({"GlobalTableName": "ghost"}));
6727        assert!(svc.describe_global_table(&req).is_err());
6728    }
6729
6730    #[test]
6731    fn list_global_tables_empty_ok() {
6732        let svc = make_service();
6733        let req = make_request("ListGlobalTables", json!({}));
6734        let resp = svc.list_global_tables(&req).unwrap();
6735        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6736        assert!(body["GlobalTables"].is_array());
6737    }
6738}