Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::state::SharedS3State;
23
24use crate::state::{
25    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoDbSnapshot, DynamoTable,
26    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
27    ProvisionedThroughput, SharedDynamoDbState, DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
/// Minimal subset of a [`DynamoTable`] that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
pub(super) struct KinesisDeliveryTarget {
    /// Copy of the table's Kinesis streaming destinations. Delivery only
    /// sends to the entries whose `destination_status` is `"ACTIVE"`.
    pub destinations: Vec<KinesisDestination>,
    /// Table ARN. Emitted as `eventSourceARN`; its fourth `:`-separated
    /// segment is used as `awsRegion` in the change record.
    pub arn: String,
    /// Table name, emitted as `tableName` in the change record.
    pub name: String,
}
41
/// Operation flavor for the per-item KMS audit-trail emitter. Reads
/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
/// `GenerateDataKey`, mirroring AWS's audit shape.
///
/// The enum is a plain two-state tag, so it derives the cheap value-type
/// traits: callers can copy it, compare it, and print it in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableKmsOp {
    /// Read path: the emitter follows the encrypt call with a decrypt call.
    Read,
    /// Write path: only the encrypt call is made.
    Write,
}
49
/// DynamoDB service front end: owns the shared state handle plus the optional
/// integrations (S3, delivery bus, snapshot store, KMS hook) that are wired
/// in through the `with_*` builder methods.
pub struct DynamoDbService {
    /// Shared in-memory DynamoDB state; cloned wholesale into every snapshot.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, set by `with_s3`.
    /// NOTE(review): presumably consumed by the import/export paths — the
    /// readers are not visible in this file section; confirm.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 object store, set by `with_s3_store`.
    /// NOTE(review): usage is outside this file section; confirm.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Bus used to push change records to Kinesis streams. `None` turns
    /// Kinesis delivery into a no-op.
    delivery: Option<Arc<DeliveryBus>>,
    /// Destination for persisted snapshots. `None` turns `save_snapshot`
    /// into a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Hook used to emit KMS audit-trail records. `None` disables them.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook; `new` initialises it to `"us-east-1"`.
    pub(crate) region: String,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
64
65impl DynamoDbService {
66    pub fn new(state: SharedDynamoDbState) -> Self {
67        Self {
68            state,
69            s3_state: None,
70            s3_store: None,
71            delivery: None,
72            snapshot_store: None,
73            kms_hook: None,
74            region: "us-east-1".to_string(),
75            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
76        }
77    }
78
79    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
80        self.s3_state = Some(s3_state);
81        self
82    }
83
84    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
85        self.s3_store = Some(store);
86        self
87    }
88
89    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
90        self.delivery = Some(delivery);
91        self
92    }
93
94    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
95        self.snapshot_store = Some(store);
96        self
97    }
98
99    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
100        self.kms_hook = Some(hook);
101        self
102    }
103
104    pub fn with_region(mut self, region: impl Into<String>) -> Self {
105        self.region = region.into();
106        self
107    }
108
109    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
110    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
111    /// item bodies are nested attribute maps — encrypting them in
112    /// fakecloud would balloon scope without adding test coverage that
113    /// users actually want, so we just emit the audit-trail records the
114    /// AWS API produces and let callers assert KMS usage via
115    /// `/_fakecloud/kms/usage`.
116    pub(crate) fn record_table_kms_usage(
117        &self,
118        account_id: &str,
119        table_arn: &str,
120        kms_key_arn: Option<&str>,
121        operation: TableKmsOp,
122    ) {
123        let Some(hook) = &self.kms_hook else { return };
124        let key = kms_key_arn
125            .filter(|k| !k.is_empty())
126            .unwrap_or("aws/dynamodb");
127        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
128        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
129        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
130        // ends with `:table/<name>`, so derive the name from it.
131        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
132        let mut ctx = std::collections::HashMap::new();
133        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
134        ctx.insert(
135            "aws:dynamodb:subscriberId".to_string(),
136            account_id.to_string(),
137        );
138        let envelope = match hook.encrypt(
139            account_id,
140            &self.region,
141            key,
142            b"ddb-item",
143            "dynamodb.amazonaws.com",
144            ctx.clone(),
145        ) {
146            Ok(env) => env,
147            Err(_) => return,
148        };
149        if matches!(operation, TableKmsOp::Read) {
150            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
151        }
152    }
153
154    /// Persist the current in-memory state as a snapshot. Called after
155    /// every state-mutating action. A noop when no snapshot store is
156    /// configured (i.e. `StorageMode::Memory`).
157    ///
158    /// The snapshot lock serializes the full clone + serialize + write
159    /// so concurrent mutators cannot leave older bytes on disk, and
160    /// serialization + the blocking file write are offloaded to the
161    /// blocking pool to keep Tokio workers responsive.
162    async fn save_snapshot(&self) {
163        let Some(store) = self.snapshot_store.clone() else {
164            return;
165        };
166        let _guard = self.snapshot_lock.lock().await;
167        let snapshot = DynamoDbSnapshot {
168            schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
169            accounts: Some(self.state.read().clone()),
170            state: None,
171        };
172        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
173            let bytes = serde_json::to_vec(&snapshot)
174                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
175            store.save(&bytes)
176        })
177        .await;
178        match join {
179            Ok(Ok(())) => {}
180            Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
181            Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
182        }
183    }
184
185    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
186        if table
187            .kinesis_destinations
188            .iter()
189            .any(|d| d.destination_status == "ACTIVE")
190        {
191            Some(KinesisDeliveryTarget {
192                destinations: table.kinesis_destinations.clone(),
193                arn: table.arn.clone(),
194                name: table.name.clone(),
195            })
196        } else {
197            None
198        }
199    }
200
201    /// Deliver a change record to all active Kinesis streaming destinations for a table.
202    pub(super) fn deliver_to_kinesis_destinations(
203        &self,
204        target: &KinesisDeliveryTarget,
205        event_name: &str,
206        keys: &HashMap<String, AttributeValue>,
207        old_image: Option<&HashMap<String, AttributeValue>>,
208        new_image: Option<&HashMap<String, AttributeValue>>,
209    ) {
210        let delivery = match &self.delivery {
211            Some(d) => d,
212            None => return,
213        };
214
215        let active_destinations: Vec<_> = target
216            .destinations
217            .iter()
218            .filter(|d| d.destination_status == "ACTIVE")
219            .collect();
220
221        if active_destinations.is_empty() {
222            return;
223        }
224
225        let mut record = json!({
226            "eventID": uuid::Uuid::new_v4().to_string(),
227            "eventName": event_name,
228            "eventVersion": "1.1",
229            "eventSource": "aws:dynamodb",
230            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
231            "dynamodb": {
232                "Keys": keys,
233                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
234                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
235                "StreamViewType": "NEW_AND_OLD_IMAGES",
236            },
237            "eventSourceARN": &target.arn,
238            "tableName": &target.name,
239        });
240
241        if let Some(old) = old_image {
242            record["dynamodb"]["OldImage"] = json!(old);
243        }
244        if let Some(new) = new_image {
245            record["dynamodb"]["NewImage"] = json!(new);
246        }
247
248        let record_str = serde_json::to_string(&record).unwrap_or_default();
249        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
250        let partition_key = serde_json::to_string(keys).unwrap_or_default();
251
252        for dest in active_destinations {
253            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
254        }
255    }
256
257    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
258        serde_json::from_slice(&req.body).map_err(|e| {
259            AwsServiceError::aws_error(
260                StatusCode::BAD_REQUEST,
261                "SerializationException",
262                format!("Invalid JSON: {e}"),
263            )
264        })
265    }
266
267    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
268        Ok(AwsResponse::ok_json(body))
269    }
270}
271
272#[async_trait]
273impl AwsService for DynamoDbService {
274    fn service_name(&self) -> &str {
275        "dynamodb"
276    }
277
278    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
279        let mutates = is_mutating_action(req.action.as_str());
280        let result = match req.action.as_str() {
281            "CreateTable" => self.create_table(&req),
282            "DeleteTable" => self.delete_table(&req),
283            "DescribeTable" => self.describe_table(&req),
284            "ListTables" => self.list_tables(&req),
285            "UpdateTable" => self.update_table(&req),
286            "PutItem" => self.put_item(&req),
287            "GetItem" => self.get_item(&req),
288            "DeleteItem" => self.delete_item(&req),
289            "UpdateItem" => self.update_item(&req),
290            "Query" => self.query(&req),
291            "Scan" => self.scan(&req),
292            "BatchGetItem" => self.batch_get_item(&req),
293            "BatchWriteItem" => self.batch_write_item(&req),
294            "TagResource" => self.tag_resource(&req),
295            "UntagResource" => self.untag_resource(&req),
296            "ListTagsOfResource" => self.list_tags_of_resource(&req),
297            "TransactGetItems" => self.transact_get_items(&req),
298            "TransactWriteItems" => self.transact_write_items(&req),
299            "ExecuteStatement" => self.execute_statement(&req),
300            "BatchExecuteStatement" => self.batch_execute_statement(&req),
301            "ExecuteTransaction" => self.execute_transaction(&req),
302            "UpdateTimeToLive" => self.update_time_to_live(&req),
303            "DescribeTimeToLive" => self.describe_time_to_live(&req),
304            "PutResourcePolicy" => self.put_resource_policy(&req),
305            "GetResourcePolicy" => self.get_resource_policy(&req),
306            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
307            // Stubs
308            "DescribeEndpoints" => self.describe_endpoints(&req),
309            "DescribeLimits" => self.describe_limits(&req),
310            // Backups
311            "CreateBackup" => self.create_backup(&req),
312            "DeleteBackup" => self.delete_backup(&req),
313            "DescribeBackup" => self.describe_backup(&req),
314            "ListBackups" => self.list_backups(&req),
315            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
316            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
317            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
318            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
319            // Global tables
320            "CreateGlobalTable" => self.create_global_table(&req),
321            "DescribeGlobalTable" => self.describe_global_table(&req),
322            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
323            "ListGlobalTables" => self.list_global_tables(&req),
324            "UpdateGlobalTable" => self.update_global_table(&req),
325            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
326            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
327            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
328            // Kinesis streaming
329            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
330            "DisableKinesisStreamingDestination" => {
331                self.disable_kinesis_streaming_destination(&req)
332            }
333            "DescribeKinesisStreamingDestination" => {
334                self.describe_kinesis_streaming_destination(&req)
335            }
336            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
337            // Contributor insights
338            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
339            "UpdateContributorInsights" => self.update_contributor_insights(&req),
340            "ListContributorInsights" => self.list_contributor_insights(&req),
341            // Import/Export
342            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
343            "DescribeExport" => self.describe_export(&req),
344            "ListExports" => self.list_exports(&req),
345            "ImportTable" => self.import_table(&req),
346            "DescribeImport" => self.describe_import(&req),
347            "ListImports" => self.list_imports(&req),
348            _ => Err(AwsServiceError::action_not_implemented(
349                "dynamodb",
350                &req.action,
351            )),
352        };
353        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
354            self.save_snapshot().await;
355        }
356        result
357    }
358
359    fn supported_actions(&self) -> &[&str] {
360        &[
361            "CreateTable",
362            "DeleteTable",
363            "DescribeTable",
364            "ListTables",
365            "UpdateTable",
366            "PutItem",
367            "GetItem",
368            "DeleteItem",
369            "UpdateItem",
370            "Query",
371            "Scan",
372            "BatchGetItem",
373            "BatchWriteItem",
374            "TagResource",
375            "UntagResource",
376            "ListTagsOfResource",
377            "TransactGetItems",
378            "TransactWriteItems",
379            "ExecuteStatement",
380            "BatchExecuteStatement",
381            "ExecuteTransaction",
382            "UpdateTimeToLive",
383            "DescribeTimeToLive",
384            "PutResourcePolicy",
385            "GetResourcePolicy",
386            "DeleteResourcePolicy",
387            "DescribeEndpoints",
388            "DescribeLimits",
389            "CreateBackup",
390            "DeleteBackup",
391            "DescribeBackup",
392            "ListBackups",
393            "RestoreTableFromBackup",
394            "RestoreTableToPointInTime",
395            "UpdateContinuousBackups",
396            "DescribeContinuousBackups",
397            "CreateGlobalTable",
398            "DescribeGlobalTable",
399            "DescribeGlobalTableSettings",
400            "ListGlobalTables",
401            "UpdateGlobalTable",
402            "UpdateGlobalTableSettings",
403            "DescribeTableReplicaAutoScaling",
404            "UpdateTableReplicaAutoScaling",
405            "EnableKinesisStreamingDestination",
406            "DisableKinesisStreamingDestination",
407            "DescribeKinesisStreamingDestination",
408            "UpdateKinesisStreamingDestination",
409            "DescribeContributorInsights",
410            "UpdateContributorInsights",
411            "ListContributorInsights",
412            "ExportTableToPointInTime",
413            "DescribeExport",
414            "ListExports",
415            "ImportTable",
416            "DescribeImport",
417            "ListImports",
418        ]
419    }
420}
/// Report whether `action` changes DynamoDB state and therefore needs a
/// snapshot write once it succeeds. The comparison is exact and
/// case-sensitive. Keep the list in sync with the dispatch table in `handle`.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateTable",
        "DeleteTable",
        "UpdateTable",
        "PutItem",
        "DeleteItem",
        "UpdateItem",
        "BatchWriteItem",
        "TagResource",
        "UntagResource",
        "TransactWriteItems",
        "ExecuteStatement",
        "BatchExecuteStatement",
        "ExecuteTransaction",
        "UpdateTimeToLive",
        "PutResourcePolicy",
        "DeleteResourcePolicy",
        "CreateBackup",
        "DeleteBackup",
        "RestoreTableFromBackup",
        "RestoreTableToPointInTime",
        "UpdateContinuousBackups",
        "CreateGlobalTable",
        "UpdateGlobalTable",
        "UpdateGlobalTableSettings",
        "UpdateTableReplicaAutoScaling",
        "EnableKinesisStreamingDestination",
        "DisableKinesisStreamingDestination",
        "UpdateKinesisStreamingDestination",
        "UpdateContributorInsights",
        "ExportTableToPointInTime",
        "ImportTable",
    ];
    MUTATING_ACTIONS.contains(&action)
}
459
460// ── Helper functions ────────────────────────────────────────────────────
461
462fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
463    body[field].as_str().ok_or_else(|| {
464        AwsServiceError::aws_error(
465            StatusCode::BAD_REQUEST,
466            "ValidationException",
467            format!("{field} is required"),
468        )
469    })
470}
471
472fn require_object(
473    body: &Value,
474    field: &str,
475) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
476    let obj = body[field].as_object().ok_or_else(|| {
477        AwsServiceError::aws_error(
478            StatusCode::BAD_REQUEST,
479            "ValidationException",
480            format!("{field} is required"),
481        )
482    })?;
483    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
484}
485
486fn get_table<'a>(
487    tables: &'a HashMap<String, DynamoTable>,
488    name: &str,
489) -> Result<&'a DynamoTable, AwsServiceError> {
490    tables.get(name).ok_or_else(|| {
491        AwsServiceError::aws_error(
492            StatusCode::BAD_REQUEST,
493            "ResourceNotFoundException",
494            format!("Requested resource not found: Table: {name} not found"),
495        )
496    })
497}
498
499fn get_table_mut<'a>(
500    tables: &'a mut HashMap<String, DynamoTable>,
501    name: &str,
502) -> Result<&'a mut DynamoTable, AwsServiceError> {
503    tables.get_mut(name).ok_or_else(|| {
504        AwsServiceError::aws_error(
505            StatusCode::BAD_REQUEST,
506            "ResourceNotFoundException",
507            format!("Requested resource not found: Table: {name} not found"),
508        )
509    })
510}
511
512fn find_table_by_arn<'a>(
513    tables: &'a HashMap<String, DynamoTable>,
514    arn: &str,
515) -> Result<&'a DynamoTable, AwsServiceError> {
516    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
517        AwsServiceError::aws_error(
518            StatusCode::BAD_REQUEST,
519            "ResourceNotFoundException",
520            format!("Requested resource not found: {arn}"),
521        )
522    })
523}
524
525fn find_table_by_arn_mut<'a>(
526    tables: &'a mut HashMap<String, DynamoTable>,
527    arn: &str,
528) -> Result<&'a mut DynamoTable, AwsServiceError> {
529    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
530        AwsServiceError::aws_error(
531            StatusCode::BAD_REQUEST,
532            "ResourceNotFoundException",
533            format!("Requested resource not found: {arn}"),
534        )
535    })
536}
537
538fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
539    let arr = val.as_array().ok_or_else(|| {
540        AwsServiceError::aws_error(
541            StatusCode::BAD_REQUEST,
542            "ValidationException",
543            "KeySchema is required",
544        )
545    })?;
546    Ok(arr
547        .iter()
548        .map(|elem| KeySchemaElement {
549            attribute_name: elem["AttributeName"]
550                .as_str()
551                .unwrap_or_default()
552                .to_string(),
553            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
554        })
555        .collect())
556}
557
558fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
559    let arr = val.as_array().ok_or_else(|| {
560        AwsServiceError::aws_error(
561            StatusCode::BAD_REQUEST,
562            "ValidationException",
563            "AttributeDefinitions is required",
564        )
565    })?;
566    Ok(arr
567        .iter()
568        .map(|elem| AttributeDefinition {
569            attribute_name: elem["AttributeName"]
570                .as_str()
571                .unwrap_or_default()
572                .to_string(),
573            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
574        })
575        .collect())
576}
577
578fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
579    Ok(ProvisionedThroughput {
580        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
581        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
582    })
583}
584
585fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
586    let Some(arr) = val.as_array() else {
587        return Vec::new();
588    };
589    arr.iter()
590        .filter_map(|g| {
591            Some(GlobalSecondaryIndex {
592                index_name: g["IndexName"].as_str()?.to_string(),
593                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
594                projection: parse_projection(&g["Projection"]),
595                provisioned_throughput: Some(parse_gsi_throughput(
596                    &g["ProvisionedThroughput"],
597                    billing_mode,
598                )),
599                on_demand_throughput: parse_on_demand_throughput(&g["OnDemandThroughput"]),
600            })
601        })
602        .collect()
603}
604
605/// Parse an `OnDemandThroughput` block. Absent fields default to `-1`,
606/// which is AWS's sentinel for "no cap" — and the value real AWS echoes
607/// back on DescribeTable when the caller omitted either axis.
608pub(super) fn parse_on_demand_throughput(val: &Value) -> Option<crate::state::OnDemandThroughput> {
609    if !val.is_object() {
610        return None;
611    }
612    Some(crate::state::OnDemandThroughput {
613        max_read_request_units: val["MaxReadRequestUnits"].as_i64().unwrap_or(-1),
614        max_write_request_units: val["MaxWriteRequestUnits"].as_i64().unwrap_or(-1),
615    })
616}
617
618/// Resolve the provisioned-throughput slot for a GSI on a CreateTable or
619/// UpdateTable Create action. Real DynamoDB returns `{0, 0}` for GSIs on
620/// PAY_PER_REQUEST tables regardless of whether the caller sent a
621/// `ProvisionedThroughput` block, and the Terraform provider's `flatten`
622/// code keys `name`/`read_capacity`/`write_capacity` off the presence of
623/// that field — returning `None` would desynchronise state.
624fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
625    if billing_mode == "PAY_PER_REQUEST" {
626        return ProvisionedThroughput {
627            read_capacity_units: 0,
628            write_capacity_units: 0,
629        };
630    }
631    ProvisionedThroughput {
632        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
633        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
634    }
635}
636
637fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
638    let Some(arr) = val.as_array() else {
639        return Vec::new();
640    };
641    arr.iter()
642        .filter_map(|l| {
643            Some(LocalSecondaryIndex {
644                index_name: l["IndexName"].as_str()?.to_string(),
645                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
646                projection: parse_projection(&l["Projection"]),
647            })
648        })
649        .collect()
650}
651
652pub(super) fn parse_projection(val: &Value) -> Projection {
653    Projection {
654        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
655        non_key_attributes: val["NonKeyAttributes"]
656            .as_array()
657            .map(|arr| {
658                arr.iter()
659                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
660                    .collect()
661            })
662            .unwrap_or_default(),
663    }
664}
665
666fn parse_tags(val: &Value) -> HashMap<String, String> {
667    let mut tags = HashMap::new();
668    if let Some(arr) = val.as_array() {
669        for tag in arr {
670            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
671                tags.insert(k.to_string(), v.to_string());
672            }
673        }
674    }
675    tags
676}
677
678fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
679    let mut names = HashMap::new();
680    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
681        for (k, v) in obj {
682            if let Some(s) = v.as_str() {
683                names.insert(k.clone(), s.to_string());
684            }
685        }
686    }
687    names
688}
689
690fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
691    let mut values = HashMap::new();
692    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
693        for (k, v) in obj {
694            values.insert(k.clone(), v.clone());
695        }
696    }
697    values
698}
699
/// Translate a `#placeholder` into the attribute name it stands for.
///
/// Names that do not start with `#` are returned as-is without consulting
/// the map; a `#placeholder` with no mapping is also returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substitution = if name.starts_with('#') {
        expr_attr_names.get(name)
    } else {
        None
    };
    match substitution {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
710
711/// Resolve a (possibly dotted, possibly `#name`-containing) document path to
712/// the leaf `AttributeValue` inside `item`. Single-segment paths (`foo`,
713/// `#foo`) resolve to a top-level attribute. Dotted paths (`profile.email`,
714/// `#p.#e`, `items[0].sku`) walk into `M`/`L` containers. Returns `None` if
715/// any segment is missing or the intermediate value isn't a map/list.
716fn resolve_path(
717    path: &str,
718    item: &HashMap<String, AttributeValue>,
719    expr_attr_names: &HashMap<String, String>,
720) -> Option<Value> {
721    // Fast path: a single-segment expression (no `.` and no `[` in the raw
722    // input) refers to a top-level attribute by its literal name, even if the
723    // resolved alias contains a `.`. Without this, `#sw` -> `Safety.Warning`
724    // would be misread as the nested path `Safety` -> `Warning`.
725    if !path.contains('.') && !path.contains('[') {
726        return item.get(&resolve_attr_name(path, expr_attr_names)).cloned();
727    }
728    let resolved = resolve_projection_path(path, expr_attr_names);
729    resolve_nested_path(item, &resolved)
730}
731
732fn extract_key(
733    table: &DynamoTable,
734    item: &HashMap<String, AttributeValue>,
735) -> HashMap<String, AttributeValue> {
736    let mut key = HashMap::new();
737    let hash_key = table.hash_key_name();
738    if let Some(v) = item.get(hash_key) {
739        key.insert(hash_key.to_string(), v.clone());
740    }
741    if let Some(range_key) = table.range_key_name() {
742        if let Some(v) = item.get(range_key) {
743            key.insert(range_key.to_string(), v.clone());
744        }
745    }
746    key
747}
748
749/// Parse a JSON object into a key map (used for ExclusiveStartKey).
750fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
751    let obj = value.as_object()?;
752    if obj.is_empty() {
753        return None;
754    }
755    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
756}
757
758/// Check whether an item's key attributes match the given key map.
759fn item_matches_key(
760    item: &HashMap<String, AttributeValue>,
761    key: &HashMap<String, AttributeValue>,
762    hash_key_name: &str,
763    range_key_name: Option<&str>,
764) -> bool {
765    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
766        (Some(a), Some(b)) => a == b,
767        _ => false,
768    };
769    if !hash_match {
770        return false;
771    }
772    match range_key_name {
773        Some(rk) => match (item.get(rk), key.get(rk)) {
774            (Some(a), Some(b)) => a == b,
775            (None, None) => true,
776            _ => false,
777        },
778        None => true,
779    }
780}
781
782/// Extract the primary key from an item given explicit key attribute names.
783fn extract_key_for_schema(
784    item: &HashMap<String, AttributeValue>,
785    hash_key_name: &str,
786    range_key_name: Option<&str>,
787) -> HashMap<String, AttributeValue> {
788    let mut key = HashMap::new();
789    if let Some(v) = item.get(hash_key_name) {
790        key.insert(hash_key_name.to_string(), v.clone());
791    }
792    if let Some(rk) = range_key_name {
793        if let Some(v) = item.get(rk) {
794            key.insert(rk.to_string(), v.clone());
795        }
796    }
797    key
798}
799
800fn validate_key_in_item(
801    table: &DynamoTable,
802    item: &HashMap<String, AttributeValue>,
803) -> Result<(), AwsServiceError> {
804    let hash_key = table.hash_key_name();
805    if !item.contains_key(hash_key) {
806        return Err(AwsServiceError::aws_error(
807            StatusCode::BAD_REQUEST,
808            "ValidationException",
809            format!("Missing the key {hash_key} in the item"),
810        ));
811    }
812    if let Some(range_key) = table.range_key_name() {
813        if !item.contains_key(range_key) {
814            return Err(AwsServiceError::aws_error(
815                StatusCode::BAD_REQUEST,
816                "ValidationException",
817                format!("Missing the key {range_key} in the item"),
818            ));
819        }
820    }
821    Ok(())
822}
823
824fn validate_key_attributes_in_key(
825    table: &DynamoTable,
826    key: &HashMap<String, AttributeValue>,
827) -> Result<(), AwsServiceError> {
828    let hash_key = table.hash_key_name();
829    if !key.contains_key(hash_key) {
830        return Err(AwsServiceError::aws_error(
831            StatusCode::BAD_REQUEST,
832            "ValidationException",
833            format!("Missing the key {hash_key} in the item"),
834        ));
835    }
836    Ok(())
837}
838
839fn project_item(
840    item: &HashMap<String, AttributeValue>,
841    body: &Value,
842) -> HashMap<String, AttributeValue> {
843    let projection = body["ProjectionExpression"].as_str();
844    match projection {
845        Some(proj) if !proj.is_empty() => {
846            let expr_attr_names = parse_expression_attribute_names(body);
847            let mut result = HashMap::new();
848            for raw in proj.split(',') {
849                let raw = raw.trim();
850                // Single-segment: treat as literal top-level attribute even if
851                // the alias resolves to a name containing `.` (e.g. `#sw` ->
852                // `Safety.Warning`).
853                if !raw.contains('.') && !raw.contains('[') {
854                    let key = resolve_attr_name(raw, &expr_attr_names);
855                    if let Some(v) = item.get(&key) {
856                        result.insert(key, v.clone());
857                    }
858                } else {
859                    let resolved = resolve_projection_path(raw, &expr_attr_names);
860                    if let Some(v) = resolve_nested_path(item, &resolved) {
861                        insert_nested_value(&mut result, &resolved, v);
862                    }
863                }
864            }
865            result
866        }
867        _ => item.clone(),
868    }
869}
870
871/// Resolve expression attribute names within each segment of a projection path.
872/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
873fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
874    // Split on dots, resolve each part, rejoin
875    let mut result = String::new();
876    for (i, segment) in path.split('.').enumerate() {
877        if i > 0 {
878            result.push('.');
879        }
880        // A segment might be like "#n" or "people[0]" or "#attr[0]"
881        if let Some(bracket_pos) = segment.find('[') {
882            let key_part = &segment[..bracket_pos];
883            let index_part = &segment[bracket_pos..];
884            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
885            result.push_str(index_part);
886        } else {
887            result.push_str(&resolve_attr_name(segment, expr_attr_names));
888        }
889    }
890    result
891}
892
893/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
894fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
895    let segments = parse_path_segments(path);
896    if segments.is_empty() {
897        return None;
898    }
899
900    let first = &segments[0];
901    let top_key = match first {
902        PathSegment::Key(k) => k.as_str(),
903        _ => return None,
904    };
905
906    let mut current = item.get(top_key)?.clone();
907
908    for segment in &segments[1..] {
909        match segment {
910            PathSegment::Key(k) => {
911                // Navigate into a Map: {"M": {"key": ...}}
912                current = current.get("M")?.get(k)?.clone();
913            }
914            PathSegment::Index(idx) => {
915                // Navigate into a List: {"L": [...]}
916                current = current.get("L")?.get(*idx)?.clone();
917            }
918        }
919    }
920
921    Some(current)
922}
923
/// One step of a parsed document path.
#[derive(Debug)]
enum PathSegment {
    /// A named attribute or map member.
    Key(String),
    /// A zero-based list position written as `[n]`.
    Index(usize),
}

/// Tokenise a document path into its segments.
///
/// `a.b[0].c` yields `[Key("a"), Key("b"), Index(0), Key("c")]`. Empty names
/// (for example between two consecutive dots) produce no segment, and a
/// bracketed index that does not parse as `usize` is dropped silently.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut name = String::new();
    let mut chars = path.chars();

    while let Some(ch) = chars.next() {
        match ch {
            '.' | '[' => {
                // Either delimiter terminates the name collected so far.
                if !name.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut name)));
                }
                if ch == '[' {
                    // `take_while` also consumes the closing `]`, if present.
                    let digits: String = chars.by_ref().take_while(|&c| c != ']').collect();
                    if let Ok(idx) = digits.parse::<usize>() {
                        segments.push(PathSegment::Index(idx));
                    }
                }
            }
            other => name.push(other),
        }
    }

    if !name.is_empty() {
        segments.push(PathSegment::Key(name));
    }
    segments
}
972
973/// Insert a value at a nested path in the result HashMap.
974/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
975fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
976    // Simple case: no nesting
977    if !path.contains('.') && !path.contains('[') {
978        result.insert(path.to_string(), value);
979        return;
980    }
981
982    let segments = parse_path_segments(path);
983    if segments.is_empty() {
984        return;
985    }
986
987    let top_key = match &segments[0] {
988        PathSegment::Key(k) => k.clone(),
989        _ => return,
990    };
991
992    if segments.len() == 1 {
993        result.insert(top_key, value);
994        return;
995    }
996
997    // For nested paths, wrap the value back into the nested structure
998    let wrapped = wrap_value_in_path(&segments[1..], value);
999    // Merge into existing value if present
1000    let existing = result.remove(&top_key);
1001    let merged = match existing {
1002        Some(existing) => merge_attribute_values(existing, wrapped),
1003        None => wrapped,
1004    };
1005    result.insert(top_key, merged);
1006}
1007
1008/// Wrap a value in the nested path structure.
1009fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
1010    if segments.is_empty() {
1011        return value;
1012    }
1013    let inner = wrap_value_in_path(&segments[1..], value);
1014    match &segments[0] {
1015        PathSegment::Key(k) => {
1016            json!({"M": {k.clone(): inner}})
1017        }
1018        PathSegment::Index(idx) => {
1019            let mut arr = vec![Value::Null; idx + 1];
1020            arr[*idx] = inner;
1021            json!({"L": arr})
1022        }
1023    }
1024}
1025
1026/// Merge two attribute values (for overlapping projections).
1027fn merge_attribute_values(a: Value, b: Value) -> Value {
1028    if let (Some(a_map), Some(b_map)) = (
1029        a.get("M").and_then(|v| v.as_object()),
1030        b.get("M").and_then(|v| v.as_object()),
1031    ) {
1032        let mut merged = a_map.clone();
1033        for (k, v) in b_map {
1034            if let Some(existing) = merged.get(k) {
1035                merged.insert(
1036                    k.clone(),
1037                    merge_attribute_values(existing.clone(), v.clone()),
1038                );
1039            } else {
1040                merged.insert(k.clone(), v.clone());
1041            }
1042        }
1043        json!({"M": merged})
1044    } else {
1045        b
1046    }
1047}
1048
1049fn evaluate_condition(
1050    condition: &str,
1051    existing: Option<&HashMap<String, AttributeValue>>,
1052    expr_attr_names: &HashMap<String, String>,
1053    expr_attr_values: &HashMap<String, Value>,
1054) -> Result<(), AwsServiceError> {
1055    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
1056    // so we delegate to evaluate_filter_expression. An empty map models "item
1057    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
1058    // → true, comparisons against missing attributes → None vs Some(val).
1059    let empty = HashMap::new();
1060    let item = existing.unwrap_or(&empty);
1061    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
1062        Ok(())
1063    } else {
1064        Err(AwsServiceError::aws_error(
1065            StatusCode::BAD_REQUEST,
1066            "ConditionalCheckFailedException",
1067            "The conditional request failed",
1068        ))
1069    }
1070}
1071
/// Return the trimmed argument text of a call to `func_name` when `expr`
/// consists of exactly that call.
///
/// Both `name(arg)` and `name (arg)` are accepted: aws-sdk-go v2's
/// expression builder emits a space before the opening paren
/// (`attribute_exists (#0)`) while hand-written expressions usually do not.
/// Returns `None` if `expr` does not start with the call or does not end
/// with `)`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    let after_paren = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    after_paren.strip_suffix(')').map(|inner| inner.trim())
}
1084
1085fn evaluate_key_condition(
1086    expr: &str,
1087    item: &HashMap<String, AttributeValue>,
1088    expr_attr_names: &HashMap<String, String>,
1089    expr_attr_values: &HashMap<String, Value>,
1090) -> bool {
1091    let trimmed = expr.trim();
1092
1093    let parts = split_on_and(trimmed);
1094    if parts.len() > 1 {
1095        return parts.iter().all(|part| {
1096            evaluate_key_condition(part.trim(), item, expr_attr_names, expr_attr_values)
1097        });
1098    }
1099
1100    let stripped = strip_outer_parens(trimmed);
1101    if stripped != trimmed {
1102        return evaluate_key_condition(stripped, item, expr_attr_names, expr_attr_values);
1103    }
1104
1105    evaluate_single_key_condition(trimmed, item, expr_attr_names, expr_attr_values)
1106}
1107
1108/// Split a DynamoDB condition expression on a top-level keyword (``AND`` /
1109/// ``OR``), case-insensitive, with ASCII-whitespace word boundaries so
1110/// ``:s\tAND\t:o`` and ``:s\nAND\n:o`` split the same as ``:s AND :o``.
1111///
1112/// Parenthesised groups are skipped so only unparenthesised occurrences of the
1113/// keyword act as separators. When splitting on ``AND``, each top-level
1114/// ``BETWEEN`` keyword consumes the next top-level ``AND`` as its own inner
1115/// separator (``x BETWEEN :lo AND :hi``) rather than letting it split the
1116/// expression.
1117fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
1118    let bytes = expr.as_bytes();
1119    let len = bytes.len();
1120    let kw = keyword.as_bytes();
1121    let is_and = keyword.eq_ignore_ascii_case("AND");
1122
1123    let mut parts: Vec<&str> = Vec::new();
1124    let mut start = 0usize;
1125    let mut depth: i32 = 0;
1126    let mut between_skip: u32 = 0;
1127    let mut i = 0usize;
1128
1129    while i < len {
1130        let ch = bytes[i];
1131        if ch == b'(' {
1132            depth += 1;
1133            i += 1;
1134            continue;
1135        }
1136        if ch == b')' {
1137            if depth > 0 {
1138                depth -= 1;
1139            }
1140            i += 1;
1141            continue;
1142        }
1143        if depth == 0 {
1144            if is_and {
1145                if let Some(end) = match_keyword(bytes, i, b"BETWEEN") {
1146                    between_skip = between_skip.saturating_add(1);
1147                    i = end;
1148                    continue;
1149                }
1150            }
1151            if let Some(end) = match_keyword(bytes, i, kw) {
1152                if is_and && between_skip > 0 {
1153                    between_skip -= 1;
1154                    i = end;
1155                    continue;
1156                }
1157                parts.push(&expr[start..i]);
1158                start = end;
1159                i = end;
1160                continue;
1161            }
1162        }
1163        i += 1;
1164    }
1165    parts.push(&expr[start..]);
1166    parts
1167}
1168
/// Test whether `keyword` occurs in `bytes` at offset `i`, ignoring ASCII
/// case, and return the offset just past it on success.
///
/// Keywords made only of ASCII alphanumerics (``AND``, ``OR``, ``BETWEEN``)
/// must additionally be delimited by ASCII whitespace or by the start/end of
/// the input, so that substrings of identifiers are not mistaken for
/// keywords. Punctuation keywords (``,``) match literally.
fn match_keyword(bytes: &[u8], i: usize, keyword: &[u8]) -> Option<usize> {
    let end = i + keyword.len();
    let candidate = bytes.get(i..end)?;
    if !candidate.eq_ignore_ascii_case(keyword) {
        return None;
    }
    if keyword.iter().all(|b| b.is_ascii_alphanumeric()) {
        let boundary_before = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let boundary_after = bytes.get(end).map_or(true, |b| b.is_ascii_whitespace());
        if !(boundary_before && boundary_after) {
            return None;
        }
    }
    Some(end)
}
1196
/// Split `expr` on the `AND` keyword by delegating to
/// `split_on_top_level_keyword`.
fn split_on_and(expr: &str) -> Vec<&str> {
    split_on_top_level_keyword(expr, "AND")
}
1200
/// Split `expr` on the `OR` keyword by delegating to
/// `split_on_top_level_keyword`.
fn split_on_or(expr: &str) -> Vec<&str> {
    split_on_top_level_keyword(expr, "OR")
}
1204
1205fn evaluate_single_key_condition(
1206    part: &str,
1207    item: &HashMap<String, AttributeValue>,
1208    expr_attr_names: &HashMap<String, String>,
1209    expr_attr_values: &HashMap<String, Value>,
1210) -> bool {
1211    let part = part.trim();
1212
1213    if let Some(rest) = part
1214        .strip_prefix("begins_with(")
1215        .or_else(|| part.strip_prefix("begins_with ("))
1216    {
1217        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
1218    }
1219
1220    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
1221        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
1222    }
1223
1224    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
1225}
1226
1227/// `begins_with(attr, :val)` — KeyCondition variant: supports only
1228/// S-typed attributes (mirrors AWS's behavior of returning false for
1229/// type mismatches). The filter-expression evaluator has its own
1230/// `eval_begins_with` because it operates on filter-grammar inputs.
1231fn key_cond_begins_with(
1232    rest: &str,
1233    item: &HashMap<String, AttributeValue>,
1234    expr_attr_names: &HashMap<String, String>,
1235    expr_attr_values: &HashMap<String, Value>,
1236) -> bool {
1237    let Some(inner) = rest.strip_suffix(')') else {
1238        return false;
1239    };
1240    let mut split = inner.splitn(2, ',');
1241    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1242        return false;
1243    };
1244    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1245    let expected = expr_attr_values.get(val_ref.trim());
1246    let actual = item.get(&attr_name);
1247    match (actual, expected) {
1248        (Some(a), Some(e)) => {
1249            let a_str = a.get("S").and_then(|v| v.as_str());
1250            let e_str = e.get("S").and_then(|v| v.as_str());
1251            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1252        }
1253        _ => false,
1254    }
1255}
1256
1257/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
1258/// shared `compare_attribute_values` ordering.
1259fn key_cond_between(
1260    part: &str,
1261    between_pos: usize,
1262    item: &HashMap<String, AttributeValue>,
1263    expr_attr_names: &HashMap<String, String>,
1264    expr_attr_values: &HashMap<String, Value>,
1265) -> bool {
1266    let attr_part = part[..between_pos].trim();
1267    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
1268    let range_part = &part[between_pos + 7..];
1269    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
1270        return false;
1271    };
1272    let lo_ref = range_part[..and_pos].trim();
1273    let hi_ref = range_part[and_pos + 5..].trim();
1274    let lo = expr_attr_values.get(lo_ref);
1275    let hi = expr_attr_values.get(hi_ref);
1276    let actual = item.get(&attr_name);
1277    match (actual, lo, hi) {
1278        (Some(a), Some(l), Some(h)) => {
1279            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1280                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1281        }
1282        _ => false,
1283    }
1284}
1285
1286/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1287/// Multi-character operators come first in the search list so that `<=`
1288/// is not mistakenly matched as `<`.
1289fn key_cond_simple_comparison(
1290    part: &str,
1291    item: &HashMap<String, AttributeValue>,
1292    expr_attr_names: &HashMap<String, String>,
1293    expr_attr_values: &HashMap<String, Value>,
1294) -> bool {
1295    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1296        let Some(pos) = part.find(op) else {
1297            continue;
1298        };
1299        let left = part[..pos].trim();
1300        let right = part[pos + op.len()..].trim();
1301        let actual_owned = resolve_path(left, item, expr_attr_names);
1302        let actual = actual_owned.as_ref();
1303        let expected = expr_attr_values.get(right);
1304
1305        return match *op {
1306            "=" => actual == expected,
1307            "<>" => actual != expected,
1308            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1309            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1310            "<=" => {
1311                let cmp = compare_attribute_values(actual, expected);
1312                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1313            }
1314            ">=" => {
1315                let cmp = compare_attribute_values(actual, expected);
1316                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1317            }
1318            _ => false,
1319        };
1320    }
1321    false
1322}
1323
1324/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1325/// - S → character count
1326/// - B → decoded byte count
1327/// - SS/NS/BS → element count
1328/// - L → element count
1329/// - M → element count
1330///
1331/// `size()` is not valid on N, BOOL, or NULL per AWS; returns None for those so
1332/// the enclosing comparison evaluates to false (matching AWS's behavior of
1333/// silently filtering type-mismatched rows in FilterExpression context).
1334fn attribute_size(val: &Value) -> Option<usize> {
1335    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1336        return Some(s.len());
1337    }
1338    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1339        // B is base64-encoded — return decoded byte count
1340        let decoded_len = base64::engine::general_purpose::STANDARD
1341            .decode(b)
1342            .map(|v| v.len())
1343            .unwrap_or(b.len());
1344        return Some(decoded_len);
1345    }
1346    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1347        return Some(arr.len());
1348    }
1349    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1350        return Some(arr.len());
1351    }
1352    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1353        return Some(arr.len());
1354    }
1355    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1356        return Some(arr.len());
1357    }
1358    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1359        return Some(obj.len());
1360    }
1361    None
1362}
1363
1364/// Evaluate a `size(path) op :val` comparison expression.
1365fn evaluate_size_comparison(
1366    part: &str,
1367    item: &HashMap<String, AttributeValue>,
1368    expr_attr_names: &HashMap<String, String>,
1369    expr_attr_values: &HashMap<String, Value>,
1370) -> Option<bool> {
1371    // Find the closing paren of size(...)
1372    let open = part.find('(')?;
1373    let close = part[open..].find(')')? + open;
1374    let path = part[open + 1..close].trim();
1375    let remainder = part[close + 1..].trim();
1376
1377    // Parse operator and value ref
1378    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1379        ("<=", rest.trim())
1380    } else if let Some(rest) = remainder.strip_prefix(">=") {
1381        (">=", rest.trim())
1382    } else if let Some(rest) = remainder.strip_prefix("<>") {
1383        ("<>", rest.trim())
1384    } else if let Some(rest) = remainder.strip_prefix('<') {
1385        ("<", rest.trim())
1386    } else if let Some(rest) = remainder.strip_prefix('>') {
1387        (">", rest.trim())
1388    } else if let Some(rest) = remainder.strip_prefix('=') {
1389        ("=", rest.trim())
1390    } else {
1391        return None;
1392    };
1393
1394    let actual_owned = resolve_path(path, item, expr_attr_names)?;
1395    let size = attribute_size(&actual_owned)? as f64;
1396
1397    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1398
1399    Some(match op {
1400        "=" => (size - expected).abs() < f64::EPSILON,
1401        "<>" => (size - expected).abs() >= f64::EPSILON,
1402        "<" => size < expected,
1403        ">" => size > expected,
1404        "<=" => size <= expected,
1405        ">=" => size >= expected,
1406        _ => false,
1407    })
1408}
1409
1410fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1411    match (a, b) {
1412        (None, None) => std::cmp::Ordering::Equal,
1413        (None, Some(_)) => std::cmp::Ordering::Less,
1414        (Some(_), None) => std::cmp::Ordering::Greater,
1415        (Some(a), Some(b)) => {
1416            let a_type = attribute_type_and_value(a);
1417            let b_type = attribute_type_and_value(b);
1418            match (a_type, b_type) {
1419                (Some(("S", a_val)), Some(("S", b_val))) => {
1420                    let a_str = a_val.as_str().unwrap_or("");
1421                    let b_str = b_val.as_str().unwrap_or("");
1422                    a_str.cmp(b_str)
1423                }
1424                (Some(("N", a_val)), Some(("N", b_val))) => {
1425                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1426                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1427                    a_num
1428                        .partial_cmp(&b_num)
1429                        .unwrap_or(std::cmp::Ordering::Equal)
1430                }
1431                (Some(("B", a_val)), Some(("B", b_val))) => {
1432                    let a_str = a_val.as_str().unwrap_or("");
1433                    let b_str = b_val.as_str().unwrap_or("");
1434                    a_str.cmp(b_str)
1435                }
1436                _ => std::cmp::Ordering::Equal,
1437            }
1438        }
1439    }
1440}
1441
1442fn evaluate_filter_expression(
1443    expr: &str,
1444    item: &HashMap<String, AttributeValue>,
1445    expr_attr_names: &HashMap<String, String>,
1446    expr_attr_values: &HashMap<String, Value>,
1447) -> bool {
1448    let trimmed = expr.trim();
1449
1450    // Split on OR first (lower precedence), respecting parentheses
1451    let or_parts = split_on_or(trimmed);
1452    if or_parts.len() > 1 {
1453        return or_parts.iter().any(|part| {
1454            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1455        });
1456    }
1457
1458    // Then split on AND (higher precedence), respecting parentheses
1459    let and_parts = split_on_and(trimmed);
1460    if and_parts.len() > 1 {
1461        return and_parts.iter().all(|part| {
1462            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1463        });
1464    }
1465
1466    // Strip outer parentheses if present
1467    let stripped = strip_outer_parens(trimmed);
1468    if stripped != trimmed {
1469        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1470    }
1471
1472    // Handle NOT prefix (case-insensitive)
1473    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1474        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1475    }
1476
1477    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1478}
1479
/// Remove one pair of enclosing parentheses when they match each other.
///
/// The input is trimmed first. If it starts with `(` and ends with `)` and
/// the text between them is itself balanced (so the first `(` really pairs
/// with the last `)`), that inner text is returned as-is. In every other
/// case the trimmed input is returned unchanged, e.g. for `(a) AND (b)`.
fn strip_outer_parens(expr: &str) -> &str {
    let whole = expr.trim();
    let Some(body) = whole
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return whole;
    };

    // Track open parens inside the body. Hitting `)` at depth zero means the
    // leading `(` was closed early, so the outer pair does not match.
    let balance = body.bytes().try_fold(0usize, |open, byte| match byte {
        b'(' => Some(open + 1),
        b')' => open.checked_sub(1),
        _ => Some(open),
    });

    match balance {
        Some(0) => body,
        _ => whole,
    }
}
1507
1508fn evaluate_single_filter_condition(
1509    part: &str,
1510    item: &HashMap<String, AttributeValue>,
1511    expr_attr_names: &HashMap<String, String>,
1512    expr_attr_values: &HashMap<String, Value>,
1513) -> bool {
1514    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1515        return resolve_path(inner, item, expr_attr_names).is_some();
1516    }
1517
1518    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1519        return resolve_path(inner, item, expr_attr_names).is_none();
1520    }
1521
1522    if let Some(rest) = part
1523        .strip_prefix("begins_with(")
1524        .or_else(|| part.strip_prefix("begins_with ("))
1525    {
1526        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1527    }
1528
1529    if let Some(rest) = part
1530        .strip_prefix("contains(")
1531        .or_else(|| part.strip_prefix("contains ("))
1532    {
1533        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1534    }
1535
1536    if part.starts_with("size(") || part.starts_with("size (") {
1537        if let Some(result) =
1538            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1539        {
1540            return result;
1541        }
1542    }
1543
1544    if let Some(rest) = part
1545        .strip_prefix("attribute_type(")
1546        .or_else(|| part.strip_prefix("attribute_type ("))
1547    {
1548        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1549    }
1550
1551    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1552        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1553        let actual = item.get(&attr_name);
1554        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1555    }
1556
1557    evaluate_single_key_condition(part, item, expr_attr_names, expr_attr_values)
1558}
1559
1560/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1561/// any parse failure or type mismatch (this is the same shape DynamoDB
1562/// returns: a malformed predicate is silently false rather than an error).
1563fn eval_begins_with(
1564    rest: &str,
1565    item: &HashMap<String, AttributeValue>,
1566    expr_attr_names: &HashMap<String, String>,
1567    expr_attr_values: &HashMap<String, Value>,
1568) -> bool {
1569    let Some(inner) = rest.strip_suffix(')') else {
1570        return false;
1571    };
1572    let mut split = inner.splitn(2, ',');
1573    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1574        return false;
1575    };
1576    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1577    let expected = expr_attr_values.get(val_ref.trim());
1578    match (actual.as_ref(), expected) {
1579        (Some(a), Some(e)) => {
1580            let a_str = a.get("S").and_then(|v| v.as_str());
1581            let e_str = e.get("S").and_then(|v| v.as_str());
1582            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1583        }
1584        _ => false,
1585    }
1586}
1587
1588/// `contains(path, :val)` — substring check on S, set membership on
1589/// SS/NS/BS, and element membership on L. Other type pairings return
1590/// false.
1591fn eval_contains(
1592    rest: &str,
1593    item: &HashMap<String, AttributeValue>,
1594    expr_attr_names: &HashMap<String, String>,
1595    expr_attr_values: &HashMap<String, Value>,
1596) -> bool {
1597    let Some(inner) = rest.strip_suffix(')') else {
1598        return false;
1599    };
1600    let mut split = inner.splitn(2, ',');
1601    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1602        return false;
1603    };
1604    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1605    let expected = expr_attr_values.get(val_ref.trim());
1606    let (Some(a), Some(e)) = (actual.as_ref(), expected) else {
1607        return false;
1608    };
1609
1610    if let (Some(a_s), Some(e_s)) = (
1611        a.get("S").and_then(|v| v.as_str()),
1612        e.get("S").and_then(|v| v.as_str()),
1613    ) {
1614        return a_s.contains(e_s);
1615    }
1616    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1617        if let Some(val) = e.get("S") {
1618            return set.contains(val);
1619        }
1620    }
1621    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1622        if let Some(val) = e.get("N") {
1623            return set.contains(val);
1624        }
1625    }
1626    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1627        if let Some(val) = e.get("B") {
1628            return set.contains(val);
1629        }
1630    }
1631    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1632        return list.contains(e);
1633    }
1634    false
1635}
1636
1637/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1638/// is stored under the wire type identified by `:type` (one of the
1639/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1640fn eval_attribute_type(
1641    rest: &str,
1642    item: &HashMap<String, AttributeValue>,
1643    expr_attr_names: &HashMap<String, String>,
1644    expr_attr_values: &HashMap<String, Value>,
1645) -> bool {
1646    let Some(inner) = rest.strip_suffix(')') else {
1647        return false;
1648    };
1649    let mut split = inner.splitn(2, ',');
1650    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1651        return false;
1652    };
1653    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1654    let expected_type = expr_attr_values
1655        .get(val_ref.trim())
1656        .and_then(|v| v.get("S"))
1657        .and_then(|v| v.as_str());
1658    let (Some(val), Some(t)) = (actual.as_ref(), expected_type) else {
1659        return false;
1660    };
1661    match t {
1662        "S" => val.get("S").is_some(),
1663        "N" => val.get("N").is_some(),
1664        "B" => val.get("B").is_some(),
1665        "BOOL" => val.get("BOOL").is_some(),
1666        "NULL" => val.get("NULL").is_some(),
1667        "SS" => val.get("SS").is_some(),
1668        "NS" => val.get("NS").is_some(),
1669        "BS" => val.get("BS").is_some(),
1670        "L" => val.get("L").is_some(),
1671        "M" => val.get("M").is_some(),
1672        _ => false,
1673    }
1674}
1675
/// Splits an `operand IN (:v1, :v2, ...)` expression into its left operand
/// and the list of value references.
///
/// The keyword is matched case-insensitively and must be surrounded by single
/// spaces. The text after it has to start with `(` (leading whitespace is
/// skipped) and end with `)`. The references are comma-separated; surrounding
/// whitespace is trimmed and empty entries are dropped, so both `", "` and
/// `","` separators are accepted.
///
/// Returns `None` when the input is not an IN expression, the left operand is
/// empty, the parentheses are missing, or no value reference remains. Callers
/// can use that to fall through to their other grammar branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const IN_KEYWORD: &str = " IN ";

    // ASCII upper-casing never changes byte lengths, so the offset found in
    // the upper-cased copy is valid in `expr` as well.
    let keyword_at = expr.to_ascii_uppercase().find(IN_KEYWORD)?;
    let (operand, tail) = expr.split_at(keyword_at);

    let operand = operand.trim();
    if operand.is_empty() {
        return None;
    }

    let list = tail[IN_KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;

    let mut refs = Vec::new();
    for piece in list.split(',') {
        let piece = piece.trim();
        if !piece.is_empty() {
            refs.push(piece);
        }
    }

    if refs.is_empty() {
        None
    } else {
        Some((operand, refs))
    }
}
1702
1703/// Return true iff `actual` equals any of the `value_refs` resolved through
1704/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1705/// evaluates `IN` against undefined attributes as false).
1706fn evaluate_in_match(
1707    actual: Option<&AttributeValue>,
1708    value_refs: &[&str],
1709    expr_attr_values: &HashMap<String, Value>,
1710) -> bool {
1711    value_refs.iter().any(|v_ref| {
1712        let expected = expr_attr_values.get(*v_ref);
1713        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1714    })
1715}
1716
/// The action keyword that introduces one clause of a DynamoDB
/// ``UpdateExpression``.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateAction {
    /// A `SET` clause.
    Set,
    /// A `REMOVE` clause.
    Remove,
    /// An `ADD` clause.
    Add,
    /// A `DELETE` clause.
    Delete,
}

impl UpdateAction {
    /// Every action paired with its upper-case keyword. The keywords double
    /// as the search terms for ``parse_update_clauses``.
    const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
        (Self::Set.keyword(), Self::Set),
        (Self::Remove.keyword(), Self::Remove),
        (Self::Add.keyword(), Self::Add),
        (Self::Delete.keyword(), Self::Delete),
    ];

    /// The upper-case keyword text for this action.
    const fn keyword(self) -> &'static str {
        match self {
            Self::Set => "SET",
            Self::Remove => "REMOVE",
            Self::Add => "ADD",
            Self::Delete => "DELETE",
        }
    }
}
1745
1746fn apply_update_expression(
1747    item: &mut HashMap<String, AttributeValue>,
1748    expr: &str,
1749    expr_attr_names: &HashMap<String, String>,
1750    expr_attr_values: &HashMap<String, Value>,
1751) -> Result<(), AwsServiceError> {
1752    let clauses = parse_update_clauses(expr);
1753    if clauses.is_empty() && !expr.trim().is_empty() {
1754        return Err(AwsServiceError::aws_error(
1755            StatusCode::BAD_REQUEST,
1756            "ValidationException",
1757            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1758        ));
1759    }
1760    for (action, assignments) in &clauses {
1761        match action {
1762            UpdateAction::Set => {
1763                for assignment in assignments {
1764                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1765                }
1766            }
1767            UpdateAction::Remove => {
1768                for attr_ref in assignments {
1769                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1770                    item.remove(&attr);
1771                }
1772            }
1773            UpdateAction::Add => {
1774                for assignment in assignments {
1775                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1776                }
1777            }
1778            UpdateAction::Delete => {
1779                for assignment in assignments {
1780                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1781                }
1782            }
1783        }
1784    }
1785    Ok(())
1786}
1787
1788fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1789    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1790    let upper = expr.to_ascii_uppercase();
1791    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1792
1793    for &(kw, action) in UpdateAction::KEYWORDS {
1794        let mut search_from = 0;
1795        while let Some(pos) = upper[search_from..].find(kw) {
1796            let abs_pos = search_from + pos;
1797            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1798            let after_pos = abs_pos + kw.len();
1799            let after_ok =
1800                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1801            if before_ok && after_ok {
1802                positions.push((abs_pos, action));
1803            }
1804            search_from = abs_pos + kw.len();
1805        }
1806    }
1807
1808    positions.sort_by_key(|(pos, _)| *pos);
1809
1810    for (i, &(pos, action)) in positions.iter().enumerate() {
1811        let start = pos + action.keyword().len();
1812        let end = if i + 1 < positions.len() {
1813            positions[i + 1].0
1814        } else {
1815            expr.len()
1816        };
1817        let content = expr[start..end].trim();
1818        // Use a paren-aware split so that function-call arguments such as
1819        // `list_append(#a, :b)` are kept as a single assignment rather than
1820        // being torn apart at the inner comma.
1821        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
1822            .into_iter()
1823            .map(|s| s.trim().to_string())
1824            .collect();
1825        clauses.push((action, assignments));
1826    }
1827
1828    clauses
1829}
1830
1831fn apply_set_assignment(
1832    item: &mut HashMap<String, AttributeValue>,
1833    assignment: &str,
1834    expr_attr_names: &HashMap<String, String>,
1835    expr_attr_values: &HashMap<String, Value>,
1836) -> Result<(), AwsServiceError> {
1837    let Some((left, right)) = assignment.split_once('=') else {
1838        return Ok(());
1839    };
1840
1841    let left_trimmed = left.trim();
1842    let right = right.trim();
1843
1844    // One RHS evaluator used for every LHS shape so `SET a.b = a.b + :d`,
1845    // `SET a.b = list_append(a.b, :list)`, and `SET a.b = if_not_exists(a.b, :v)`
1846    // all work against nested paths, not just top-level attributes. The evaluator
1847    // returns Ok(None) when the RHS is a no-op (if_not_exists where the target
1848    // already has a value, or an unresolvable plain reference).
1849    let new_value = evaluate_set_rhs(right, item, expr_attr_names, expr_attr_values)?;
1850
1851    if is_dotted_path(left_trimmed) {
1852        // A None value is a no-op (if_not_exists skip, or unresolvable plain
1853        // ref) — matches top-level SET's silent-skip behavior for the same
1854        // shapes. Structural errors (missing parent map, non-map intermediate)
1855        // surface from assign_nested_path itself.
1856        let Some(v) = new_value else {
1857            return Ok(());
1858        };
1859        return assign_nested_path(item, left_trimmed, expr_attr_names, v);
1860    }
1861
1862    // Split off a trailing `[N]` list-index suffix so we can resolve the
1863    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1864    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1865    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1866        Some((name, idx)) => (name, Some(idx)),
1867        None => (left_trimmed, None),
1868    };
1869    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1870
1871    let Some(v) = new_value else {
1872        return Ok(());
1873    };
1874    match list_index {
1875        Some(idx) => assign_list_index(item, &attr, idx, v),
1876        None => {
1877            item.insert(attr, v);
1878            Ok(())
1879        }
1880    }
1881}
1882
1883/// Evaluate the RHS of a `SET` assignment without writing it anywhere.
1884/// Returns `Ok(Some(value))` with the computed value, `Ok(None)` for
1885/// no-op cases (if_not_exists where the target already has a value, or
1886/// an unresolvable plain reference in dotted-path context), or
1887/// `Err(ValidationException)` for type-mismatched arithmetic.
1888fn evaluate_set_rhs(
1889    right: &str,
1890    item: &HashMap<String, AttributeValue>,
1891    expr_attr_names: &HashMap<String, String>,
1892    expr_attr_values: &HashMap<String, Value>,
1893) -> Result<Option<Value>, AwsServiceError> {
1894    if let Some(rest) = right
1895        .strip_prefix("if_not_exists(")
1896        .or_else(|| right.strip_prefix("if_not_exists ("))
1897    {
1898        return Ok(evaluate_if_not_exists_rhs(
1899            rest,
1900            item,
1901            expr_attr_names,
1902            expr_attr_values,
1903        ));
1904    }
1905
1906    if let Some(rest) = right
1907        .strip_prefix("list_append(")
1908        .or_else(|| right.strip_prefix("list_append ("))
1909    {
1910        return Ok(evaluate_list_append_rhs(
1911            rest,
1912            item,
1913            expr_attr_names,
1914            expr_attr_values,
1915        ));
1916    }
1917
1918    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1919        return evaluate_arithmetic_rhs(
1920            arith_left,
1921            arith_right,
1922            is_add,
1923            item,
1924            expr_attr_names,
1925            expr_attr_values,
1926        );
1927    }
1928
1929    Ok(resolve_ref_or_path(
1930        right,
1931        item,
1932        expr_attr_names,
1933        expr_attr_values,
1934    ))
1935}
1936
1937/// `if_not_exists(path, :val)` — evaluates to nothing when `path` already
1938/// resolves to a value, and to the default ref otherwise. `path` may be a
1939/// top-level attribute, a placeholder, or a dotted path inside an M-typed
1940/// attribute.
1941fn evaluate_if_not_exists_rhs(
1942    rest: &str,
1943    item: &HashMap<String, AttributeValue>,
1944    expr_attr_names: &HashMap<String, String>,
1945    expr_attr_values: &HashMap<String, Value>,
1946) -> Option<Value> {
1947    let inner = rest.strip_suffix(')')?;
1948    let mut split = inner.splitn(2, ',');
1949    let (check, default) = (split.next()?, split.next()?);
1950    if resolve_ref_or_path(check.trim(), item, expr_attr_names, expr_attr_values).is_some() {
1951        return None;
1952    }
1953    resolve_ref_or_path(default.trim(), item, expr_attr_names, expr_attr_values)
1954}
1955
1956/// `list_append(a, b)` — concatenate the L arrays of two list operands.
1957/// Either operand may be missing or non-list, in which case it contributes
1958/// nothing. Both operands may be value refs (`:list`) or document paths
1959/// (top-level or dotted).
1960fn evaluate_list_append_rhs(
1961    rest: &str,
1962    item: &HashMap<String, AttributeValue>,
1963    expr_attr_names: &HashMap<String, String>,
1964    expr_attr_values: &HashMap<String, Value>,
1965) -> Option<Value> {
1966    let inner = rest.strip_suffix(')')?;
1967    let mut split = inner.splitn(2, ',');
1968    let (a_ref, b_ref) = (split.next()?, split.next()?);
1969    let a_val = resolve_ref_or_path(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1970    let b_val = resolve_ref_or_path(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1971
1972    let mut merged = Vec::new();
1973    for v in [&a_val, &b_val].iter().copied().flatten() {
1974        if let Value::Object(obj) = v {
1975            if let Some(Value::Array(arr)) = obj.get("L") {
1976                merged.extend(arr.clone());
1977            }
1978        }
1979    }
1980    Some(json!({ "L": merged }))
1981}
1982
1983/// `<arith_left> +/- <arith_right>` — both operands must resolve to N values
1984/// (or the LHS may be missing, in which case it's treated as 0). Anything
1985/// else is rejected with the same `ValidationException` AWS returns.
1986fn evaluate_arithmetic_rhs(
1987    arith_left: &str,
1988    arith_right: &str,
1989    is_add: bool,
1990    item: &HashMap<String, AttributeValue>,
1991    expr_attr_names: &HashMap<String, String>,
1992    expr_attr_values: &HashMap<String, Value>,
1993) -> Result<Option<Value>, AwsServiceError> {
1994    let left_val = resolve_ref_or_path(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1995    let right_val =
1996        resolve_ref_or_path(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1997
1998    let left_num = match extract_number(&left_val) {
1999        Some(n) => n,
2000        None if left_val.is_some() => {
2001            return Err(AwsServiceError::aws_error(
2002                StatusCode::BAD_REQUEST,
2003                "ValidationException",
2004                "An operand in the update expression has an incorrect data type",
2005            ));
2006        }
2007        None => 0.0,
2008    };
2009    let right_num = extract_number(&right_val).ok_or_else(|| {
2010        AwsServiceError::aws_error(
2011            StatusCode::BAD_REQUEST,
2012            "ValidationException",
2013            "An operand in the update expression has an incorrect data type",
2014        )
2015    })?;
2016
2017    let result = if is_add {
2018        left_num + right_num
2019    } else {
2020        left_num - right_num
2021    };
2022
2023    let num_str = if result == result.trunc() {
2024        format!("{}", result as i64)
2025    } else {
2026        format!("{result}")
2027    };
2028
2029    Ok(Some(json!({ "N": num_str })))
2030}
2031
/// Splits a `name[N]` target into the bare attribute reference and the index.
///
/// Only the single-index shape is recognized: the text before the last `[`
/// must be non-empty and free of `.`, `[` and `]`, and the bracketed text must
/// parse as a `usize`. Nested shapes such as `a.b[0]` or `a[0][1]`, and plain
/// attributes without a suffix, yield `None`. Surrounding whitespace is
/// ignored.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    let (name, digits) = without_close.rsplit_once('[')?;
    let idx = digits.parse::<usize>().ok()?;

    let is_simple_name =
        !name.is_empty() && !name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    is_simple_name.then_some((name, idx))
}
2052
2053/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
2054/// within the current list, replaces that slot; if it's at the end, appends.
2055/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
2056/// out-of-range indices and non-list attributes.
2057fn assign_list_index(
2058    item: &mut HashMap<String, AttributeValue>,
2059    attr: &str,
2060    idx: usize,
2061    value: Value,
2062) -> Result<(), AwsServiceError> {
2063    let Some(existing) = item.get_mut(attr) else {
2064        return Err(invalid_document_path());
2065    };
2066    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
2067        return Err(invalid_document_path());
2068    };
2069    if idx < list.len() {
2070        list[idx] = value;
2071    } else if idx == list.len() {
2072        list.push(value);
2073    } else {
2074        return Err(invalid_document_path());
2075    }
2076    Ok(())
2077}
2078
2079fn invalid_document_path() -> AwsServiceError {
2080    AwsServiceError::aws_error(
2081        StatusCode::BAD_REQUEST,
2082        "ValidationException",
2083        "The document path provided in the update expression is invalid for update",
2084    )
2085}
2086
2087/// Resolve a SET-RHS operand that may be either a value placeholder
2088/// (``:foo``) or a document path (top-level attribute, ``#name``, or a
2089/// dotted path like ``profile.email`` / ``#web.#count``).
2090fn resolve_ref_or_path(
2091    reference: &str,
2092    item: &HashMap<String, AttributeValue>,
2093    expr_attr_names: &HashMap<String, String>,
2094    expr_attr_values: &HashMap<String, Value>,
2095) -> Option<Value> {
2096    let reference = reference.trim();
2097    if reference.starts_with(':') {
2098        return expr_attr_values.get(reference).cloned();
2099    }
2100    resolve_path(reference, item, expr_attr_names)
2101}
2102
/// Reports whether `path` is a dotted path without any list index, i.e. it
/// contains at least one `.` and no `[`. Paths carrying a bracketed index
/// (`a[0]`, `a.b[0]`) are deliberately excluded because the nested-SET writer
/// does not handle them.
fn is_dotted_path(path: &str) -> bool {
    match path.find('[') {
        Some(_) => false,
        None => path.contains('.'),
    }
}
2108
2109/// Write `value` at a dotted path inside an M-typed attribute.
2110///
2111/// Resolves each `#name` segment through `expr_attr_names`. The top-level
2112/// attribute and every intermediate segment must already exist as a Map —
2113/// DynamoDB rejects writes through missing parents with ValidationException.
2114fn assign_nested_path(
2115    item: &mut HashMap<String, AttributeValue>,
2116    path: &str,
2117    expr_attr_names: &HashMap<String, String>,
2118    value: Value,
2119) -> Result<(), AwsServiceError> {
2120    let mut segments: Vec<String> = path
2121        .split('.')
2122        .map(|seg| resolve_attr_name(seg.trim(), expr_attr_names))
2123        .collect();
2124    if segments.len() < 2 {
2125        return Err(invalid_document_path());
2126    }
2127
2128    let leaf = segments.pop().expect("len >= 2");
2129    let top = segments.remove(0);
2130
2131    let top_attr = item.get_mut(&top).ok_or_else(invalid_document_path)?;
2132    let mut current = top_attr
2133        .get_mut("M")
2134        .and_then(|m| m.as_object_mut())
2135        .ok_or_else(invalid_document_path)?;
2136
2137    for seg in &segments {
2138        current = current
2139            .get_mut(seg)
2140            .and_then(|v| v.get_mut("M"))
2141            .and_then(|m| m.as_object_mut())
2142            .ok_or_else(invalid_document_path)?;
2143    }
2144
2145    current.insert(leaf, value);
2146    Ok(())
2147}
2148
2149fn extract_number(val: &Option<Value>) -> Option<f64> {
2150    val.as_ref()
2151        .and_then(|v| v.get("N"))
2152        .and_then(|n| n.as_str())
2153        .and_then(|s| s.parse().ok())
2154}
2155
/// Splits `expr` at its first top-level `+` or `-`.
///
/// Operators nested inside parentheses are ignored, as is an operator at the
/// very first position. On success returns the untrimmed text on either side
/// of the operator and `true` for `+` / `false` for `-`; returns `None` when
/// no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    for (at, ch) in expr.char_indices() {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            '+' | '-' if nesting == 0 && at > 0 => {
                let (lhs, op_and_rhs) = expr.split_at(at);
                // The operator is a single ASCII byte, so skipping one byte
                // lands on the start of the right operand.
                return Some((lhs, &op_and_rhs[1..], ch == '+'));
            }
            _ => {}
        }
    }
    None
}
2173
2174fn apply_add_assignment(
2175    item: &mut HashMap<String, AttributeValue>,
2176    assignment: &str,
2177    expr_attr_names: &HashMap<String, String>,
2178    expr_attr_values: &HashMap<String, Value>,
2179) -> Result<(), AwsServiceError> {
2180    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2181    if parts.len() != 2 {
2182        return Ok(());
2183    }
2184
2185    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2186    let val_ref = parts[1].trim();
2187    let add_val = expr_attr_values.get(val_ref);
2188
2189    if let Some(add_val) = add_val {
2190        if let Some(existing) = item.get(&attr) {
2191            if let (Some(existing_num), Some(add_num)) = (
2192                extract_number(&Some(existing.clone())),
2193                extract_number(&Some(add_val.clone())),
2194            ) {
2195                let result = existing_num + add_num;
2196                let num_str = if result == result.trunc() {
2197                    format!("{}", result as i64)
2198                } else {
2199                    format!("{result}")
2200                };
2201                item.insert(attr, json!({"N": num_str}));
2202            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
2203                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
2204                    let mut merged: Vec<Value> = existing_set.clone();
2205                    for v in add_set {
2206                        if !merged.contains(v) {
2207                            merged.push(v.clone());
2208                        }
2209                    }
2210                    item.insert(attr, json!({"SS": merged}));
2211                }
2212            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
2213                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
2214                    let mut merged: Vec<Value> = existing_set.clone();
2215                    for v in add_set {
2216                        if !merged.contains(v) {
2217                            merged.push(v.clone());
2218                        }
2219                    }
2220                    item.insert(attr, json!({"NS": merged}));
2221                }
2222            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
2223                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
2224                    let mut merged: Vec<Value> = existing_set.clone();
2225                    for v in add_set {
2226                        if !merged.contains(v) {
2227                            merged.push(v.clone());
2228                        }
2229                    }
2230                    item.insert(attr, json!({"BS": merged}));
2231                }
2232            }
2233        } else {
2234            item.insert(attr, add_val.clone());
2235        }
2236    }
2237
2238    Ok(())
2239}
2240
2241fn apply_delete_assignment(
2242    item: &mut HashMap<String, AttributeValue>,
2243    assignment: &str,
2244    expr_attr_names: &HashMap<String, String>,
2245    expr_attr_values: &HashMap<String, Value>,
2246) -> Result<(), AwsServiceError> {
2247    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2248    if parts.len() != 2 {
2249        return Ok(());
2250    }
2251
2252    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2253    let val_ref = parts[1].trim();
2254    let del_val = expr_attr_values.get(val_ref);
2255
2256    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
2257        if let (Some(existing_set), Some(del_set)) = (
2258            existing.get("SS").and_then(|v| v.as_array()),
2259            del_val.get("SS").and_then(|v| v.as_array()),
2260        ) {
2261            let filtered: Vec<Value> = existing_set
2262                .iter()
2263                .filter(|v| !del_set.contains(v))
2264                .cloned()
2265                .collect();
2266            if filtered.is_empty() {
2267                item.remove(&attr);
2268            } else {
2269                item.insert(attr, json!({"SS": filtered}));
2270            }
2271        } else if let (Some(existing_set), Some(del_set)) = (
2272            existing.get("NS").and_then(|v| v.as_array()),
2273            del_val.get("NS").and_then(|v| v.as_array()),
2274        ) {
2275            let filtered: Vec<Value> = existing_set
2276                .iter()
2277                .filter(|v| !del_set.contains(v))
2278                .cloned()
2279                .collect();
2280            if filtered.is_empty() {
2281                item.remove(&attr);
2282            } else {
2283                item.insert(attr, json!({"NS": filtered}));
2284            }
2285        } else if let (Some(existing_set), Some(del_set)) = (
2286            existing.get("BS").and_then(|v| v.as_array()),
2287            del_val.get("BS").and_then(|v| v.as_array()),
2288        ) {
2289            let filtered: Vec<Value> = existing_set
2290                .iter()
2291                .filter(|v| !del_set.contains(v))
2292                .cloned()
2293                .collect();
2294            if filtered.is_empty() {
2295                item.remove(&attr);
2296            } else {
2297                item.insert(attr, json!({"BS": filtered}));
2298            }
2299        }
2300    }
2301
2302    Ok(())
2303}
2304
/// Borrowed inputs consumed by `build_table_description_json` to render a
/// table-description JSON object.
pub(super) struct TableDescriptionInput<'a> {
    /// Table ARN. Emitted as `TableArn`; the text after its last `/` becomes
    /// `TableName`, and index ARNs are derived as `{arn}/index/{index_name}`.
    pub arn: &'a str,
    /// Emitted as `TableId`.
    pub table_id: &'a str,
    /// Emitted as `KeySchema`.
    pub key_schema: &'a [KeySchemaElement],
    /// Emitted as `AttributeDefinitions`.
    pub attribute_definitions: &'a [AttributeDefinition],
    /// Read/write capacity reported under `ProvisionedThroughput`; zeros are
    /// reported instead when `billing_mode` is `PAY_PER_REQUEST`.
    pub provisioned_throughput: &'a ProvisionedThroughput,
    /// Global secondary indexes; `GlobalSecondaryIndexes` is emitted only
    /// when this is non-empty.
    pub gsi: &'a [GlobalSecondaryIndex],
    /// Local secondary indexes; `LocalSecondaryIndexes` is emitted only when
    /// this is non-empty.
    pub lsi: &'a [LocalSecondaryIndex],
    /// Emitted as `BillingModeSummary.BillingMode`.
    pub billing_mode: &'a str,
    /// Emitted as `CreationDateTime`, in epoch seconds with a millisecond
    /// fraction.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Emitted as `ItemCount`.
    pub item_count: i64,
    /// Emitted as `TableSizeBytes`.
    pub size_bytes: i64,
    /// Emitted as `TableStatus`; the value `ACTIVE` additionally triggers a
    /// `WarmThroughput` block.
    pub status: &'a str,
    /// Emitted as `DeletionProtectionEnabled`.
    pub deletion_protection_enabled: bool,
    /// When present, emitted as `OnDemandThroughput`.
    pub on_demand_throughput: Option<&'a crate::state::OnDemandThroughput>,
}
2321
2322fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
2323    let TableDescriptionInput {
2324        arn,
2325        table_id,
2326        key_schema,
2327        attribute_definitions,
2328        provisioned_throughput,
2329        gsi,
2330        lsi,
2331        billing_mode,
2332        created_at,
2333        item_count,
2334        size_bytes,
2335        status,
2336        deletion_protection_enabled,
2337        on_demand_throughput,
2338    } = *input;
2339    let table_name = arn.rsplit('/').next().unwrap_or("");
2340    let creation_timestamp =
2341        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
2342
2343    let ks: Vec<Value> = key_schema
2344        .iter()
2345        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2346        .collect();
2347
2348    let ad: Vec<Value> = attribute_definitions
2349        .iter()
2350        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2351        .collect();
2352
2353    let mut desc = json!({
2354        "TableName": table_name,
2355        "TableArn": arn,
2356        "TableId": table_id,
2357        "TableStatus": status,
2358        "KeySchema": ks,
2359        "AttributeDefinitions": ad,
2360        "CreationDateTime": creation_timestamp,
2361        "ItemCount": item_count,
2362        "TableSizeBytes": size_bytes,
2363        "BillingModeSummary": { "BillingMode": billing_mode },
2364        "DeletionProtectionEnabled": deletion_protection_enabled,
2365    });
2366
2367    if billing_mode != "PAY_PER_REQUEST" {
2368        desc["ProvisionedThroughput"] = json!({
2369            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2370            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2371            "NumberOfDecreasesToday": 0,
2372        });
2373    } else {
2374        desc["ProvisionedThroughput"] = json!({
2375            "ReadCapacityUnits": 0,
2376            "WriteCapacityUnits": 0,
2377            "NumberOfDecreasesToday": 0,
2378        });
2379    }
2380
2381    if let Some(odt) = on_demand_throughput {
2382        desc["OnDemandThroughput"] = json!({
2383            "MaxReadRequestUnits": odt.max_read_request_units,
2384            "MaxWriteRequestUnits": odt.max_write_request_units,
2385        });
2386    }
2387
2388    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2389    // Real AWS returns an ACTIVE warm throughput object for active tables,
2390    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2391    // perpetual "still creating" loop.
2392    if status == "ACTIVE" {
2393        desc["WarmThroughput"] = json!({
2394            "ReadUnitsPerSecond": 0,
2395            "WriteUnitsPerSecond": 0,
2396            "Status": "ACTIVE",
2397        });
2398    }
2399
2400    if !gsi.is_empty() {
2401        let gsi_json: Vec<Value> = gsi
2402            .iter()
2403            .map(|g| {
2404                let gks: Vec<Value> = g
2405                    .key_schema
2406                    .iter()
2407                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2408                    .collect();
2409                let mut idx = json!({
2410                    "IndexName": g.index_name,
2411                    "KeySchema": gks,
2412                    "Projection": { "ProjectionType": g.projection.projection_type },
2413                    "IndexStatus": "ACTIVE",
2414                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2415                    "ItemCount": 0,
2416                    "IndexSizeBytes": 0,
2417                });
2418                if !g.projection.non_key_attributes.is_empty() {
2419                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2420                }
2421                if let Some(ref pt) = g.provisioned_throughput {
2422                    idx["ProvisionedThroughput"] = json!({
2423                        "ReadCapacityUnits": pt.read_capacity_units,
2424                        "WriteCapacityUnits": pt.write_capacity_units,
2425                        "NumberOfDecreasesToday": 0,
2426                    });
2427                }
2428                if let Some(ref odt) = g.on_demand_throughput {
2429                    idx["OnDemandThroughput"] = json!({
2430                        "MaxReadRequestUnits": odt.max_read_request_units,
2431                        "MaxWriteRequestUnits": odt.max_write_request_units,
2432                    });
2433                }
2434                idx
2435            })
2436            .collect();
2437        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2438    }
2439
2440    if !lsi.is_empty() {
2441        let lsi_json: Vec<Value> = lsi
2442            .iter()
2443            .map(|l| {
2444                let lks: Vec<Value> = l
2445                    .key_schema
2446                    .iter()
2447                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2448                    .collect();
2449                let mut idx = json!({
2450                    "IndexName": l.index_name,
2451                    "KeySchema": lks,
2452                    "Projection": { "ProjectionType": l.projection.projection_type },
2453                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2454                    "ItemCount": 0,
2455                    "IndexSizeBytes": 0,
2456                });
2457                if !l.projection.non_key_attributes.is_empty() {
2458                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2459                }
2460                idx
2461            })
2462            .collect();
2463        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2464    }
2465
2466    desc
2467}
2468
2469fn build_table_description(table: &DynamoTable) -> Value {
2470    let mut desc = build_table_description_json(&TableDescriptionInput {
2471        arn: &table.arn,
2472        table_id: &table.table_id,
2473        key_schema: &table.key_schema,
2474        attribute_definitions: &table.attribute_definitions,
2475        provisioned_throughput: &table.provisioned_throughput,
2476        gsi: &table.gsi,
2477        lsi: &table.lsi,
2478        billing_mode: &table.billing_mode,
2479        created_at: table.created_at,
2480        item_count: table.item_count,
2481        size_bytes: table.size_bytes,
2482        status: &table.status,
2483        deletion_protection_enabled: table.deletion_protection_enabled,
2484        on_demand_throughput: table.on_demand_throughput.as_ref(),
2485    });
2486
2487    // `LatestStreamArn` / `LatestStreamLabel` persist after a stream has
2488    // been created, even if streams are currently disabled — real AWS
2489    // keeps them for ~24h post-disable so DescribeTable callers can still
2490    // observe the last active stream. fakecloud keeps them for the
2491    // table's lifetime, which is sufficient for any single test run.
2492    if let Some(ref stream_arn) = table.stream_arn {
2493        desc["LatestStreamArn"] = json!(stream_arn);
2494        desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2495    }
2496    // The `StreamSpecification` block is only present while streams are
2497    // actively enabled. When absent, the Terraform provider Read falls
2498    // through to the prior `stream_view_type` from its own state rather
2499    // than clearing it, which matches the diff behaviour the upstream
2500    // acceptance tests assert on.
2501    if table.stream_enabled {
2502        if let Some(ref view_type) = table.stream_view_type {
2503            desc["StreamSpecification"] = json!({
2504                "StreamEnabled": true,
2505                "StreamViewType": view_type,
2506            });
2507        }
2508    }
2509
2510    // SSEDescription is only returned when the customer explicitly enabled
2511    // a KMS-backed SSE. Real AWS tables using the default AWS-owned key omit
2512    // this field entirely, and the Terraform provider's Read asserts
2513    // `server_side_encryption.#` == 0 in that case.
2514    if let Some(ref sse_type) = table.sse_type {
2515        let mut sse_desc = json!({
2516            "Status": "ENABLED",
2517            "SSEType": sse_type,
2518        });
2519        if let Some(ref key_arn) = table.sse_kms_key_arn {
2520            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2521        }
2522        desc["SSEDescription"] = sse_desc;
2523    }
2524
2525    desc
2526}
2527
2528fn execute_partiql_statement(
2529    state: &SharedDynamoDbState,
2530    statement: &str,
2531    parameters: &[Value],
2532) -> Result<AwsResponse, AwsServiceError> {
2533    let trimmed = statement.trim();
2534    let upper = trimmed.to_ascii_uppercase();
2535
2536    if upper.starts_with("SELECT") {
2537        execute_partiql_select(state, trimmed, parameters)
2538    } else if upper.starts_with("INSERT") {
2539        execute_partiql_insert(state, trimmed, parameters)
2540    } else if upper.starts_with("UPDATE") {
2541        execute_partiql_update(state, trimmed, parameters)
2542    } else if upper.starts_with("DELETE") {
2543        execute_partiql_delete(state, trimmed, parameters)
2544    } else {
2545        Err(AwsServiceError::aws_error(
2546            StatusCode::BAD_REQUEST,
2547            "ValidationException",
2548            format!("Unsupported PartiQL statement: {trimmed}"),
2549        ))
2550    }
2551}
2552
2553/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2554fn execute_partiql_select(
2555    state: &SharedDynamoDbState,
2556    statement: &str,
2557    parameters: &[Value],
2558) -> Result<AwsResponse, AwsServiceError> {
2559    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2560    let upper = statement.to_ascii_uppercase();
2561    let from_pos = upper.find("FROM").ok_or_else(|| {
2562        AwsServiceError::aws_error(
2563            StatusCode::BAD_REQUEST,
2564            "ValidationException",
2565            "Invalid SELECT statement: missing FROM",
2566        )
2567    })?;
2568
2569    let after_from = statement[from_pos + 4..].trim();
2570    let (table_name, rest) = parse_partiql_table_name(after_from);
2571
2572    let __mas = state.read();
2573    let state = __mas.default_ref();
2574    let table = get_table(&state.tables, &table_name)?;
2575
2576    let rest_upper = rest.trim().to_ascii_uppercase();
2577    if rest_upper.starts_with("WHERE") {
2578        let where_clause = rest.trim()[5..].trim();
2579        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2580        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2581        DynamoDbService::ok_json(json!({ "Items": items }))
2582    } else {
2583        // No WHERE, return all items
2584        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2585        DynamoDbService::ok_json(json!({ "Items": items }))
2586    }
2587}
2588
2589fn execute_partiql_insert(
2590    state: &SharedDynamoDbState,
2591    statement: &str,
2592    parameters: &[Value],
2593) -> Result<AwsResponse, AwsServiceError> {
2594    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2595    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2596    let upper = statement.to_ascii_uppercase();
2597    let into_pos = upper.find("INTO").ok_or_else(|| {
2598        AwsServiceError::aws_error(
2599            StatusCode::BAD_REQUEST,
2600            "ValidationException",
2601            "Invalid INSERT statement: missing INTO",
2602        )
2603    })?;
2604
2605    let after_into = statement[into_pos + 4..].trim();
2606    let (table_name, rest) = parse_partiql_table_name(after_into);
2607
2608    let rest_upper = rest.trim().to_ascii_uppercase();
2609    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2610        AwsServiceError::aws_error(
2611            StatusCode::BAD_REQUEST,
2612            "ValidationException",
2613            "Invalid INSERT statement: missing VALUE",
2614        )
2615    })?;
2616
2617    let value_str = rest.trim()[value_pos + 5..].trim();
2618    let item = parse_partiql_value_object(value_str, parameters)?;
2619
2620    let mut __mas = state.write();
2621    let state = __mas.default_mut();
2622    let table = get_table_mut(&mut state.tables, &table_name)?;
2623    let key = extract_key(table, &item);
2624    if table.find_item_index(&key).is_some() {
2625        // DynamoDB PartiQL INSERT fails if item exists
2626        return Err(AwsServiceError::aws_error(
2627            StatusCode::BAD_REQUEST,
2628            "DuplicateItemException",
2629            "Duplicate primary key exists in table",
2630        ));
2631    } else {
2632        table.items.push(item);
2633    }
2634    table.recalculate_stats();
2635
2636    DynamoDbService::ok_json(json!({}))
2637}
2638
2639fn execute_partiql_update(
2640    state: &SharedDynamoDbState,
2641    statement: &str,
2642    parameters: &[Value],
2643) -> Result<AwsResponse, AwsServiceError> {
2644    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2645    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2646    let after_update = statement[6..].trim(); // skip "UPDATE"
2647    let (table_name, rest) = parse_partiql_table_name(after_update);
2648
2649    let rest_upper = rest.trim().to_ascii_uppercase();
2650    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2651        AwsServiceError::aws_error(
2652            StatusCode::BAD_REQUEST,
2653            "ValidationException",
2654            "Invalid UPDATE statement: missing SET",
2655        )
2656    })?;
2657
2658    let after_set = rest.trim()[set_pos + 3..].trim();
2659
2660    // Split on WHERE
2661    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2662    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2663        (&after_set[..wp], after_set[wp + 5..].trim())
2664    } else {
2665        (after_set, "")
2666    };
2667
2668    let mut __mas = state.write();
2669    let state = __mas.default_mut();
2670    let table = get_table_mut(&mut state.tables, &table_name)?;
2671
2672    let matched_indices = if !where_clause.is_empty() {
2673        find_partiql_where_indices(table, where_clause, parameters)?
2674    } else {
2675        (0..table.items.len()).collect()
2676    };
2677
2678    // Parse SET assignments: attr=value, attr2=value2
2679    let param_offset = count_params_in_str(where_clause);
2680    let assignments: Vec<&str> = set_clause.split(',').collect();
2681    for idx in &matched_indices {
2682        let mut local_offset = param_offset;
2683        for assignment in &assignments {
2684            let assignment = assignment.trim();
2685            if let Some((attr, val_str)) = assignment.split_once('=') {
2686                let attr = attr.trim().trim_matches('"');
2687                let val_str = val_str.trim();
2688                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2689                if let Some(v) = value {
2690                    table.items[*idx].insert(attr.to_string(), v);
2691                }
2692            }
2693        }
2694    }
2695    table.recalculate_stats();
2696
2697    DynamoDbService::ok_json(json!({}))
2698}
2699
2700fn execute_partiql_delete(
2701    state: &SharedDynamoDbState,
2702    statement: &str,
2703    parameters: &[Value],
2704) -> Result<AwsResponse, AwsServiceError> {
2705    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2706    let upper = statement.to_ascii_uppercase();
2707    let from_pos = upper.find("FROM").ok_or_else(|| {
2708        AwsServiceError::aws_error(
2709            StatusCode::BAD_REQUEST,
2710            "ValidationException",
2711            "Invalid DELETE statement: missing FROM",
2712        )
2713    })?;
2714
2715    let after_from = statement[from_pos + 4..].trim();
2716    let (table_name, rest) = parse_partiql_table_name(after_from);
2717
2718    let rest_upper = rest.trim().to_ascii_uppercase();
2719    if !rest_upper.starts_with("WHERE") {
2720        return Err(AwsServiceError::aws_error(
2721            StatusCode::BAD_REQUEST,
2722            "ValidationException",
2723            "DELETE requires a WHERE clause",
2724        ));
2725    }
2726    let where_clause = rest.trim()[5..].trim();
2727
2728    let mut __mas = state.write();
2729    let state = __mas.default_mut();
2730    let table = get_table_mut(&mut state.tables, &table_name)?;
2731
2732    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2733    // Remove from highest index first to avoid invalidating lower indices
2734    indices.sort_unstable();
2735    indices.reverse();
2736    for idx in indices {
2737        table.items.remove(idx);
2738    }
2739    table.recalculate_stats();
2740
2741    DynamoDbService::ok_json(json!({}))
2742}
2743
/// Split the leading table name off `s`, returning `(table_name, rest)`.
///
/// The input is trimmed first. A name wrapped in double quotes runs up to
/// the closing quote (the quotes are dropped, and `rest` starts right after
/// the closing quote). An opening quote with no partner falls back to
/// cutting at the first space and stripping any quotes from the name. An
/// unquoted name ends at the first whitespace character.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    match s.strip_prefix('"') {
        None => {
            let cut = s.find(char::is_whitespace).unwrap_or(s.len());
            let (name, rest) = s.split_at(cut);
            (name.to_string(), rest)
        }
        Some(after_quote) => match after_quote.split_once('"') {
            Some((name, rest)) => (name.to_string(), rest),
            None => {
                // Unterminated quote: treat the first space as the boundary.
                let cut = s.find(' ').unwrap_or(s.len());
                let (name, rest) = s.split_at(cut);
                (name.trim_matches('"').to_string(), rest)
            }
        },
    }
}
2763
2764/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2765/// Returns matching items.
2766fn evaluate_partiql_where<'a>(
2767    table: &'a DynamoTable,
2768    where_clause: &str,
2769    parameters: &[Value],
2770) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2771    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2772    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2773}
2774
2775fn find_partiql_where_indices(
2776    table: &DynamoTable,
2777    where_clause: &str,
2778    parameters: &[Value],
2779) -> Result<Vec<usize>, AwsServiceError> {
2780    let conditions = split_partiql_and_clauses(where_clause);
2781    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2782
2783    let mut indices = Vec::new();
2784    for (i, item) in table.items.iter().enumerate() {
2785        let all_match = parsed_conditions
2786            .iter()
2787            .all(|(attr, expected)| item.get(attr) == Some(expected));
2788        if all_match {
2789            indices.push(i);
2790        }
2791    }
2792
2793    Ok(indices)
2794}
2795
/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
///
/// Each returned piece is trimmed. A clause without any separator yields a
/// single piece. A separator that falls inside a single-quoted string
/// literal (e.g. `name = 'Tom AND Jerry'`) is not treated as a boundary.
///
/// The keyword search uses ASCII-only upper-casing on purpose: full Unicode
/// upper-casing can change the byte length of non-ASCII text, which would
/// make the offsets found in the upper-cased copy invalid for slicing the
/// original clause.
fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
    const SEPARATOR: &str = " AND ";

    let upper = where_clause.to_ascii_uppercase();
    let bytes = where_clause.as_bytes();

    let mut parts = Vec::new();
    let mut last = 0;
    // Quote state is tracked incrementally: `scanned` marks how far the
    // clause has already been inspected for `'` characters.
    let mut scanned = 0;
    let mut in_quote = false;

    for (i, _) in upper.match_indices(SEPARATOR) {
        for &byte in &bytes[scanned..i] {
            if byte == b'\'' {
                in_quote = !in_quote;
            }
        }
        scanned = i;
        if in_quote {
            // This " AND " is part of a string literal, not a conjunction.
            continue;
        }
        parts.push(where_clause[last..i].trim());
        last = i + SEPARATOR.len();
    }
    parts.push(where_clause[last..].trim());
    parts
}
2811
2812/// Parse each `col = literal` (or `col = ?`) condition into an
2813/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2814/// don't parse as equality, or whose RHS literal can't be resolved, are
2815/// silently dropped — that mirrors the prior inline behavior.
2816fn parse_partiql_equality_conditions(
2817    conditions: &[&str],
2818    parameters: &[Value],
2819) -> Vec<(String, Value)> {
2820    let mut param_idx = 0usize;
2821    let mut parsed = Vec::new();
2822    for cond in conditions {
2823        let cond = cond.trim();
2824        if let Some((left, right)) = cond.split_once('=') {
2825            let attr = left.trim().trim_matches('"').to_string();
2826            let val_str = right.trim();
2827            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2828                parsed.push((attr, value));
2829            }
2830        }
2831    }
2832    parsed
2833}
2834
2835/// Parse a PartiQL literal value. Supports:
2836/// - 'string' -> {"S": "string"}
2837/// - 123 -> {"N": "123"}
2838/// - ? -> parameter from list
2839fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2840    let s = s.trim();
2841    if s == "?" {
2842        let idx = *param_idx;
2843        *param_idx += 1;
2844        parameters.get(idx).cloned()
2845    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2846        let inner = &s[1..s.len() - 1];
2847        Some(json!({"S": inner}))
2848    } else if let Ok(n) = s.parse::<f64>() {
2849        let num_str = if n == n.trunc() {
2850            format!("{}", n as i64)
2851        } else {
2852            format!("{n}")
2853        };
2854        Some(json!({"N": num_str}))
2855    } else {
2856        None
2857    }
2858}
2859
2860/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2861fn parse_partiql_value_object(
2862    s: &str,
2863    parameters: &[Value],
2864) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2865    let s = s.trim();
2866    let inner = s
2867        .strip_prefix('{')
2868        .and_then(|s| s.strip_suffix('}'))
2869        .ok_or_else(|| {
2870            AwsServiceError::aws_error(
2871                StatusCode::BAD_REQUEST,
2872                "ValidationException",
2873                "Invalid VALUE: expected object literal",
2874            )
2875        })?;
2876
2877    let mut item = HashMap::new();
2878    let mut param_idx = 0usize;
2879
2880    // Simple comma-separated key:value parsing
2881    for pair in split_partiql_pairs(inner) {
2882        let pair = pair.trim();
2883        if pair.is_empty() {
2884            continue;
2885        }
2886        if let Some((key_part, val_part)) = pair.split_once(':') {
2887            let key = key_part
2888                .trim()
2889                .trim_matches('\'')
2890                .trim_matches('"')
2891                .to_string();
2892            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2893                item.insert(key, val);
2894            }
2895        }
2896    }
2897
2898    Ok(item)
2899}
2900
/// Split the inside of a PartiQL object literal into its `key: value` pairs.
///
/// A comma separates pairs only when it sits outside single-quoted text and
/// outside any nested `{ ... }`. Pieces are returned untrimmed, and the text
/// after the final separator is always included (so an empty input yields
/// one empty piece).
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut piece_start = 0;
    let mut brace_depth = 0;
    let mut quoted = false;

    for (pos, ch) in s.char_indices() {
        // A single quote always flips the quoting state.
        if ch == '\'' {
            quoted = !quoted;
            continue;
        }
        // Nothing inside a quoted string is structural.
        if quoted {
            continue;
        }
        match ch {
            '{' => brace_depth += 1,
            '}' => brace_depth -= 1,
            ',' if brace_depth == 0 => {
                pieces.push(&s[piece_start..pos]);
                piece_start = pos + 1;
            }
            _ => {}
        }
    }

    pieces.push(&s[piece_start..]);
    pieces
}
2924
/// Count the `?` characters in `s`.
///
/// Every question mark is counted, including any that appear inside quoted
/// text.
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
2929
2930#[cfg(test)]
2931mod tests {
2932    use super::*;
2933    use serde_json::json;
2934
2935    #[test]
2936    fn test_parse_update_clauses_set() {
2937        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2938        assert_eq!(clauses.len(), 1);
2939        assert_eq!(clauses[0].0, UpdateAction::Set);
2940        assert_eq!(clauses[0].1.len(), 2);
2941    }
2942
2943    #[test]
2944    fn test_parse_update_clauses_set_and_remove() {
2945        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2946        assert_eq!(clauses.len(), 2);
2947        assert_eq!(clauses[0].0, UpdateAction::Set);
2948        assert_eq!(clauses[1].0, UpdateAction::Remove);
2949    }
2950
2951    #[test]
2952    fn test_parse_update_clauses_list_append_single_assignment() {
2953        // Before fix: naive comma split tore list_append(#0, :0) at the
2954        // inner comma, producing two bogus assignments instead of one.
2955        let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
2956        assert_eq!(clauses.len(), 1);
2957        assert_eq!(clauses[0].0, UpdateAction::Set);
2958        assert_eq!(
2959            clauses[0].1.len(),
2960            1,
2961            "list_append(a, b) must be kept as a single assignment, not split at the inner comma"
2962        );
2963    }
2964
2965    #[test]
2966    fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
2967        // list_append assignment followed by a plain SET — the comma between
2968        // the two assignments must still split them, while the comma inside
2969        // the list_append call must not.
2970        let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
2971        assert_eq!(clauses.len(), 1);
2972        assert_eq!(clauses[0].0, UpdateAction::Set);
2973        assert_eq!(
2974            clauses[0].1.len(),
2975            2,
2976            "two SET assignments: one list_append and one plain"
2977        );
2978    }
2979
2980    #[test]
2981    fn test_evaluate_key_condition_simple() {
2982        let mut item = HashMap::new();
2983        item.insert("pk".to_string(), json!({"S": "user1"}));
2984        item.insert("sk".to_string(), json!({"S": "order1"}));
2985
2986        let mut expr_values = HashMap::new();
2987        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2988
2989        assert!(evaluate_key_condition(
2990            "pk = :pk",
2991            &item,
2992            &HashMap::new(),
2993            &expr_values,
2994        ));
2995    }
2996
2997    #[test]
2998    fn test_compare_attribute_values_numbers() {
2999        let a = json!({"N": "10"});
3000        let b = json!({"N": "20"});
3001        assert_eq!(
3002            compare_attribute_values(Some(&a), Some(&b)),
3003            std::cmp::Ordering::Less
3004        );
3005    }
3006
3007    #[test]
3008    fn test_compare_attribute_values_strings() {
3009        let a = json!({"S": "apple"});
3010        let b = json!({"S": "banana"});
3011        assert_eq!(
3012            compare_attribute_values(Some(&a), Some(&b)),
3013            std::cmp::Ordering::Less
3014        );
3015    }
3016
3017    #[test]
3018    fn test_split_on_and() {
3019        let parts = split_on_and("pk = :pk AND sk > :sk");
3020        assert_eq!(parts.len(), 2);
3021        assert_eq!(parts[0].trim(), "pk = :pk");
3022        assert_eq!(parts[1].trim(), "sk > :sk");
3023    }
3024
3025    #[test]
3026    fn test_split_on_and_respects_parentheses() {
3027        // Before fix: split_on_and would split inside the parens
3028        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
3029        // Should NOT split on the AND inside parentheses
3030        assert_eq!(parts.len(), 1);
3031        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
3032    }
3033
3034    #[test]
3035    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
3036        // (a AND b) OR c — should match when c is true but a is false
3037        let mut item = HashMap::new();
3038        item.insert("x".to_string(), json!({"S": "no"}));
3039        item.insert("y".to_string(), json!({"S": "no"}));
3040        item.insert("z".to_string(), json!({"S": "yes"}));
3041
3042        let mut expr_values = HashMap::new();
3043        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
3044
3045        // x=yes AND y=yes => false, but z=yes => true => overall true
3046        let result = evaluate_filter_expression(
3047            "(x = :yes AND y = :yes) OR z = :yes",
3048            &item,
3049            &HashMap::new(),
3050            &expr_values,
3051        );
3052        assert!(result, "should match because z = :yes is true");
3053
3054        // x=yes AND y=yes => false, z=yes => false => overall false
3055        let mut item2 = HashMap::new();
3056        item2.insert("x".to_string(), json!({"S": "no"}));
3057        item2.insert("y".to_string(), json!({"S": "no"}));
3058        item2.insert("z".to_string(), json!({"S": "no"}));
3059
3060        let result2 = evaluate_filter_expression(
3061            "(x = :yes AND y = :yes) OR z = :yes",
3062            &item2,
3063            &HashMap::new(),
3064            &expr_values,
3065        );
3066        assert!(!result2, "should not match because nothing is true");
3067    }
3068
3069    #[test]
3070    fn test_project_item_nested_path() {
3071        // Item with a list attribute containing maps
3072        let mut item = HashMap::new();
3073        item.insert("pk".to_string(), json!({"S": "key1"}));
3074        item.insert(
3075            "data".to_string(),
3076            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
3077        );
3078
3079        let body = json!({
3080            "ProjectionExpression": "data[0].name"
3081        });
3082
3083        let projected = project_item(&item, &body);
3084        // Should contain data[0].name = "Alice", not the entire data[0] element
3085        let name = projected
3086            .get("data")
3087            .and_then(|v| v.get("L"))
3088            .and_then(|v| v.get(0))
3089            .and_then(|v| v.get("M"))
3090            .and_then(|v| v.get("name"))
3091            .and_then(|v| v.get("S"))
3092            .and_then(|v| v.as_str());
3093        assert_eq!(name, Some("Alice"));
3094
3095        // Should NOT contain the "age" field
3096        let age = projected
3097            .get("data")
3098            .and_then(|v| v.get("L"))
3099            .and_then(|v| v.get(0))
3100            .and_then(|v| v.get("M"))
3101            .and_then(|v| v.get("age"));
3102        assert!(age.is_none(), "age should not be present in projection");
3103    }
3104
3105    #[test]
3106    fn test_resolve_nested_path_map() {
3107        let mut item = HashMap::new();
3108        item.insert(
3109            "info".to_string(),
3110            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
3111        );
3112
3113        let result = resolve_nested_path(&item, "info.address.city");
3114        assert_eq!(result, Some(json!({"S": "NYC"})));
3115    }
3116
3117    #[test]
3118    fn test_resolve_nested_path_list_then_map() {
3119        let mut item = HashMap::new();
3120        item.insert(
3121            "items".to_string(),
3122            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
3123        );
3124
3125        let result = resolve_nested_path(&item, "items[0].sku");
3126        assert_eq!(result, Some(json!({"S": "ABC"})));
3127    }
3128
3129    #[test]
3130    fn test_resolve_path_alias_with_dot_is_top_level_attr() {
3131        // Top-level attribute name literally contains a dot; user aliases it
3132        // via ExpressionAttributeNames and references the alias. Must resolve
3133        // to the top-level attribute, NOT be walked as a nested path.
3134        let mut item = HashMap::new();
3135        item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
3136        let mut names = HashMap::new();
3137        names.insert("#sw".to_string(), "Safety.Warning".to_string());
3138
3139        let result = resolve_path("#sw", &item, &names);
3140        assert_eq!(result, Some(json!({"S": "high"})));
3141    }
3142
3143    #[test]
3144    fn test_resolve_path_dotted_expression_still_walks_nested() {
3145        // When the expression itself contains `.`, we still walk the nested
3146        // path (the dot is a path separator, not part of an attribute name).
3147        let mut item = HashMap::new();
3148        item.insert("profile".to_string(), json!({"M": {"email": {"S": "x@y"}}}));
3149        let names = HashMap::new();
3150
3151        let result = resolve_path("profile.email", &item, &names);
3152        assert_eq!(result, Some(json!({"S": "x@y"})));
3153    }
3154
3155    #[test]
3156    fn test_project_item_alias_with_dot_is_top_level_attr() {
3157        // Same invariant must hold for ProjectionExpression.
3158        let mut item = HashMap::new();
3159        item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
3160        item.insert("other".to_string(), json!({"S": "ignored"}));
3161        let body = json!({
3162            "ProjectionExpression": "#sw",
3163            "ExpressionAttributeNames": {"#sw": "Safety.Warning"},
3164        });
3165
3166        let projected = project_item(&item, &body);
3167        assert_eq!(projected.get("Safety.Warning"), Some(&json!({"S": "high"})));
3168        assert!(!projected.contains_key("other"));
3169    }
3170
3171    // -- Integration-style tests using DynamoDbService --
3172
3173    use crate::state::SharedDynamoDbState;
3174    use parking_lot::RwLock;
3175    use std::sync::Arc;
3176
3177    fn make_service() -> DynamoDbService {
3178        let state: SharedDynamoDbState = Arc::new(RwLock::new(
3179            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3180        ));
3181        DynamoDbService::new(state)
3182    }
3183
3184    fn make_request(action: &str, body: Value) -> AwsRequest {
3185        AwsRequest {
3186            service: "dynamodb".to_string(),
3187            action: action.to_string(),
3188            region: "us-east-1".to_string(),
3189            account_id: "123456789012".to_string(),
3190            request_id: "test-id".to_string(),
3191            headers: http::HeaderMap::new(),
3192            query_params: HashMap::new(),
3193            body: serde_json::to_vec(&body).unwrap().into(),
3194            path_segments: vec![],
3195            raw_path: "/".to_string(),
3196            raw_query: String::new(),
3197            method: http::Method::POST,
3198            is_query_protocol: false,
3199            access_key_id: None,
3200            principal: None,
3201        }
3202    }
3203
3204    fn create_test_table(svc: &DynamoDbService) {
3205        let req = make_request(
3206            "CreateTable",
3207            json!({
3208                "TableName": "test-table",
3209                "KeySchema": [
3210                    { "AttributeName": "pk", "KeyType": "HASH" }
3211                ],
3212                "AttributeDefinitions": [
3213                    { "AttributeName": "pk", "AttributeType": "S" }
3214                ],
3215                "BillingMode": "PAY_PER_REQUEST"
3216            }),
3217        );
3218        svc.create_table(&req).unwrap();
3219    }
3220
3221    #[test]
3222    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
3223        let svc = make_service();
3224        let req = make_request(
3225            "CreateTable",
3226            json!({
3227                "TableName": "warm-throughput-table",
3228                "KeySchema": [
3229                    { "AttributeName": "pk", "KeyType": "HASH" }
3230                ],
3231                "AttributeDefinitions": [
3232                    { "AttributeName": "pk", "AttributeType": "S" }
3233                ],
3234                "BillingMode": "PAY_PER_REQUEST"
3235            }),
3236        );
3237        let create_resp = svc.create_table(&req).unwrap();
3238        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
3239        let create_table = &create_body["TableDescription"];
3240
3241        assert_eq!(create_table["TableStatus"], "ACTIVE");
3242        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
3243        let table_id = create_table["TableId"].as_str().unwrap().to_string();
3244        assert!(!table_id.is_empty());
3245
3246        let describe_req = make_request(
3247            "DescribeTable",
3248            json!({ "TableName": "warm-throughput-table" }),
3249        );
3250        let describe_resp = svc.describe_table(&describe_req).unwrap();
3251        let describe_body: Value =
3252            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
3253        let described_table = &describe_body["Table"];
3254
3255        assert_eq!(described_table["TableStatus"], "ACTIVE");
3256        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
3257        assert_eq!(described_table["TableId"], table_id);
3258
3259        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
3260        let describe_body_again: Value =
3261            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
3262        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
3263    }
3264
3265    #[test]
3266    fn delete_item_return_values_all_old() {
3267        let svc = make_service();
3268        create_test_table(&svc);
3269
3270        // Put an item
3271        let req = make_request(
3272            "PutItem",
3273            json!({
3274                "TableName": "test-table",
3275                "Item": {
3276                    "pk": { "S": "key1" },
3277                    "name": { "S": "Alice" },
3278                    "age": { "N": "30" }
3279                }
3280            }),
3281        );
3282        svc.put_item(&req).unwrap();
3283
3284        // Delete with ReturnValues=ALL_OLD
3285        let req = make_request(
3286            "DeleteItem",
3287            json!({
3288                "TableName": "test-table",
3289                "Key": { "pk": { "S": "key1" } },
3290                "ReturnValues": "ALL_OLD"
3291            }),
3292        );
3293        let resp = svc.delete_item(&req).unwrap();
3294        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3295
3296        // Verify the old item is returned
3297        let attrs = &body["Attributes"];
3298        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
3299        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
3300        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
3301
3302        // Verify the item is actually deleted
3303        let req = make_request(
3304            "GetItem",
3305            json!({
3306                "TableName": "test-table",
3307                "Key": { "pk": { "S": "key1" } }
3308            }),
3309        );
3310        let resp = svc.get_item(&req).unwrap();
3311        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3312        assert!(body.get("Item").is_none(), "item should be deleted");
3313    }
3314
3315    #[test]
3316    fn transact_get_items_returns_existing_and_missing() {
3317        let svc = make_service();
3318        create_test_table(&svc);
3319
3320        // Put one item
3321        let req = make_request(
3322            "PutItem",
3323            json!({
3324                "TableName": "test-table",
3325                "Item": {
3326                    "pk": { "S": "exists" },
3327                    "val": { "S": "hello" }
3328                }
3329            }),
3330        );
3331        svc.put_item(&req).unwrap();
3332
3333        let req = make_request(
3334            "TransactGetItems",
3335            json!({
3336                "TransactItems": [
3337                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
3338                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
3339                ]
3340            }),
3341        );
3342        let resp = svc.transact_get_items(&req).unwrap();
3343        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3344        let responses = body["Responses"].as_array().unwrap();
3345        assert_eq!(responses.len(), 2);
3346        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
3347        assert!(responses[1].get("Item").is_none());
3348    }
3349
3350    #[test]
3351    fn transact_write_items_put_and_delete() {
3352        let svc = make_service();
3353        create_test_table(&svc);
3354
3355        // Put initial item
3356        let req = make_request(
3357            "PutItem",
3358            json!({
3359                "TableName": "test-table",
3360                "Item": {
3361                    "pk": { "S": "to-delete" },
3362                    "val": { "S": "bye" }
3363                }
3364            }),
3365        );
3366        svc.put_item(&req).unwrap();
3367
3368        // TransactWrite: put new + delete existing
3369        let req = make_request(
3370            "TransactWriteItems",
3371            json!({
3372                "TransactItems": [
3373                    {
3374                        "Put": {
3375                            "TableName": "test-table",
3376                            "Item": {
3377                                "pk": { "S": "new-item" },
3378                                "val": { "S": "hi" }
3379                            }
3380                        }
3381                    },
3382                    {
3383                        "Delete": {
3384                            "TableName": "test-table",
3385                            "Key": { "pk": { "S": "to-delete" } }
3386                        }
3387                    }
3388                ]
3389            }),
3390        );
3391        let resp = svc.transact_write_items(&req).unwrap();
3392        assert_eq!(resp.status, StatusCode::OK);
3393
3394        // Verify new item exists
3395        let req = make_request(
3396            "GetItem",
3397            json!({
3398                "TableName": "test-table",
3399                "Key": { "pk": { "S": "new-item" } }
3400            }),
3401        );
3402        let resp = svc.get_item(&req).unwrap();
3403        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3404        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
3405
3406        // Verify deleted item is gone
3407        let req = make_request(
3408            "GetItem",
3409            json!({
3410                "TableName": "test-table",
3411                "Key": { "pk": { "S": "to-delete" } }
3412            }),
3413        );
3414        let resp = svc.get_item(&req).unwrap();
3415        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3416        assert!(body.get("Item").is_none());
3417    }
3418
3419    #[test]
3420    fn transact_write_items_condition_check_failure() {
3421        let svc = make_service();
3422        create_test_table(&svc);
3423
3424        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
3425        let req = make_request(
3426            "TransactWriteItems",
3427            json!({
3428                "TransactItems": [
3429                    {
3430                        "ConditionCheck": {
3431                            "TableName": "test-table",
3432                            "Key": { "pk": { "S": "nonexistent" } },
3433                            "ConditionExpression": "attribute_exists(pk)"
3434                        }
3435                    }
3436                ]
3437            }),
3438        );
3439        let resp = svc.transact_write_items(&req).unwrap();
3440        // Should be a 400 error response
3441        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3442        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3443        assert_eq!(
3444            body["__type"].as_str().unwrap(),
3445            "TransactionCanceledException"
3446        );
3447        assert!(body["CancellationReasons"].as_array().is_some());
3448    }
3449
3450    #[test]
3451    fn update_and_describe_time_to_live() {
3452        let svc = make_service();
3453        create_test_table(&svc);
3454
3455        // Enable TTL
3456        let req = make_request(
3457            "UpdateTimeToLive",
3458            json!({
3459                "TableName": "test-table",
3460                "TimeToLiveSpecification": {
3461                    "AttributeName": "ttl",
3462                    "Enabled": true
3463                }
3464            }),
3465        );
3466        let resp = svc.update_time_to_live(&req).unwrap();
3467        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3468        assert_eq!(
3469            body["TimeToLiveSpecification"]["AttributeName"]
3470                .as_str()
3471                .unwrap(),
3472            "ttl"
3473        );
3474        assert!(body["TimeToLiveSpecification"]["Enabled"]
3475            .as_bool()
3476            .unwrap());
3477
3478        // Describe TTL
3479        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3480        let resp = svc.describe_time_to_live(&req).unwrap();
3481        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3482        assert_eq!(
3483            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3484                .as_str()
3485                .unwrap(),
3486            "ENABLED"
3487        );
3488        assert_eq!(
3489            body["TimeToLiveDescription"]["AttributeName"]
3490                .as_str()
3491                .unwrap(),
3492            "ttl"
3493        );
3494
3495        // Disable TTL
3496        let req = make_request(
3497            "UpdateTimeToLive",
3498            json!({
3499                "TableName": "test-table",
3500                "TimeToLiveSpecification": {
3501                    "AttributeName": "ttl",
3502                    "Enabled": false
3503                }
3504            }),
3505        );
3506        svc.update_time_to_live(&req).unwrap();
3507
3508        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3509        let resp = svc.describe_time_to_live(&req).unwrap();
3510        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3511        assert_eq!(
3512            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3513                .as_str()
3514                .unwrap(),
3515            "DISABLED"
3516        );
3517    }
3518
3519    #[test]
3520    fn resource_policy_lifecycle() {
3521        let svc = make_service();
3522        create_test_table(&svc);
3523
3524        let table_arn = {
3525            let __mas = svc.state.read();
3526            let state = __mas.default_ref();
3527            state.tables.get("test-table").unwrap().arn.clone()
3528        };
3529
3530        // Put policy
3531        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3532        let req = make_request(
3533            "PutResourcePolicy",
3534            json!({
3535                "ResourceArn": table_arn,
3536                "Policy": policy_doc
3537            }),
3538        );
3539        let resp = svc.put_resource_policy(&req).unwrap();
3540        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3541        assert!(body["RevisionId"].as_str().is_some());
3542
3543        // Get policy
3544        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3545        let resp = svc.get_resource_policy(&req).unwrap();
3546        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3547        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3548
3549        // Delete policy
3550        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3551        svc.delete_resource_policy(&req).unwrap();
3552
3553        // Get should return null now
3554        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3555        let resp = svc.get_resource_policy(&req).unwrap();
3556        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3557        assert!(body["Policy"].is_null());
3558    }
3559
3560    #[test]
3561    fn describe_endpoints() {
3562        let svc = make_service();
3563        let req = make_request("DescribeEndpoints", json!({}));
3564        let resp = svc.describe_endpoints(&req).unwrap();
3565        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3566        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3567    }
3568
3569    #[test]
3570    fn describe_limits() {
3571        let svc = make_service();
3572        let req = make_request("DescribeLimits", json!({}));
3573        let resp = svc.describe_limits(&req).unwrap();
3574        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3575        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3576    }
3577
3578    #[test]
3579    fn backup_lifecycle() {
3580        let svc = make_service();
3581        create_test_table(&svc);
3582
3583        // Create backup
3584        let req = make_request(
3585            "CreateBackup",
3586            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3587        );
3588        let resp = svc.create_backup(&req).unwrap();
3589        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3590        let backup_arn = body["BackupDetails"]["BackupArn"]
3591            .as_str()
3592            .unwrap()
3593            .to_string();
3594        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3595
3596        // Describe backup
3597        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3598        let resp = svc.describe_backup(&req).unwrap();
3599        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3600        assert_eq!(
3601            body["BackupDescription"]["BackupDetails"]["BackupName"],
3602            "my-backup"
3603        );
3604
3605        // List backups
3606        let req = make_request("ListBackups", json!({}));
3607        let resp = svc.list_backups(&req).unwrap();
3608        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3609        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3610
3611        // Restore from backup
3612        let req = make_request(
3613            "RestoreTableFromBackup",
3614            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3615        );
3616        svc.restore_table_from_backup(&req).unwrap();
3617
3618        // Verify restored table exists
3619        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3620        let resp = svc.describe_table(&req).unwrap();
3621        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3622        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3623
3624        // Delete backup
3625        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3626        svc.delete_backup(&req).unwrap();
3627
3628        // List should be empty
3629        let req = make_request("ListBackups", json!({}));
3630        let resp = svc.list_backups(&req).unwrap();
3631        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3632        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3633    }
3634
3635    #[test]
3636    fn continuous_backups() {
3637        let svc = make_service();
3638        create_test_table(&svc);
3639
3640        // Initially disabled
3641        let req = make_request(
3642            "DescribeContinuousBackups",
3643            json!({ "TableName": "test-table" }),
3644        );
3645        let resp = svc.describe_continuous_backups(&req).unwrap();
3646        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3647        assert_eq!(
3648            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3649                ["PointInTimeRecoveryStatus"],
3650            "DISABLED"
3651        );
3652
3653        // Enable
3654        let req = make_request(
3655            "UpdateContinuousBackups",
3656            json!({
3657                "TableName": "test-table",
3658                "PointInTimeRecoverySpecification": {
3659                    "PointInTimeRecoveryEnabled": true
3660                }
3661            }),
3662        );
3663        svc.update_continuous_backups(&req).unwrap();
3664
3665        // Verify
3666        let req = make_request(
3667            "DescribeContinuousBackups",
3668            json!({ "TableName": "test-table" }),
3669        );
3670        let resp = svc.describe_continuous_backups(&req).unwrap();
3671        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3672        assert_eq!(
3673            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3674                ["PointInTimeRecoveryStatus"],
3675            "ENABLED"
3676        );
3677    }
3678
3679    #[test]
3680    fn restore_table_to_point_in_time() {
3681        let svc = make_service();
3682        create_test_table(&svc);
3683
3684        let req = make_request(
3685            "RestoreTableToPointInTime",
3686            json!({
3687                "SourceTableName": "test-table",
3688                "TargetTableName": "pitr-restored"
3689            }),
3690        );
3691        svc.restore_table_to_point_in_time(&req).unwrap();
3692
3693        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3694        let resp = svc.describe_table(&req).unwrap();
3695        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3696        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3697    }
3698
3699    #[test]
3700    fn global_table_lifecycle() {
3701        let svc = make_service();
3702
3703        // Create global table
3704        let req = make_request(
3705            "CreateGlobalTable",
3706            json!({
3707                "GlobalTableName": "my-global",
3708                "ReplicationGroup": [
3709                    { "RegionName": "us-east-1" },
3710                    { "RegionName": "eu-west-1" }
3711                ]
3712            }),
3713        );
3714        let resp = svc.create_global_table(&req).unwrap();
3715        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3716        assert_eq!(
3717            body["GlobalTableDescription"]["GlobalTableStatus"],
3718            "ACTIVE"
3719        );
3720
3721        // Describe
3722        let req = make_request(
3723            "DescribeGlobalTable",
3724            json!({ "GlobalTableName": "my-global" }),
3725        );
3726        let resp = svc.describe_global_table(&req).unwrap();
3727        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3728        assert_eq!(
3729            body["GlobalTableDescription"]["ReplicationGroup"]
3730                .as_array()
3731                .unwrap()
3732                .len(),
3733            2
3734        );
3735
3736        // List
3737        let req = make_request("ListGlobalTables", json!({}));
3738        let resp = svc.list_global_tables(&req).unwrap();
3739        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3740        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3741
3742        // Update - add a region
3743        let req = make_request(
3744            "UpdateGlobalTable",
3745            json!({
3746                "GlobalTableName": "my-global",
3747                "ReplicaUpdates": [
3748                    { "Create": { "RegionName": "ap-southeast-1" } }
3749                ]
3750            }),
3751        );
3752        let resp = svc.update_global_table(&req).unwrap();
3753        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3754        assert_eq!(
3755            body["GlobalTableDescription"]["ReplicationGroup"]
3756                .as_array()
3757                .unwrap()
3758                .len(),
3759            3
3760        );
3761
3762        // Describe settings
3763        let req = make_request(
3764            "DescribeGlobalTableSettings",
3765            json!({ "GlobalTableName": "my-global" }),
3766        );
3767        let resp = svc.describe_global_table_settings(&req).unwrap();
3768        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3769        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3770
3771        // Update settings (no-op, just verify no error)
3772        let req = make_request(
3773            "UpdateGlobalTableSettings",
3774            json!({ "GlobalTableName": "my-global" }),
3775        );
3776        svc.update_global_table_settings(&req).unwrap();
3777    }
3778
3779    #[test]
3780    fn table_replica_auto_scaling() {
3781        let svc = make_service();
3782        create_test_table(&svc);
3783
3784        let req = make_request(
3785            "DescribeTableReplicaAutoScaling",
3786            json!({ "TableName": "test-table" }),
3787        );
3788        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3789        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3790        assert_eq!(
3791            body["TableAutoScalingDescription"]["TableName"],
3792            "test-table"
3793        );
3794
3795        let req = make_request(
3796            "UpdateTableReplicaAutoScaling",
3797            json!({ "TableName": "test-table" }),
3798        );
3799        svc.update_table_replica_auto_scaling(&req).unwrap();
3800    }
3801
3802    #[test]
3803    fn kinesis_streaming_lifecycle() {
3804        let svc = make_service();
3805        create_test_table(&svc);
3806
3807        // Enable
3808        let req = make_request(
3809            "EnableKinesisStreamingDestination",
3810            json!({
3811                "TableName": "test-table",
3812                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3813            }),
3814        );
3815        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3816        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3817        assert_eq!(body["DestinationStatus"], "ACTIVE");
3818
3819        // Describe
3820        let req = make_request(
3821            "DescribeKinesisStreamingDestination",
3822            json!({ "TableName": "test-table" }),
3823        );
3824        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3825        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3826        assert_eq!(
3827            body["KinesisDataStreamDestinations"]
3828                .as_array()
3829                .unwrap()
3830                .len(),
3831            1
3832        );
3833
3834        // Update
3835        let req = make_request(
3836            "UpdateKinesisStreamingDestination",
3837            json!({
3838                "TableName": "test-table",
3839                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3840                "UpdateKinesisStreamingConfiguration": {
3841                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
3842                }
3843            }),
3844        );
3845        svc.update_kinesis_streaming_destination(&req).unwrap();
3846
3847        // Disable
3848        let req = make_request(
3849            "DisableKinesisStreamingDestination",
3850            json!({
3851                "TableName": "test-table",
3852                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3853            }),
3854        );
3855        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3856        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3857        assert_eq!(body["DestinationStatus"], "DISABLED");
3858    }
3859
3860    #[test]
3861    fn contributor_insights_lifecycle() {
3862        let svc = make_service();
3863        create_test_table(&svc);
3864
3865        // Initially disabled
3866        let req = make_request(
3867            "DescribeContributorInsights",
3868            json!({ "TableName": "test-table" }),
3869        );
3870        let resp = svc.describe_contributor_insights(&req).unwrap();
3871        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3872        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3873
3874        // Enable
3875        let req = make_request(
3876            "UpdateContributorInsights",
3877            json!({
3878                "TableName": "test-table",
3879                "ContributorInsightsAction": "ENABLE"
3880            }),
3881        );
3882        let resp = svc.update_contributor_insights(&req).unwrap();
3883        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3884        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3885
3886        // List
3887        let req = make_request("ListContributorInsights", json!({}));
3888        let resp = svc.list_contributor_insights(&req).unwrap();
3889        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3890        assert_eq!(
3891            body["ContributorInsightsSummaries"]
3892                .as_array()
3893                .unwrap()
3894                .len(),
3895            1
3896        );
3897    }
3898
3899    #[test]
3900    fn export_lifecycle() {
3901        let svc = make_service();
3902        create_test_table(&svc);
3903
3904        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3905
3906        // Export
3907        let req = make_request(
3908            "ExportTableToPointInTime",
3909            json!({
3910                "TableArn": table_arn,
3911                "S3Bucket": "my-bucket"
3912            }),
3913        );
3914        let resp = svc.export_table_to_point_in_time(&req).unwrap();
3915        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3916        let export_arn = body["ExportDescription"]["ExportArn"]
3917            .as_str()
3918            .unwrap()
3919            .to_string();
3920        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3921
3922        // Describe
3923        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3924        let resp = svc.describe_export(&req).unwrap();
3925        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3926        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3927
3928        // List
3929        let req = make_request("ListExports", json!({}));
3930        let resp = svc.list_exports(&req).unwrap();
3931        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3932        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3933    }
3934
3935    #[test]
3936    fn import_lifecycle() {
3937        let svc = make_service();
3938
3939        let req = make_request(
3940            "ImportTable",
3941            json!({
3942                "InputFormat": "DYNAMODB_JSON",
3943                "S3BucketSource": { "S3Bucket": "import-bucket" },
3944                "TableCreationParameters": {
3945                    "TableName": "imported-table",
3946                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3947                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3948                }
3949            }),
3950        );
3951        let resp = svc.import_table(&req).unwrap();
3952        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3953        let import_arn = body["ImportTableDescription"]["ImportArn"]
3954            .as_str()
3955            .unwrap()
3956            .to_string();
3957        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3958
3959        // Describe import
3960        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3961        let resp = svc.describe_import(&req).unwrap();
3962        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3963        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3964
3965        // List imports
3966        let req = make_request("ListImports", json!({}));
3967        let resp = svc.list_imports(&req).unwrap();
3968        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3969        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3970
3971        // Verify the table was created
3972        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3973        let resp = svc.describe_table(&req).unwrap();
3974        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3975        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3976    }
3977
3978    #[test]
3979    fn backup_restore_preserves_items() {
3980        let svc = make_service();
3981        create_test_table(&svc);
3982
3983        // Put 3 items
3984        for i in 1..=3 {
3985            let req = make_request(
3986                "PutItem",
3987                json!({
3988                    "TableName": "test-table",
3989                    "Item": {
3990                        "pk": { "S": format!("key{i}") },
3991                        "data": { "S": format!("value{i}") }
3992                    }
3993                }),
3994            );
3995            svc.put_item(&req).unwrap();
3996        }
3997
3998        // Create backup
3999        let req = make_request(
4000            "CreateBackup",
4001            json!({
4002                "TableName": "test-table",
4003                "BackupName": "my-backup"
4004            }),
4005        );
4006        let resp = svc.create_backup(&req).unwrap();
4007        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4008        let backup_arn = body["BackupDetails"]["BackupArn"]
4009            .as_str()
4010            .unwrap()
4011            .to_string();
4012
4013        // Delete all items from the original table
4014        for i in 1..=3 {
4015            let req = make_request(
4016                "DeleteItem",
4017                json!({
4018                    "TableName": "test-table",
4019                    "Key": { "pk": { "S": format!("key{i}") } }
4020                }),
4021            );
4022            svc.delete_item(&req).unwrap();
4023        }
4024
4025        // Verify original table is empty
4026        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4027        let resp = svc.scan(&req).unwrap();
4028        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4029        assert_eq!(body["Count"], 0);
4030
4031        // Restore from backup
4032        let req = make_request(
4033            "RestoreTableFromBackup",
4034            json!({
4035                "BackupArn": backup_arn,
4036                "TargetTableName": "restored-table"
4037            }),
4038        );
4039        svc.restore_table_from_backup(&req).unwrap();
4040
4041        // Scan restored table — should have 3 items
4042        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
4043        let resp = svc.scan(&req).unwrap();
4044        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4045        assert_eq!(body["Count"], 3);
4046        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
4047    }
4048
4049    #[test]
4050    fn global_table_replicates_writes() {
        // Registers the existing "test-table" as a global table with two
        // replica regions, then round-trips one item through PutItem/GetItem.
4051        let svc = make_service();
4052        create_test_table(&svc);
4053
4054        // Create global table with replicas
4055        let req = make_request(
4056            "CreateGlobalTable",
4057            json!({
4058                "GlobalTableName": "test-table",
4059                "ReplicationGroup": [
4060                    { "RegionName": "us-east-1" },
4061                    { "RegionName": "eu-west-1" }
4062                ]
4063            }),
4064        );
4065        let resp = svc.create_global_table(&req).unwrap();
4066        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
        // The description is expected to report ACTIVE in the create response itself.
4067        assert_eq!(
4068            body["GlobalTableDescription"]["GlobalTableStatus"],
4069            "ACTIVE"
4070        );
4071
4072        // Put an item
4073        let req = make_request(
4074            "PutItem",
4075            json!({
4076                "TableName": "test-table",
4077                "Item": {
4078                    "pk": { "S": "replicated-key" },
4079                    "data": { "S": "replicated-value" }
4080                }
4081            }),
4082        );
4083        svc.put_item(&req).unwrap();
4084
4085        // Verify the item is readable (since all replicas share the same table)
        // NOTE(review): the read goes through the same service handle with no
        // region override, so no per-replica read path is exercised here.
4086        let req = make_request(
4087            "GetItem",
4088            json!({
4089                "TableName": "test-table",
4090                "Key": { "pk": { "S": "replicated-key" } }
4091            }),
4092        );
4093        let resp = svc.get_item(&req).unwrap();
4094        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4095        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
4096        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
4097    }
4098
4099    #[test]
4100    fn contributor_insights_tracks_access() {
        // Enables contributor insights, performs puts and gets, and checks that
        // DescribeContributorInsights reports ENABLED with a non-empty
        // TopContributors array and a non-empty rule list.
4101        let svc = make_service();
4102        create_test_table(&svc);
4103
4104        // Enable contributor insights
4105        let req = make_request(
4106            "UpdateContributorInsights",
4107            json!({
4108                "TableName": "test-table",
4109                "ContributorInsightsAction": "ENABLE"
4110            }),
4111        );
4112        svc.update_contributor_insights(&req).unwrap();
4113
4114        // Put items with different partition keys
        // "alpha" is written three times and "beta" twice.
4115        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
4116            let req = make_request(
4117                "PutItem",
4118                json!({
4119                    "TableName": "test-table",
4120                    "Item": {
4121                        "pk": { "S": key },
4122                        "data": { "S": "value" }
4123                    }
4124                }),
4125            );
4126            svc.put_item(&req).unwrap();
4127        }
4128
4129        // Get items (to also track read access)
4130        for _ in 0..3 {
4131            let req = make_request(
4132                "GetItem",
4133                json!({
4134                    "TableName": "test-table",
4135                    "Key": { "pk": { "S": "alpha" } }
4136                }),
4137            );
4138            svc.get_item(&req).unwrap();
4139        }
4140
4141        // Describe contributor insights — should show top contributors
4142        let req = make_request(
4143            "DescribeContributorInsights",
4144            json!({ "TableName": "test-table" }),
4145        );
4146        let resp = svc.describe_contributor_insights(&req).unwrap();
4147        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4148        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
4149
4150        let contributors = body["TopContributors"].as_array().unwrap();
4151        assert!(
4152            !contributors.is_empty(),
4153            "TopContributors should not be empty"
4154        );
4155
4156        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
4157        // alpha should be the top contributor
        // NOTE(review): the assertion below only checks that the first entry
        // has a positive Count; it does not pin which key is first or that the
        // count equals 6.
4158        let top = &contributors[0];
4159        assert!(top["Count"].as_u64().unwrap() > 0);
4160
4161        // Verify the rule list is populated
4162        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
4163        assert!(!rules.is_empty());
4164    }
4165
4166    #[test]
4167    fn contributor_insights_not_tracked_when_disabled() {
        // Insights are never enabled in this test: a PutItem must leave
        // DescribeContributorInsights reporting DISABLED with an empty
        // TopContributors array.
4168        let svc = make_service();
4169        create_test_table(&svc);
4170
4171        // Put items without enabling insights
4172        let req = make_request(
4173            "PutItem",
4174            json!({
4175                "TableName": "test-table",
4176                "Item": {
4177                    "pk": { "S": "key1" },
4178                    "data": { "S": "value" }
4179                }
4180            }),
4181        );
4182        svc.put_item(&req).unwrap();
4183
4184        // Describe — should show empty contributors
4185        let req = make_request(
4186            "DescribeContributorInsights",
4187            json!({ "TableName": "test-table" }),
4188        );
4189        let resp = svc.describe_contributor_insights(&req).unwrap();
4190        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4191        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
4192
        // The array is expected to be present but empty; `.unwrap()` would
        // panic if the field were missing or not an array.
4193        let contributors = body["TopContributors"].as_array().unwrap();
4194        assert!(contributors.is_empty());
4195    }
4196
4197    #[test]
4198    fn contributor_insights_disabled_table_no_counters_after_scan() {
        // Walks ENABLE -> Scan -> DISABLE -> Scan and checks that
        // TopContributors is non-empty while enabled and empty after the
        // scan that follows the disable.
4199        let svc = make_service();
4200        create_test_table(&svc);
4201
4202        // Put items
        // These writes happen before insights are enabled.
4203        for key in &["alpha", "beta"] {
4204            let req = make_request(
4205                "PutItem",
4206                json!({
4207                    "TableName": "test-table",
4208                    "Item": { "pk": { "S": key } }
4209                }),
4210            );
4211            svc.put_item(&req).unwrap();
4212        }
4213
4214        // Enable insights, then scan, then disable, then check counters are cleared
4215        let req = make_request(
4216            "UpdateContributorInsights",
4217            json!({
4218                "TableName": "test-table",
4219                "ContributorInsightsAction": "ENABLE"
4220            }),
4221        );
4222        svc.update_contributor_insights(&req).unwrap();
4223
4224        // Scan to trigger counter collection
4225        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4226        svc.scan(&req).unwrap();
4227
4228        // Verify counters were collected
4229        let req = make_request(
4230            "DescribeContributorInsights",
4231            json!({ "TableName": "test-table" }),
4232        );
4233        let resp = svc.describe_contributor_insights(&req).unwrap();
4234        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4235        let contributors = body["TopContributors"].as_array().unwrap();
4236        assert!(
4237            !contributors.is_empty(),
4238            "counters should be non-empty while enabled"
4239        );
4240
4241        // Disable insights (this clears counters)
4242        let req = make_request(
4243            "UpdateContributorInsights",
4244            json!({
4245                "TableName": "test-table",
4246                "ContributorInsightsAction": "DISABLE"
4247            }),
4248        );
4249        svc.update_contributor_insights(&req).unwrap();
4250
4251        // Scan again -- should NOT accumulate counters since insights is disabled
4252        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4253        svc.scan(&req).unwrap();
4254
4255        // Verify counters are still empty
4256        let req = make_request(
4257            "DescribeContributorInsights",
4258            json!({ "TableName": "test-table" }),
4259        );
4260        let resp = svc.describe_contributor_insights(&req).unwrap();
4261        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4262        let contributors = body["TopContributors"].as_array().unwrap();
4263        assert!(
4264            contributors.is_empty(),
4265            "counters should be empty after disabling insights"
4266        );
4267    }
4268
4269    #[test]
4270    fn scan_pagination_with_limit() {
        // Scans five items with Limit=2 and follows LastEvaluatedKey until a
        // response no longer carries one, expecting all five items in total.
4271        let svc = make_service();
4272        create_test_table(&svc);
4273
4274        // Insert 5 items
4275        for i in 0..5 {
4276            let req = make_request(
4277                "PutItem",
4278                json!({
4279                    "TableName": "test-table",
4280                    "Item": {
4281                        "pk": { "S": format!("item{i}") },
4282                        "data": { "S": format!("value{i}") }
4283                    }
4284                }),
4285            );
4286            svc.put_item(&req).unwrap();
4287        }
4288
4289        // Scan with limit=2
4290        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
4291        let resp = svc.scan(&req).unwrap();
4292        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4293        assert_eq!(body["Count"], 2);
4294        assert!(
4295            body["LastEvaluatedKey"].is_object(),
4296            "should have LastEvaluatedKey when limit < total items"
4297        );
        // The key must carry the table's partition key as an attribute-value object.
4298        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4299
4300        // Page through all items
4301        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4302        let mut lek = body["LastEvaluatedKey"].clone();
4303
        // Indexing a missing key on a serde_json `Value` yields `Null`, so the
        // loop ends on the first response without a LastEvaluatedKey.
4304        while lek.is_object() {
4305            let req = make_request(
4306                "Scan",
4307                json!({
4308                    "TableName": "test-table",
4309                    "Limit": 2,
4310                    "ExclusiveStartKey": lek
4311                }),
4312            );
4313            let resp = svc.scan(&req).unwrap();
4314            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4315            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4316            lek = body["LastEvaluatedKey"].clone();
4317        }
4318
        // Only the total is checked; duplicates or ordering are not inspected here.
4319        assert_eq!(
4320            all_items.len(),
4321            5,
4322            "should retrieve all 5 items via pagination"
4323        );
4324    }
4325
4326    #[test]
4327    fn scan_no_pagination_when_all_fit() {
        // With three items stored, neither a Scan with Limit=10 nor a Scan
        // without a limit should return a LastEvaluatedKey.
4328        let svc = make_service();
4329        create_test_table(&svc);
4330
4331        for i in 0..3 {
4332            let req = make_request(
4333                "PutItem",
4334                json!({
4335                    "TableName": "test-table",
4336                    "Item": {
4337                        "pk": { "S": format!("item{i}") }
4338                    }
4339                }),
4340            );
4341            svc.put_item(&req).unwrap();
4342        }
4343
4344        // Scan with limit > item count
4345        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
4346        let resp = svc.scan(&req).unwrap();
4347        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4348        assert_eq!(body["Count"], 3);
        // `is_null()` is true both for an absent field and for an explicit JSON null.
4349        assert!(
4350            body["LastEvaluatedKey"].is_null(),
4351            "should not have LastEvaluatedKey when all items fit"
4352        );
4353
4354        // Scan without limit
4355        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4356        let resp = svc.scan(&req).unwrap();
4357        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4358        assert_eq!(body["Count"], 3);
4359        assert!(body["LastEvaluatedKey"].is_null());
4360    }
4361
4362    fn create_composite_table(svc: &DynamoDbService) {
        // Test helper: creates "composite-table" with a string HASH key `pk`
        // and a string RANGE key `sk`, using PAY_PER_REQUEST billing.
        // Panics (via `unwrap`) if CreateTable returns an error.
4363        let req = make_request(
4364            "CreateTable",
4365            json!({
4366                "TableName": "composite-table",
4367                "KeySchema": [
4368                    { "AttributeName": "pk", "KeyType": "HASH" },
4369                    { "AttributeName": "sk", "KeyType": "RANGE" }
4370                ],
4371                "AttributeDefinitions": [
4372                    { "AttributeName": "pk", "AttributeType": "S" },
4373                    { "AttributeName": "sk", "AttributeType": "S" }
4374                ],
4375                "BillingMode": "PAY_PER_REQUEST"
4376            }),
4377        );
4378        svc.create_table(&req).unwrap();
4379    }
4380
4381    #[test]
4382    fn query_pagination_with_composite_key() {
        // Pages through five items that share one partition key using
        // Limit=2, then checks the total and that the concatenated pages are
        // in ascending sort-key order.
4383        let svc = make_service();
4384        create_composite_table(&svc);
4385
4386        // Insert 5 items under the same partition key
4387        for i in 0..5 {
4388            let req = make_request(
4389                "PutItem",
4390                json!({
4391                    "TableName": "composite-table",
4392                    "Item": {
4393                        "pk": { "S": "user1" },
                        // Zero-padded so lexicographic order of the sort
                        // keys matches numeric order.
4394                        "sk": { "S": format!("item{i:03}") },
4395                        "data": { "S": format!("value{i}") }
4396                    }
4397                }),
4398            );
4399            svc.put_item(&req).unwrap();
4400        }
4401
4402        // Query with limit=2
4403        let req = make_request(
4404            "Query",
4405            json!({
4406                "TableName": "composite-table",
4407                "KeyConditionExpression": "pk = :pk",
4408                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4409                "Limit": 2
4410            }),
4411        );
4412        let resp = svc.query(&req).unwrap();
4413        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4414        assert_eq!(body["Count"], 2);
        // For a composite-key table the continuation key must carry both
        // the partition key and the sort key.
4415        assert!(body["LastEvaluatedKey"].is_object());
4416        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4417        assert!(body["LastEvaluatedKey"]["sk"].is_object());
4418
4419        // Page through all items
4420        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4421        let mut lek = body["LastEvaluatedKey"].clone();
4422
        // Ends when a response has no LastEvaluatedKey (indexing then yields Null).
4423        while lek.is_object() {
4424            let req = make_request(
4425                "Query",
4426                json!({
4427                    "TableName": "composite-table",
4428                    "KeyConditionExpression": "pk = :pk",
4429                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4430                    "Limit": 2,
4431                    "ExclusiveStartKey": lek
4432                }),
4433            );
4434            let resp = svc.query(&req).unwrap();
4435            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4436            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4437            lek = body["LastEvaluatedKey"].clone();
4438        }
4439
4440        assert_eq!(
4441            all_items.len(),
4442            5,
4443            "should retrieve all 5 items via pagination"
4444        );
4445
4446        // Verify items came back sorted by sort key
        // Compares the received order against a sorted copy of itself.
4447        let sks: Vec<String> = all_items
4448            .iter()
4449            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
4450            .collect();
4451        let mut sorted = sks.clone();
4452        sorted.sort();
4453        assert_eq!(sks, sorted, "items should be sorted by sort key");
4454    }
4455
4456    #[test]
4457    fn query_no_pagination_when_all_fit() {
        // Two items under one partition key queried with Limit=10: the
        // response must contain both and no LastEvaluatedKey.
4458        let svc = make_service();
4459        create_composite_table(&svc);
4460
4461        for i in 0..2 {
4462            let req = make_request(
4463                "PutItem",
4464                json!({
4465                    "TableName": "composite-table",
4466                    "Item": {
4467                        "pk": { "S": "user1" },
4468                        "sk": { "S": format!("item{i}") }
4469                    }
4470                }),
4471            );
4472            svc.put_item(&req).unwrap();
4473        }
4474
4475        let req = make_request(
4476            "Query",
4477            json!({
4478                "TableName": "composite-table",
4479                "KeyConditionExpression": "pk = :pk",
4480                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4481                "Limit": 10
4482            }),
4483        );
4484        let resp = svc.query(&req).unwrap();
4485        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4486        assert_eq!(body["Count"], 2);
        // `is_null()` accepts either an absent field or an explicit JSON null.
4487        assert!(
4488            body["LastEvaluatedKey"].is_null(),
4489            "should not have LastEvaluatedKey when all items fit"
4490        );
4491    }
4492
4493    fn create_gsi_table(svc: &DynamoDbService) {
        // Test helper: creates "gsi-table" keyed on string `pk`, with one
        // global secondary index "gsi-index" (HASH `gsi_pk`, RANGE `gsi_sk`,
        // projection ALL). Panics (via `unwrap`) if CreateTable fails.
4494        let req = make_request(
4495            "CreateTable",
4496            json!({
4497                "TableName": "gsi-table",
4498                "KeySchema": [
4499                    { "AttributeName": "pk", "KeyType": "HASH" }
4500                ],
4501                "AttributeDefinitions": [
4502                    { "AttributeName": "pk", "AttributeType": "S" },
4503                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4504                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4505                ],
4506                "BillingMode": "PAY_PER_REQUEST",
4507                "GlobalSecondaryIndexes": [
4508                    {
4509                        "IndexName": "gsi-index",
4510                        "KeySchema": [
4511                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4512                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4513                        ],
4514                        "Projection": { "ProjectionType": "ALL" }
4515                    }
4516                ]
4517            }),
4518        );
4519        svc.create_table(&req).unwrap();
4520    }
4521
4522    #[test]
4523    fn gsi_query_last_evaluated_key_includes_table_pk() {
        // Queries the GSI with Limit=1 over three items whose index keys are
        // identical and checks the LastEvaluatedKey carries gsi_pk, gsi_sk
        // and the base-table pk.
4524        let svc = make_service();
4525        create_gsi_table(&svc);
4526
4527        // Insert 3 items with the SAME GSI key but different table PKs
        // Only `pk` distinguishes these items from one another.
4528        for i in 0..3 {
4529            let req = make_request(
4530                "PutItem",
4531                json!({
4532                    "TableName": "gsi-table",
4533                    "Item": {
4534                        "pk": { "S": format!("item{i}") },
4535                        "gsi_pk": { "S": "shared" },
4536                        "gsi_sk": { "S": "sort" }
4537                    }
4538                }),
4539            );
4540            svc.put_item(&req).unwrap();
4541        }
4542
4543        // Query GSI with Limit=1 to trigger pagination
4544        let req = make_request(
4545            "Query",
4546            json!({
4547                "TableName": "gsi-table",
4548                "IndexName": "gsi-index",
4549                "KeyConditionExpression": "gsi_pk = :v",
4550                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4551                "Limit": 1
4552            }),
4553        );
4554        let resp = svc.query(&req).unwrap();
4555        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4556        assert_eq!(body["Count"], 1);
4557        let lek = &body["LastEvaluatedKey"];
4558        assert!(lek.is_object(), "should have LastEvaluatedKey");
4559        // Must contain the index keys
4560        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
4561        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
4562        // Must also contain the table PK
        // Without it the continuation key could not tell apart items that
        // share the same index key values, as all three do here.
4563        assert!(
4564            lek["pk"].is_object(),
4565            "LEK must contain table PK for GSI queries"
4566        );
4567    }
4568
4569    #[test]
4570    fn gsi_query_pagination_returns_all_items() {
        // Paginates a GSI query (Limit=2) over four items with identical
        // index keys and checks each table pk is returned exactly once.
4571        let svc = make_service();
4572        create_gsi_table(&svc);
4573
4574        // Insert 4 items with the SAME GSI key but different table PKs
4575        for i in 0..4 {
4576            let req = make_request(
4577                "PutItem",
4578                json!({
4579                    "TableName": "gsi-table",
4580                    "Item": {
4581                        "pk": { "S": format!("item{i:03}") },
4582                        "gsi_pk": { "S": "shared" },
4583                        "gsi_sk": { "S": "sort" }
4584                    }
4585                }),
4586            );
4587            svc.put_item(&req).unwrap();
4588        }
4589
4590        // Paginate through all items with Limit=2
4591        let mut all_pks = Vec::new();
        // `None` on the first request, so no ExclusiveStartKey is sent.
4592        let mut lek: Option<Value> = None;
4593
4594        loop {
4595            let mut query = json!({
4596                "TableName": "gsi-table",
4597                "IndexName": "gsi-index",
4598                "KeyConditionExpression": "gsi_pk = :v",
4599                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4600                "Limit": 2
4601            });
4602            if let Some(ref start_key) = lek {
4603                query["ExclusiveStartKey"] = start_key.clone();
4604            }
4605
4606            let req = make_request("Query", query);
4607            let resp = svc.query(&req).unwrap();
4608            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4609
4610            for item in body["Items"].as_array().unwrap() {
4611                let pk = item["pk"]["S"].as_str().unwrap().to_string();
4612                all_pks.push(pk);
4613            }
4614
            // Continue only while the response carries a continuation key.
4615            if body["LastEvaluatedKey"].is_object() {
4616                lek = Some(body["LastEvaluatedKey"].clone());
4617            } else {
4618                break;
4619            }
4620        }
4621
        // Sorted before comparing, so the assertion checks which pks came
        // back (and that none repeats), not the order pages arrived in.
4622        all_pks.sort();
4623        assert_eq!(
4624            all_pks,
4625            vec!["item000", "item001", "item002", "item003"],
4626            "pagination should return all items without duplicates"
4627        );
4628    }
4629
4630    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
        // Test helper: builds an item map from (attribute name, value) pairs,
        // encoding every value as a string attribute `{"S": value}`.
4631        pairs
4632            .iter()
4633            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4634            .collect()
4635    }
4636
4637    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        // Test helper: builds the name-placeholder map passed to the
        // expression evaluators, e.g. ("#s", "state") -> "#s" => "state".
4638        pairs
4639            .iter()
4640            .map(|(k, v)| (k.to_string(), v.to_string()))
4641            .collect()
4642    }
4643
4644    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
        // Test helper: builds the value-placeholder map passed to the
        // expression evaluators, e.g. (":c", "complete") -> ":c" => {"S": "complete"}.
4645        pairs
4646            .iter()
4647            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4648            .collect()
4649    }
4650
4651    #[test]
4652    fn test_evaluate_condition_bare_not_equal() {
        // An unparenthesised `<>` comparison is Ok when the stored value
        // differs from the operand and Err when it is equal.
4653        let item = cond_item(&[("state", "active")]);
4654        let names = cond_names(&[("#s", "state")]);
4655        let values = cond_values(&[(":c", "complete")]);
4656
4657        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4658
        // Same expression, but the item now holds exactly the compared value.
4659        let item2 = cond_item(&[("state", "complete")]);
4660        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4661    }
4662
4663    #[test]
4664    fn test_evaluate_condition_parenthesized_not_equal() {
        // Wrapping the `<>` comparison in parentheses must not change the
        // outcome: "active" <> "complete" is still Ok.
4665        let item = cond_item(&[("state", "active")]);
4666        let names = cond_names(&[("#s", "state")]);
4667        let values = cond_values(&[(":c", "complete")]);
4668
4669        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4670    }
4671
4672    #[test]
4673    fn test_evaluate_condition_parenthesized_equal_mismatch() {
        // A parenthesised equality against a different value must yield Err:
        // "active" = "complete" is false.
4674        let item = cond_item(&[("state", "active")]);
4675        let names = cond_names(&[("#s", "state")]);
4676        let values = cond_values(&[(":c", "complete")]);
4677
4678        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4679    }
4680
4681    #[test]
4682    fn test_evaluate_condition_compound_and() {
        // Two parenthesised `<>` comparisons joined by AND, both true for
        // this item, must evaluate to Ok.
4683        let item = cond_item(&[("state", "active")]);
4684        let names = cond_names(&[("#s", "state")]);
4685        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4686
4687        // active <> complete AND active <> failed => true
4688        assert!(
4689            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4690        );
4691    }
4692
4693    #[test]
4694    fn test_evaluate_condition_compound_and_mismatch() {
        // An AND of two equality comparisons that are both false must yield
        // Err. Note :a and :b deliberately hold the same value ("active").
4695        let item = cond_item(&[("state", "inactive")]);
4696        let names = cond_names(&[("#s", "state")]);
4697        let values = cond_values(&[(":a", "active"), (":b", "active")]);
4698
4699        // inactive = active AND inactive = active => false
4700        assert!(
4701            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4702        );
4703    }
4704
4705    #[test]
4706    fn test_evaluate_condition_compound_or() {
        // OR of two equality comparisons: Err when neither operand matches,
        // Ok when the second operand matches.
4707        let item = cond_item(&[("state", "running")]);
4708        let names = cond_names(&[("#s", "state")]);
4709        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4710
4711        // running = active OR running = idle => false
4712        assert!(
4713            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4714        );
4715
4716        // running = active OR running = running => true
        // Same expression text; only the bound value of :b changes.
4717        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4718        assert!(
4719            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4720        );
4721    }
4722
4723    #[test]
4724    fn test_evaluate_condition_not_operator() {
        // Exercises NOT in front of a parenthesised comparison and in front
        // of a function call, against both an existing and a missing item.
4725        let item = cond_item(&[("state", "active")]);
4726        let names = cond_names(&[("#s", "state")]);
4727        let values = cond_values(&[(":c", "complete")]);
4728
4729        // NOT (active = complete) => NOT false => true
4730        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4731
4732        // NOT (active <> complete) => NOT true => false
4733        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4734
4735        // NOT attribute_exists(#s) on existing item => NOT true => false
4736        assert!(
4737            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4738        );
4739
4740        // NOT attribute_exists(#s) on missing item => NOT false => true
        // `None` stands for "no existing item".
4741        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4742    }
4743
4744    #[test]
4745    fn test_evaluate_condition_begins_with() {
4746        // After unification, conditions support begins_with via
4747        // evaluate_single_filter_condition (previously only filters had it).
        // Checked here: Ok for a matching prefix, Err for a non-matching one.
4748        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4749        let names = cond_names(&[("#n", "name")]);
4750        let values = cond_values(&[(":p", "fakecloud")]);
4751
4752        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4753
        // "realcloud" is not a prefix of "fakecloud-dynamodb".
4754        let values2 = cond_values(&[(":p", "realcloud")]);
4755        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4756    }
4757
4758    #[test]
4759    fn test_evaluate_condition_contains() {
        // `contains` on a string attribute: "beta" occurs inside
        // "alpha,beta,gamma" (Ok), "delta" does not (Err).
        // Only the string-attribute case is covered by this test.
4760        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4761        let names = cond_names(&[("#t", "tags")]);
4762        let values = cond_values(&[(":v", "beta")]);
4763
4764        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4765
4766        let values2 = cond_values(&[(":v", "delta")]);
4767        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4768    }
4769
4770    #[test]
4771    fn test_evaluate_condition_no_existing_item() {
4772        // When no item exists (PutItem with condition), attribute_not_exists
4773        // should succeed and attribute_exists should fail.
        // All four checks below pass `None` as the existing item.
4774        let names = cond_names(&[("#s", "state")]);
4775        let values = cond_values(&[(":v", "active")]);
4776
4777        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4778        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4779        // Comparison against missing item: None != Some(val) => true for <>
4780        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4781        // None == Some(val) => false for =
4782        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4783    }
4784
4785    #[test]
4786    fn test_evaluate_filter_not_operator() {
        // NOT handling in filter expressions. Unlike `evaluate_condition`,
        // whose result is checked with is_ok/is_err elsewhere in this module,
        // `evaluate_filter_expression` is asserted on directly as a bool.
4787        let item = cond_item(&[("status", "pending")]);
4788        let names = cond_names(&[("#s", "status")]);
4789        let values = cond_values(&[(":v", "pending")]);
4790
        // The item equals :v, so `#s = :v` is true and its negation is false.
4791        assert!(!evaluate_filter_expression(
4792            "NOT (#s = :v)",
4793            &item,
4794            &names,
4795            &values
4796        ));
        // `#s <> :v` is false for this item, so its negation is true.
4797        assert!(evaluate_filter_expression(
4798            "NOT (#s <> :v)",
4799            &item,
4800            &names,
4801            &values
4802        ));
4803    }
4804
4805    #[test]
4806    fn test_evaluate_filter_expression_in_match() {
4807        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
4808        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
4809        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
4810        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
4811        // every item matched every IN filter regardless of value.
        // Positive case: the item's value is the first element of the IN list.
4812        let item = cond_item(&[("state", "active")]);
4813        let names = cond_names(&[("#s", "state")]);
4814        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4815
4816        assert!(
4817            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4818            "state=active should match IN (active, pending)"
4819        );
4820    }
4821
4822    #[test]
4823    fn test_evaluate_filter_expression_in_no_match() {
        // Negative case for IN: the item's value ("complete") is not among
        // the listed operands, so the filter must evaluate to false.
4824        let item = cond_item(&[("state", "complete")]);
4825        let names = cond_names(&[("#s", "state")]);
4826        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4827
4828        assert!(
4829            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4830            "state=complete should not match IN (active, pending)"
4831        );
4832    }
4833
4834    #[test]
4835    fn test_evaluate_filter_expression_in_no_spaces() {
4836        // orderbot emits the raw form
4837        //     "#status IN (" + strings.Join(keys, ",") + ")"
4838        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
        // The matching value ("shipped") is the middle operand, :b.
4839        let item = cond_item(&[("status", "shipped")]);
4840        let names = cond_names(&[("#s", "status")]);
4841        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4842
4843        assert!(
4844            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4845            "no-space IN list should still parse"
4846        );
4847    }
4848
4849    #[test]
4850    fn test_evaluate_filter_expression_in_missing_attribute() {
4851        // A missing attribute must not match any IN list — the silent-true
4852        // fallthrough would wrongly accept these items.
        // The item map is empty, so `state` is absent entirely.
4853        let item: HashMap<String, AttributeValue> = HashMap::new();
4854        let names = cond_names(&[("#s", "state")]);
4855        let values = cond_values(&[(":a", "active")]);
4856
4857        assert!(
4858            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4859            "missing attribute should not match any IN list"
4860        );
4861    }
4862
4863    #[test]
4864    fn test_evaluate_filter_expression_compound_in_and_eq() {
4865        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
4866        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
4867        // split_on_and handles the outer parens, but the IN leaf had the
4868        // silent-true fallthrough, so any item with priority=high would match
4869        // regardless of state.
4870        let item = cond_item(&[("state", "active"), ("priority", "high")]);
4871        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4872        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4873
        // Both operands hold for this item.
4874        assert!(
4875            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4876            "(active IN (active, pending)) AND (high = high) should match"
4877        );
4878
        // Same priority, but the state is outside the IN list: the equality
        // operand alone must not be enough to make the AND pass.
4879        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4880        assert!(
4881            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4882            "(complete IN (active, pending)) AND (high = high) should not match"
4883        );
4884    }
4885
4886    #[test]
4887    fn test_evaluate_condition_attribute_exists_with_space() {
4888        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
4889        // space between the name and the opening paren:
4890        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
4891        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
4892        // with no space, so these fell through the filter leaf entirely and
4893        // hit evaluate_single_key_condition's silent-true fallthrough —
4894        // every conditional write was silently accepted.
4895        let item = cond_item(&[("store_id", "s-1")]);
4896        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4897        let values = cond_values(&[(":0", "tab-A")]);
4898
4899        // On an existing item without active_viewer_tab_id: exists(store_id)
4900        // is true, not_exists(active_viewer_tab_id) is true → OK.
4901        assert!(
4902            evaluate_condition(
4903                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4904                Some(&item),
4905                &names,
4906                &values,
4907            )
4908            .is_ok(),
4909            "claim-lease compound on free item should succeed"
4910        );
4911
4912        // On a missing item: exists(store_id) is false → whole AND false → Err.
4913        assert!(
4914            evaluate_condition(
4915                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4916                None,
4917                &names,
4918                &values,
4919            )
4920            .is_err(),
4921            "claim-lease compound on missing item must fail attribute_exists branch"
4922        );
4923
4924        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
4925        // → (✓) AND ((✗) OR (✗)) → false → Err.
4926        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4927        assert!(
4928            evaluate_condition(
4929                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4930                Some(&held),
4931                &names,
4932                &values,
4933            )
4934            .is_err(),
4935            "claim-lease compound on item held by another tab must fail"
4936        );
4937
4938        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
4939        // → (✓) AND ((✗) OR (✓)) → true → Ok.
4940        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4941        assert!(
4942            evaluate_condition(
4943                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4944                Some(&self_held),
4945                &names,
4946                &values,
4947            )
4948            .is_ok(),
4949            "same-tab re-claim must succeed"
4950        );
4951    }
4952
4953    #[test]
4954    fn test_evaluate_condition_in_match() {
4955        // evaluate_condition delegates to evaluate_filter_expression, so this
4956        // also proves the ConditionExpression path. Before fix: silently Ok.
4957        let item = cond_item(&[("state", "active")]);
4958        let names = cond_names(&[("#s", "state")]);
4959        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4960
4961        assert!(
4962            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4963            "IN should succeed when actual value is in the list"
4964        );
4965    }
4966
4967    #[test]
4968    fn test_evaluate_condition_in_no_match() {
4969        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
4970        // conditional write was accepted regardless of actual state, the
4971        // opposite of what the caller asked for.
4972        let item = cond_item(&[("state", "complete")]);
4973        let names = cond_names(&[("#s", "state")]);
4974        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4975
4976        assert!(
4977            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4978            "IN should fail when actual value is not in the list"
4979        );
4980    }
4981
4982    #[test]
4983    fn test_apply_update_set_list_index_replaces_existing() {
4984        // Shape emitted by orderbot's order-item update retry loop:
4985        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
4986        // Before fix: apply_set_assignment called resolve_attr_name on the
4987        // whole "#items[0]" token, which misses the name map, and then
4988        // item.insert("#items[0]", :item), producing a top-level key
4989        // literally named "#items[0]" rather than mutating the list.
4990        let mut item = HashMap::new();
4991        item.insert(
4992            "items".to_string(),
4993            json!({"L": [
4994                {"M": {"sku": {"S": "OLD-A"}}},
4995                {"M": {"sku": {"S": "OLD-B"}}},
4996            ]}),
4997        );
4998
4999        let names = cond_names(&[("#items", "items")]);
5000        let mut values = HashMap::new();
5001        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
5002
5003        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
5004
5005        let items_list = item
5006            .get("items")
5007            .and_then(|v| v.get("L"))
5008            .and_then(|v| v.as_array())
5009            .expect("items should still be a list");
5010        assert_eq!(items_list.len(), 2, "list length should be unchanged");
5011        let sku0 = items_list[0]
5012            .get("M")
5013            .and_then(|m| m.get("sku"))
5014            .and_then(|s| s.get("S"))
5015            .and_then(|s| s.as_str());
5016        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
5017        let sku1 = items_list[1]
5018            .get("M")
5019            .and_then(|m| m.get("sku"))
5020            .and_then(|s| s.get("S"))
5021            .and_then(|s| s.as_str());
5022        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
5023
5024        assert!(!item.contains_key("items[0]"));
5025        assert!(!item.contains_key("#items[0]"));
5026    }
5027
5028    #[test]
5029    fn test_apply_update_set_list_index_second_slot() {
5030        let mut item = HashMap::new();
5031        item.insert(
5032            "items".to_string(),
5033            json!({"L": [
5034                {"M": {"sku": {"S": "A"}}},
5035                {"M": {"sku": {"S": "B"}}},
5036                {"M": {"sku": {"S": "C"}}},
5037            ]}),
5038        );
5039
5040        let names = cond_names(&[("#items", "items")]);
5041        let mut values = HashMap::new();
5042        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
5043
5044        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
5045
5046        let items_list = item
5047            .get("items")
5048            .and_then(|v| v.get("L"))
5049            .and_then(|v| v.as_array())
5050            .unwrap();
5051        let skus: Vec<&str> = items_list
5052            .iter()
5053            .map(|v| {
5054                v.get("M")
5055                    .and_then(|m| m.get("sku"))
5056                    .and_then(|s| s.get("S"))
5057                    .and_then(|s| s.as_str())
5058                    .unwrap()
5059            })
5060            .collect();
5061        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
5062    }
5063
5064    #[test]
5065    fn test_apply_update_set_list_index_without_name_ref() {
5066        // Same fix must also work when the LHS is a literal attribute name,
5067        // not an expression attribute name ref.
5068        let mut item = HashMap::new();
5069        item.insert(
5070            "tags".to_string(),
5071            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
5072        );
5073
5074        let names: HashMap<String, String> = HashMap::new();
5075        let mut values = HashMap::new();
5076        values.insert(":t".to_string(), json!({"S": "green"}));
5077
5078        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
5079
5080        let tags = item
5081            .get("tags")
5082            .and_then(|v| v.get("L"))
5083            .and_then(|v| v.as_array())
5084            .unwrap();
5085        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
5086        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
5087    }
5088
5089    #[test]
5090    fn test_list_append_into_empty_list() {
5091        // Regression: UpdateItem with `SET #0 = list_append(#0, :0)` where
5092        // the attribute already exists as an empty list silently no-oped.
5093        // Root cause: parse_update_clauses split `list_append(#0, :0)` at
5094        // the inner comma, so apply_set_list_append received a truncated
5095        // `rest` with no closing ')' and returned early without writing.
5096        let mut item = HashMap::new();
5097        item.insert("files".to_string(), json!({"L": []}));
5098
5099        let names = cond_names(&[("#0", "files")]);
5100        let mut values = HashMap::new();
5101        values.insert(
5102            ":0".to_string(),
5103            json!({"L": [{"M": {"field": {"S": "value"}}}]}),
5104        );
5105
5106        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
5107            .unwrap();
5108
5109        let list = item
5110            .get("files")
5111            .and_then(|v| v.get("L"))
5112            .and_then(|v| v.as_array())
5113            .expect("files should be an L-typed attribute");
5114        assert_eq!(list.len(), 1, "one element should have been appended");
5115    }
5116
5117    #[test]
5118    fn test_list_append_into_nonempty_list() {
5119        // Verifies the same fix works when the existing list already has elements.
5120        let mut item = HashMap::new();
5121        item.insert(
5122            "files".to_string(),
5123            json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
5124        );
5125
5126        let names = cond_names(&[("#0", "files")]);
5127        let mut values = HashMap::new();
5128        values.insert(
5129            ":0".to_string(),
5130            json!({"L": [{"M": {"field": {"S": "new"}}}]}),
5131        );
5132
5133        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
5134            .unwrap();
5135
5136        let list = item
5137            .get("files")
5138            .and_then(|v| v.get("L"))
5139            .and_then(|v| v.as_array())
5140            .expect("files should be an L-typed attribute");
5141        assert_eq!(list.len(), 2, "existing element plus one new element");
5142    }
5143
5144    #[test]
5145    fn test_list_append_combined_with_plain_set() {
5146        // Verifies that a mixed expression like
5147        // `SET #a = list_append(#a, :v), #b = :other` correctly applies
5148        // both assignments after the paren-aware comma split fix.
5149        let mut item = HashMap::new();
5150        item.insert("logs".to_string(), json!({"L": []}));
5151        item.insert("count".to_string(), json!({"N": "0"}));
5152
5153        let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
5154        let mut values = HashMap::new();
5155        values.insert(":v".to_string(), json!({"L": [{"S": "entry"}]}));
5156        values.insert(":other".to_string(), json!({"N": "1"}));
5157
5158        apply_update_expression(
5159            &mut item,
5160            "SET #a = list_append(#a, :v), #b = :other",
5161            &names,
5162            &values,
5163        )
5164        .unwrap();
5165
5166        let list = item
5167            .get("logs")
5168            .and_then(|v| v.get("L"))
5169            .and_then(|v| v.as_array())
5170            .expect("logs should be an L-typed attribute");
5171        assert_eq!(list.len(), 1, "one log entry appended");
5172
5173        let count = item
5174            .get("count")
5175            .and_then(|v| v.get("N"))
5176            .and_then(|v| v.as_str())
5177            .expect("count should be an N-typed attribute");
5178        assert_eq!(count, "1", "count updated to 1");
5179    }
5180
5181    #[test]
5182    fn test_unrecognized_expression_returns_false() {
5183        // evaluate_single_key_condition must fail-closed: an expression shape
5184        // it doesn't recognize should return false (reject), not true (accept).
5185        let item = cond_item(&[("x", "1")]);
5186        let names: HashMap<String, String> = HashMap::new();
5187        let values: HashMap<String, Value> = HashMap::new();
5188
5189        assert!(
5190            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, &names, &values),
5191            "unrecognized expression must return false"
5192        );
5193    }
5194
5195    #[test]
5196    fn test_set_list_index_out_of_range_returns_error() {
5197        // SET list[N] where N > len must return a ValidationException,
5198        // not silently no-op.
5199        let mut item = HashMap::new();
5200        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
5201
5202        let names: HashMap<String, String> = HashMap::new();
5203        let mut values = HashMap::new();
5204        values.insert(":v".to_string(), json!({"S": "z"}));
5205
5206        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
5207        assert!(
5208            result.is_err(),
5209            "out-of-range list index must return an error"
5210        );
5211
5212        // List should be unchanged
5213        let list = item
5214            .get("items")
5215            .and_then(|v| v.get("L"))
5216            .and_then(|v| v.as_array())
5217            .unwrap();
5218        assert_eq!(list.len(), 2);
5219    }
5220
5221    #[test]
5222    fn test_set_list_index_on_non_list_returns_error() {
5223        // SET attr[0] = :v where attr is a string (not a list) must return
5224        // a ValidationException.
5225        let mut item = HashMap::new();
5226        item.insert("name".to_string(), json!({"S": "hello"}));
5227
5228        let names: HashMap<String, String> = HashMap::new();
5229        let mut values = HashMap::new();
5230        values.insert(":v".to_string(), json!({"S": "z"}));
5231
5232        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
5233        assert!(
5234            result.is_err(),
5235            "list index on non-list attribute must return an error"
5236        );
5237    }
5238
5239    #[test]
5240    fn test_unrecognized_update_action_returns_error() {
5241        let mut item = HashMap::new();
5242        item.insert("name".to_string(), json!({"S": "hello"}));
5243
5244        let names: HashMap<String, String> = HashMap::new();
5245        let mut values = HashMap::new();
5246        values.insert(":bar".to_string(), json!({"S": "baz"}));
5247
5248        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
5249        assert!(
5250            result.is_err(),
5251            "unrecognized UpdateExpression action must return an error"
5252        );
5253        let err_msg = format!("{}", result.unwrap_err());
5254        assert!(
5255            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
5256            "error should mention Invalid UpdateExpression, got: {err_msg}"
5257        );
5258    }
5259
5260    // ── size() function tests ──────────────────────────────────────────
5261
5262    #[test]
5263    fn test_size_string() {
5264        let mut item = HashMap::new();
5265        item.insert("name".to_string(), json!({"S": "hello"}));
5266        let names = HashMap::new();
5267        let mut values = HashMap::new();
5268        values.insert(":limit".to_string(), json!({"N": "5"}));
5269
5270        assert!(evaluate_single_filter_condition(
5271            "size(name) = :limit",
5272            &item,
5273            &names,
5274            &values,
5275        ));
5276        values.insert(":limit".to_string(), json!({"N": "4"}));
5277        assert!(evaluate_single_filter_condition(
5278            "size(name) > :limit",
5279            &item,
5280            &names,
5281            &values,
5282        ));
5283    }
5284
5285    #[test]
5286    fn test_size_list() {
5287        let mut item = HashMap::new();
5288        item.insert(
5289            "items".to_string(),
5290            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
5291        );
5292        let names = HashMap::new();
5293        let mut values = HashMap::new();
5294        values.insert(":limit".to_string(), json!({"N": "3"}));
5295
5296        assert!(evaluate_single_filter_condition(
5297            "size(items) = :limit",
5298            &item,
5299            &names,
5300            &values,
5301        ));
5302    }
5303
5304    #[test]
5305    fn test_size_map() {
5306        let mut item = HashMap::new();
5307        item.insert(
5308            "data".to_string(),
5309            json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
5310        );
5311        let names = HashMap::new();
5312        let mut values = HashMap::new();
5313        values.insert(":limit".to_string(), json!({"N": "2"}));
5314
5315        assert!(evaluate_single_filter_condition(
5316            "size(data) = :limit",
5317            &item,
5318            &names,
5319            &values,
5320        ));
5321    }
5322
5323    #[test]
5324    fn test_size_set() {
5325        let mut item = HashMap::new();
5326        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
5327        let names = HashMap::new();
5328        let mut values = HashMap::new();
5329        values.insert(":limit".to_string(), json!({"N": "3"}));
5330
5331        assert!(evaluate_single_filter_condition(
5332            "size(tags) > :limit",
5333            &item,
5334            &names,
5335            &values,
5336        ));
5337    }
5338
5339    // ── attribute_type() function tests ────────────────────────────────
5340
5341    #[test]
5342    fn test_attribute_type_string() {
5343        let mut item = HashMap::new();
5344        item.insert("name".to_string(), json!({"S": "hello"}));
5345        let names = HashMap::new();
5346        let mut values = HashMap::new();
5347        values.insert(":t".to_string(), json!({"S": "S"}));
5348
5349        assert!(evaluate_single_filter_condition(
5350            "attribute_type(name, :t)",
5351            &item,
5352            &names,
5353            &values,
5354        ));
5355
5356        values.insert(":t".to_string(), json!({"S": "N"}));
5357        assert!(!evaluate_single_filter_condition(
5358            "attribute_type(name, :t)",
5359            &item,
5360            &names,
5361            &values,
5362        ));
5363    }
5364
5365    #[test]
5366    fn test_attribute_type_number() {
5367        let mut item = HashMap::new();
5368        item.insert("age".to_string(), json!({"N": "42"}));
5369        let names = HashMap::new();
5370        let mut values = HashMap::new();
5371        values.insert(":t".to_string(), json!({"S": "N"}));
5372
5373        assert!(evaluate_single_filter_condition(
5374            "attribute_type(age, :t)",
5375            &item,
5376            &names,
5377            &values,
5378        ));
5379    }
5380
5381    #[test]
5382    fn test_attribute_type_list() {
5383        let mut item = HashMap::new();
5384        item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
5385        let names = HashMap::new();
5386        let mut values = HashMap::new();
5387        values.insert(":t".to_string(), json!({"S": "L"}));
5388
5389        assert!(evaluate_single_filter_condition(
5390            "attribute_type(items, :t)",
5391            &item,
5392            &names,
5393            &values,
5394        ));
5395    }
5396
5397    #[test]
5398    fn test_attribute_type_map() {
5399        let mut item = HashMap::new();
5400        item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
5401        let names = HashMap::new();
5402        let mut values = HashMap::new();
5403        values.insert(":t".to_string(), json!({"S": "M"}));
5404
5405        assert!(evaluate_single_filter_condition(
5406            "attribute_type(data, :t)",
5407            &item,
5408            &names,
5409            &values,
5410        ));
5411    }
5412
5413    #[test]
5414    fn test_attribute_type_bool() {
5415        let mut item = HashMap::new();
5416        item.insert("active".to_string(), json!({"BOOL": true}));
5417        let names = HashMap::new();
5418        let mut values = HashMap::new();
5419        values.insert(":t".to_string(), json!({"S": "BOOL"}));
5420
5421        assert!(evaluate_single_filter_condition(
5422            "attribute_type(active, :t)",
5423            &item,
5424            &names,
5425            &values,
5426        ));
5427    }
5428
5429    // ── begins_with rejects non-string types ───────────────────────────
5430
5431    #[test]
5432    fn test_begins_with_rejects_number_type() {
5433        let mut item = HashMap::new();
5434        item.insert("code".to_string(), json!({"N": "12345"}));
5435        let names = HashMap::new();
5436        let mut values = HashMap::new();
5437        values.insert(":prefix".to_string(), json!({"S": "123"}));
5438
5439        assert!(
5440            !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
5441            "begins_with must return false for N-type attributes"
5442        );
5443    }
5444
5445    #[test]
5446    fn test_begins_with_works_on_string_type() {
5447        let mut item = HashMap::new();
5448        item.insert("code".to_string(), json!({"S": "abc123"}));
5449        let names = HashMap::new();
5450        let mut values = HashMap::new();
5451        values.insert(":prefix".to_string(), json!({"S": "abc"}));
5452
5453        assert!(evaluate_single_filter_condition(
5454            "begins_with(code, :prefix)",
5455            &item,
5456            &names,
5457            &values,
5458        ));
5459    }
5460
5461    // ── contains on sets ───────────────────────────────────────────────
5462
5463    #[test]
5464    fn test_contains_string_set() {
5465        let mut item = HashMap::new();
5466        item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
5467        let names = HashMap::new();
5468        let mut values = HashMap::new();
5469        values.insert(":val".to_string(), json!({"S": "blue"}));
5470
5471        assert!(evaluate_single_filter_condition(
5472            "contains(tags, :val)",
5473            &item,
5474            &names,
5475            &values,
5476        ));
5477
5478        values.insert(":val".to_string(), json!({"S": "yellow"}));
5479        assert!(!evaluate_single_filter_condition(
5480            "contains(tags, :val)",
5481            &item,
5482            &names,
5483            &values,
5484        ));
5485    }
5486
5487    #[test]
5488    fn test_contains_number_set() {
5489        let mut item = HashMap::new();
5490        item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
5491        let names = HashMap::new();
5492        let mut values = HashMap::new();
5493        values.insert(":val".to_string(), json!({"N": "2"}));
5494
5495        assert!(evaluate_single_filter_condition(
5496            "contains(scores, :val)",
5497            &item,
5498            &names,
5499            &values,
5500        ));
5501    }
5502
5503    // ── SET arithmetic type validation ─────────────────────────────────
5504
5505    #[test]
5506    fn test_set_arithmetic_rejects_string_operand() {
5507        let mut item = HashMap::new();
5508        item.insert("name".to_string(), json!({"S": "hello"}));
5509        let names = HashMap::new();
5510        let mut values = HashMap::new();
5511        values.insert(":val".to_string(), json!({"N": "1"}));
5512
5513        let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
5514        assert!(
5515            result.is_err(),
5516            "arithmetic on S-type attribute must return a ValidationException"
5517        );
5518    }
5519
5520    #[test]
5521    fn test_set_arithmetic_rejects_string_value() {
5522        let mut item = HashMap::new();
5523        item.insert("count".to_string(), json!({"N": "5"}));
5524        let names = HashMap::new();
5525        let mut values = HashMap::new();
5526        values.insert(":val".to_string(), json!({"S": "notanumber"}));
5527
5528        let result =
5529            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5530        assert!(
5531            result.is_err(),
5532            "arithmetic with S-type value must return a ValidationException"
5533        );
5534    }
5535
5536    #[test]
5537    fn test_set_arithmetic_valid_numbers() {
5538        let mut item = HashMap::new();
5539        item.insert("count".to_string(), json!({"N": "10"}));
5540        let names = HashMap::new();
5541        let mut values = HashMap::new();
5542        values.insert(":val".to_string(), json!({"N": "3"}));
5543
5544        let result =
5545            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5546        assert!(result.is_ok());
5547        assert_eq!(item["count"], json!({"N": "13"}));
5548    }
5549
5550    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5551
5552    #[test]
5553    fn test_add_binary_set() {
5554        let mut item = HashMap::new();
5555        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
5556        let names = HashMap::new();
5557        let mut values = HashMap::new();
5558        values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
5559
5560        let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
5561        assert!(result.is_ok());
5562        let bs = item["data"]["BS"].as_array().unwrap();
5563        assert_eq!(bs.len(), 3, "should merge sets without duplicates");
5564        assert!(bs.contains(&json!("YQ==")));
5565        assert!(bs.contains(&json!("Yg==")));
5566        assert!(bs.contains(&json!("Yw==")));
5567    }
5568
5569    #[test]
5570    fn test_delete_binary_set() {
5571        let mut item = HashMap::new();
5572        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
5573        let names = HashMap::new();
5574        let mut values = HashMap::new();
5575        values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
5576
5577        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5578        assert!(result.is_ok());
5579        let bs = item["data"]["BS"].as_array().unwrap();
5580        assert_eq!(bs.len(), 2);
5581        assert!(!bs.contains(&json!("Yg==")));
5582    }
5583
5584    #[test]
5585    fn test_delete_binary_set_removes_attr_when_empty() {
5586        let mut item = HashMap::new();
5587        item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
5588        let names = HashMap::new();
5589        let mut values = HashMap::new();
5590        values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
5591
5592        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5593        assert!(result.is_ok());
5594        assert!(
5595            !item.contains_key("data"),
5596            "attribute should be removed when set becomes empty"
5597        );
5598    }
5599
5600    fn body_json(resp: &AwsResponse) -> Value {
5601        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
5602    }
5603
5604    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
5605        match result {
5606            Err(e) => e,
5607            Ok(_) => panic!("expected error, got Ok"),
5608        }
5609    }
5610
5611    // ── CreateTable ──
5612
5613    #[test]
5614    fn create_table_basic() {
5615        let svc = make_service();
5616        let req = make_request(
5617            "CreateTable",
5618            json!({
5619                "TableName": "my-table",
5620                "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
5621                "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
5622                "BillingMode": "PAY_PER_REQUEST",
5623            }),
5624        );
5625        let resp = svc.create_table(&req).unwrap();
5626        let b = body_json(&resp);
5627        assert_eq!(b["TableDescription"]["TableName"], "my-table");
5628        assert_eq!(b["TableDescription"]["TableStatus"], "ACTIVE");
5629        assert!(b["TableDescription"]["TableArn"].as_str().is_some());
5630    }
5631
5632    #[test]
5633    fn create_table_with_sort_key_and_gsi() {
5634        let svc = make_service();
5635        let req = make_request(
5636            "CreateTable",
5637            json!({
5638                "TableName": "gsi-table",
5639                "KeySchema": [
5640                    {"AttributeName": "pk", "KeyType": "HASH"},
5641                    {"AttributeName": "sk", "KeyType": "RANGE"},
5642                ],
5643                "AttributeDefinitions": [
5644                    {"AttributeName": "pk", "AttributeType": "S"},
5645                    {"AttributeName": "sk", "AttributeType": "S"},
5646                    {"AttributeName": "gsi_key", "AttributeType": "N"},
5647                ],
5648                "GlobalSecondaryIndexes": [{
5649                    "IndexName": "gsi1",
5650                    "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
5651                    "Projection": {"ProjectionType": "ALL"},
5652                }],
5653                "BillingMode": "PAY_PER_REQUEST",
5654            }),
5655        );
5656        let resp = svc.create_table(&req).unwrap();
5657        let b = body_json(&resp);
5658        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
5659            .as_array()
5660            .unwrap();
5661        assert_eq!(gsi.len(), 1);
5662        assert_eq!(gsi[0]["IndexName"], "gsi1");
5663    }
5664
5665    #[test]
5666    fn create_table_duplicate_fails() {
5667        let svc = make_service();
5668        create_test_table(&svc);
5669
5670        let req = make_request(
5671            "CreateTable",
5672            json!({
5673                "TableName": "test-table",
5674                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5675                "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
5676                "BillingMode": "PAY_PER_REQUEST",
5677            }),
5678        );
5679        let err = expect_err(svc.create_table(&req));
5680        assert!(err.to_string().contains("ResourceInUseException"));
5681    }
5682
5683    #[test]
5684    fn create_table_missing_key_attr_in_definitions() {
5685        let svc = make_service();
5686        let req = make_request(
5687            "CreateTable",
5688            json!({
5689                "TableName": "bad",
5690                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5691                "AttributeDefinitions": [{"AttributeName": "other", "AttributeType": "S"}],
5692                "BillingMode": "PAY_PER_REQUEST",
5693            }),
5694        );
5695        let err = expect_err(svc.create_table(&req));
5696        assert!(err.to_string().contains("ValidationException"));
5697    }
5698
5699    // ── DescribeTable ──
5700
5701    #[test]
5702    fn describe_table_found() {
5703        let svc = make_service();
5704        create_test_table(&svc);
5705
5706        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5707        let resp = svc.describe_table(&req).unwrap();
5708        let b = body_json(&resp);
5709        assert_eq!(b["Table"]["TableName"], "test-table");
5710        assert_eq!(b["Table"]["TableStatus"], "ACTIVE");
5711    }
5712
5713    #[test]
5714    fn describe_table_not_found() {
5715        let svc = make_service();
5716        let req = make_request("DescribeTable", json!({"TableName": "nope"}));
5717        let err = expect_err(svc.describe_table(&req));
5718        assert!(err.to_string().contains("ResourceNotFoundException"));
5719    }
5720
5721    // ── DeleteTable ──
5722
5723    #[test]
5724    fn delete_table_removes_table() {
5725        let svc = make_service();
5726        create_test_table(&svc);
5727
5728        let req = make_request("DeleteTable", json!({"TableName": "test-table"}));
5729        let resp = svc.delete_table(&req).unwrap();
5730        let b = body_json(&resp);
5731        assert_eq!(b["TableDescription"]["TableName"], "test-table");
5732
5733        // Should be gone
5734        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5735        assert!(svc.describe_table(&req).is_err());
5736    }
5737
5738    // ── ListTables ──
5739
5740    #[test]
5741    fn list_tables_returns_names() {
5742        let svc = make_service();
5743        create_test_table(&svc);
5744
5745        let req = make_request("ListTables", json!({}));
5746        let resp = svc.list_tables(&req).unwrap();
5747        let b = body_json(&resp);
5748        let names = b["TableNames"].as_array().unwrap();
5749        assert!(names.iter().any(|n| n == "test-table"));
5750    }
5751
5752    // ── PutItem / GetItem / DeleteItem ──
5753
5754    #[test]
5755    fn put_and_get_item() {
5756        let svc = make_service();
5757        create_test_table(&svc);
5758
5759        let req = make_request(
5760            "PutItem",
5761            json!({
5762                "TableName": "test-table",
5763                "Item": {
5764                    "pk": {"S": "key1"},
5765                    "name": {"S": "Alice"},
5766                    "age": {"N": "30"},
5767                },
5768            }),
5769        );
5770        svc.put_item(&req).unwrap();
5771
5772        let req = make_request(
5773            "GetItem",
5774            json!({
5775                "TableName": "test-table",
5776                "Key": {"pk": {"S": "key1"}},
5777            }),
5778        );
5779        let resp = svc.get_item(&req).unwrap();
5780        let b = body_json(&resp);
5781        assert_eq!(b["Item"]["name"]["S"], "Alice");
5782        assert_eq!(b["Item"]["age"]["N"], "30");
5783    }
5784
5785    #[test]
5786    fn get_item_not_found() {
5787        let svc = make_service();
5788        create_test_table(&svc);
5789
5790        let req = make_request(
5791            "GetItem",
5792            json!({
5793                "TableName": "test-table",
5794                "Key": {"pk": {"S": "nonexistent"}},
5795            }),
5796        );
5797        let resp = svc.get_item(&req).unwrap();
5798        let b = body_json(&resp);
5799        assert!(b.get("Item").is_none() || b["Item"].is_null());
5800    }
5801
5802    #[test]
5803    fn delete_item_removes_item() {
5804        let svc = make_service();
5805        create_test_table(&svc);
5806
5807        let req = make_request(
5808            "PutItem",
5809            json!({
5810                "TableName": "test-table",
5811                "Item": {"pk": {"S": "del-me"}},
5812            }),
5813        );
5814        svc.put_item(&req).unwrap();
5815
5816        let req = make_request(
5817            "DeleteItem",
5818            json!({
5819                "TableName": "test-table",
5820                "Key": {"pk": {"S": "del-me"}},
5821            }),
5822        );
5823        svc.delete_item(&req).unwrap();
5824
5825        let req = make_request(
5826            "GetItem",
5827            json!({
5828                "TableName": "test-table",
5829                "Key": {"pk": {"S": "del-me"}},
5830            }),
5831        );
5832        let resp = svc.get_item(&req).unwrap();
5833        let b = body_json(&resp);
5834        assert!(b.get("Item").is_none() || b["Item"].is_null());
5835    }
5836
5837    #[test]
5838    fn put_item_returns_old_item() {
5839        let svc = make_service();
5840        create_test_table(&svc);
5841
5842        let req = make_request(
5843            "PutItem",
5844            json!({
5845                "TableName": "test-table",
5846                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "1"}},
5847            }),
5848        );
5849        svc.put_item(&req).unwrap();
5850
5851        let req = make_request(
5852            "PutItem",
5853            json!({
5854                "TableName": "test-table",
5855                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "2"}},
5856                "ReturnValues": "ALL_OLD",
5857            }),
5858        );
5859        let resp = svc.put_item(&req).unwrap();
5860        let b = body_json(&resp);
5861        assert_eq!(b["Attributes"]["v"]["N"], "1");
5862    }
5863
5864    // ── UpdateItem ──
5865
5866    #[test]
5867    fn update_item_set_attribute() {
5868        let svc = make_service();
5869        create_test_table(&svc);
5870
5871        let req = make_request(
5872            "PutItem",
5873            json!({
5874                "TableName": "test-table",
5875                "Item": {"pk": {"S": "upd"}, "count": {"N": "0"}},
5876            }),
5877        );
5878        svc.put_item(&req).unwrap();
5879
5880        let req = make_request(
5881            "UpdateItem",
5882            json!({
5883                "TableName": "test-table",
5884                "Key": {"pk": {"S": "upd"}},
5885                "UpdateExpression": "SET #c = :val",
5886                "ExpressionAttributeNames": {"#c": "count"},
5887                "ExpressionAttributeValues": {":val": {"N": "42"}},
5888                "ReturnValues": "ALL_NEW",
5889            }),
5890        );
5891        let resp = svc.update_item(&req).unwrap();
5892        let b = body_json(&resp);
5893        assert_eq!(b["Attributes"]["count"]["N"], "42");
5894    }
5895
5896    // ── Query ──
5897
5898    #[test]
5899    fn query_returns_matching_items() {
5900        let svc = make_service();
5901        // Table with hash+range
5902        let req = make_request(
5903            "CreateTable",
5904            json!({
5905                "TableName": "query-table",
5906                "KeySchema": [
5907                    {"AttributeName": "pk", "KeyType": "HASH"},
5908                    {"AttributeName": "sk", "KeyType": "RANGE"},
5909                ],
5910                "AttributeDefinitions": [
5911                    {"AttributeName": "pk", "AttributeType": "S"},
5912                    {"AttributeName": "sk", "AttributeType": "S"},
5913                ],
5914                "BillingMode": "PAY_PER_REQUEST",
5915            }),
5916        );
5917        svc.create_table(&req).unwrap();
5918
5919        for i in 0..3 {
5920            let req = make_request(
5921                "PutItem",
5922                json!({
5923                    "TableName": "query-table",
5924                    "Item": {
5925                        "pk": {"S": "user1"},
5926                        "sk": {"S": format!("item-{i}")},
5927                    },
5928                }),
5929            );
5930            svc.put_item(&req).unwrap();
5931        }
5932        // Different partition
5933        let req = make_request(
5934            "PutItem",
5935            json!({
5936                "TableName": "query-table",
5937                "Item": {"pk": {"S": "user2"}, "sk": {"S": "item-0"}},
5938            }),
5939        );
5940        svc.put_item(&req).unwrap();
5941
5942        let req = make_request(
5943            "Query",
5944            json!({
5945                "TableName": "query-table",
5946                "KeyConditionExpression": "pk = :pk",
5947                "ExpressionAttributeValues": {":pk": {"S": "user1"}},
5948            }),
5949        );
5950        let resp = svc.query(&req).unwrap();
5951        let b = body_json(&resp);
5952        assert_eq!(b["Count"], 3);
5953        assert_eq!(b["Items"].as_array().unwrap().len(), 3);
5954    }
5955
5956    // ── Scan ──
5957
5958    #[test]
5959    fn scan_returns_all_items() {
5960        let svc = make_service();
5961        create_test_table(&svc);
5962
5963        for i in 0..5 {
5964            let req = make_request(
5965                "PutItem",
5966                json!({
5967                    "TableName": "test-table",
5968                    "Item": {"pk": {"S": format!("scan-{i}")}},
5969                }),
5970            );
5971            svc.put_item(&req).unwrap();
5972        }
5973
5974        let req = make_request("Scan", json!({"TableName": "test-table"}));
5975        let resp = svc.scan(&req).unwrap();
5976        let b = body_json(&resp);
5977        assert_eq!(b["Count"], 5);
5978    }
5979
5980    // ── BatchWriteItem / BatchGetItem ──
5981
5982    #[test]
5983    fn batch_write_and_get_items() {
5984        let svc = make_service();
5985        create_test_table(&svc);
5986
5987        let req = make_request(
5988            "BatchWriteItem",
5989            json!({
5990                "RequestItems": {
5991                    "test-table": [
5992                        {"PutRequest": {"Item": {"pk": {"S": "b1"}, "val": {"S": "v1"}}}},
5993                        {"PutRequest": {"Item": {"pk": {"S": "b2"}, "val": {"S": "v2"}}}},
5994                        {"PutRequest": {"Item": {"pk": {"S": "b3"}, "val": {"S": "v3"}}}},
5995                    ]
5996                }
5997            }),
5998        );
5999        let resp = svc.batch_write_item(&req).unwrap();
6000        let b = body_json(&resp);
6001        // Unprocessed should be empty
6002        assert!(
6003            b["UnprocessedItems"].as_object().unwrap().is_empty()
6004                || b["UnprocessedItems"]["test-table"]
6005                    .as_array()
6006                    .is_none_or(|a| a.is_empty())
6007        );
6008
6009        // BatchGetItem
6010        let req = make_request(
6011            "BatchGetItem",
6012            json!({
6013                "RequestItems": {
6014                    "test-table": {
6015                        "Keys": [
6016                            {"pk": {"S": "b1"}},
6017                            {"pk": {"S": "b2"}},
6018                            {"pk": {"S": "b3"}},
6019                        ]
6020                    }
6021                }
6022            }),
6023        );
6024        let resp = svc.batch_get_item(&req).unwrap();
6025        let b = body_json(&resp);
6026        let items = b["Responses"]["test-table"].as_array().unwrap();
6027        assert_eq!(items.len(), 3);
6028    }
6029
6030    // ── TransactWriteItems / TransactGetItems ──
6031
6032    #[test]
6033    fn transact_write_and_get() {
6034        let svc = make_service();
6035        create_test_table(&svc);
6036
6037        let req = make_request(
6038            "TransactWriteItems",
6039            json!({
6040                "TransactItems": [
6041                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx1"}}}},
6042                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx2"}}}},
6043                ]
6044            }),
6045        );
6046        svc.transact_write_items(&req).unwrap();
6047
6048        let req = make_request(
6049            "TransactGetItems",
6050            json!({
6051                "TransactItems": [
6052                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}},
6053                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx2"}}}},
6054                ]
6055            }),
6056        );
6057        let resp = svc.transact_get_items(&req).unwrap();
6058        let b = body_json(&resp);
6059        let responses = b["Responses"].as_array().unwrap();
6060        assert_eq!(responses.len(), 2);
6061    }
6062
6063    // ── TagResource / UntagResource / ListTagsOfResource ──
6064
6065    #[test]
6066    fn tag_operations() {
6067        let svc = make_service();
6068        create_test_table(&svc);
6069        let arn = {
6070            let s = svc.state.read();
6071            s.default_ref()
6072                .tables
6073                .get("test-table")
6074                .unwrap()
6075                .arn
6076                .clone()
6077        };
6078
6079        let req = make_request(
6080            "TagResource",
6081            json!({
6082                "ResourceArn": arn,
6083                "Tags": [{"Key": "env", "Value": "test"}],
6084            }),
6085        );
6086        svc.tag_resource(&req).unwrap();
6087
6088        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
6089        let resp = svc.list_tags_of_resource(&req).unwrap();
6090        let b = body_json(&resp);
6091        let tags = b["Tags"].as_array().unwrap();
6092        assert_eq!(tags.len(), 1);
6093        assert_eq!(tags[0]["Key"], "env");
6094
6095        let req = make_request(
6096            "UntagResource",
6097            json!({
6098                "ResourceArn": arn,
6099                "TagKeys": ["env"],
6100            }),
6101        );
6102        svc.untag_resource(&req).unwrap();
6103
6104        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
6105        let resp = svc.list_tags_of_resource(&req).unwrap();
6106        let b = body_json(&resp);
6107        assert!(b["Tags"].as_array().unwrap().is_empty());
6108    }
6109
6110    // ── UpdateTable ──
6111
6112    #[test]
6113    fn update_table_add_gsi() {
6114        let svc = make_service();
6115        let req = make_request(
6116            "CreateTable",
6117            json!({
6118                "TableName": "upd-table",
6119                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
6120                "AttributeDefinitions": [
6121                    {"AttributeName": "pk", "AttributeType": "S"},
6122                    {"AttributeName": "gk", "AttributeType": "S"},
6123                ],
6124                "BillingMode": "PAY_PER_REQUEST",
6125            }),
6126        );
6127        svc.create_table(&req).unwrap();
6128
6129        let req = make_request(
6130            "UpdateTable",
6131            json!({
6132                "TableName": "upd-table",
6133                "GlobalSecondaryIndexUpdates": [{
6134                    "Create": {
6135                        "IndexName": "new-gsi",
6136                        "KeySchema": [{"AttributeName": "gk", "KeyType": "HASH"}],
6137                        "Projection": {"ProjectionType": "ALL"},
6138                    }
6139                }],
6140            }),
6141        );
6142        let resp = svc.update_table(&req).unwrap();
6143        let b = body_json(&resp);
6144        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
6145            .as_array()
6146            .unwrap();
6147        assert_eq!(gsi.len(), 1);
6148        assert_eq!(gsi[0]["IndexName"], "new-gsi");
6149    }
6150
6151    // ── Scan with FilterExpression ──
6152
6153    #[test]
6154    fn scan_with_filter_expression() {
6155        let svc = make_service();
6156        create_test_table(&svc);
6157
6158        for i in 0..5 {
6159            let req = make_request(
6160                "PutItem",
6161                json!({
6162                    "TableName": "test-table",
6163                    "Item": {
6164                        "pk": {"S": format!("f-{i}")},
6165                        "status": {"S": if i % 2 == 0 { "active" } else { "inactive" }},
6166                    },
6167                }),
6168            );
6169            svc.put_item(&req).unwrap();
6170        }
6171
6172        let req = make_request(
6173            "Scan",
6174            json!({
6175                "TableName": "test-table",
6176                "FilterExpression": "#s = :val",
6177                "ExpressionAttributeNames": {"#s": "status"},
6178                "ExpressionAttributeValues": {":val": {"S": "active"}},
6179            }),
6180        );
6181        let resp = svc.scan(&req).unwrap();
6182        let b = body_json(&resp);
6183        assert_eq!(b["Count"], 3);
6184    }
6185
6186    // ── PartiQL operations (batch.rs coverage) ──
6187
6188    #[test]
6189    fn execute_statement_select() {
6190        let svc = make_service();
6191        create_test_table(&svc);
6192
6193        let req = make_request(
6194            "PutItem",
6195            json!({"TableName": "test-table", "Item": {"pk": {"S": "qs1"}, "val": {"S": "hello"}}}),
6196        );
6197        svc.put_item(&req).unwrap();
6198
6199        let req = make_request(
6200            "ExecuteStatement",
6201            json!({"Statement": "SELECT * FROM \"test-table\" WHERE pk='qs1'"}),
6202        );
6203        let resp = svc.execute_statement(&req).unwrap();
6204        let b = body_json(&resp);
6205        assert!(!b["Items"].as_array().unwrap().is_empty());
6206    }
6207
6208    #[test]
6209    fn execute_statement_insert() {
6210        let svc = make_service();
6211        create_test_table(&svc);
6212
6213        let req = make_request(
6214            "ExecuteStatement",
6215            json!({"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'ins1', 'data': 'val'}"}),
6216        );
6217        svc.execute_statement(&req).unwrap();
6218
6219        let req = make_request(
6220            "GetItem",
6221            json!({"TableName": "test-table", "Key": {"pk": {"S": "ins1"}}}),
6222        );
6223        let resp = svc.get_item(&req).unwrap();
6224        let b = body_json(&resp);
6225        assert_eq!(b["Item"]["data"]["S"], "val");
6226    }
6227
6228    #[test]
6229    fn batch_execute_statement() {
6230        let svc = make_service();
6231        create_test_table(&svc);
6232
6233        let req = make_request(
6234            "PutItem",
6235            json!({"TableName": "test-table", "Item": {"pk": {"S": "be1"}}}),
6236        );
6237        svc.put_item(&req).unwrap();
6238
6239        let req = make_request(
6240            "BatchExecuteStatement",
6241            json!({
6242                "Statements": [
6243                    {"Statement": "SELECT * FROM \"test-table\" WHERE pk='be1'"},
6244                ]
6245            }),
6246        );
6247        let resp = svc.batch_execute_statement(&req).unwrap();
6248        let b = body_json(&resp);
6249        assert!(b["Responses"].as_array().is_some());
6250    }
6251
6252    #[test]
6253    fn execute_transaction() {
6254        let svc = make_service();
6255        create_test_table(&svc);
6256
6257        let req = make_request(
6258            "ExecuteTransaction",
6259            json!({
6260                "TransactStatements": [
6261                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx1'}"},
6262                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx2'}"},
6263                ]
6264            }),
6265        );
6266        svc.execute_transaction(&req).unwrap();
6267
6268        let req = make_request(
6269            "GetItem",
6270            json!({"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}),
6271        );
6272        let resp = svc.get_item(&req).unwrap();
6273        let b = body_json(&resp);
6274        assert!(b["Item"].is_object());
6275    }
6276
6277    // ── Batch write with delete ──
6278
6279    #[test]
6280    fn batch_write_with_delete_requests() {
6281        let svc = make_service();
6282        create_test_table(&svc);
6283
6284        // Put items first
6285        for key in &["bwd1", "bwd2", "bwd3"] {
6286            let req = make_request(
6287                "PutItem",
6288                json!({"TableName": "test-table", "Item": {"pk": {"S": key}}}),
6289            );
6290            svc.put_item(&req).unwrap();
6291        }
6292
6293        // Batch delete two
6294        let req = make_request(
6295            "BatchWriteItem",
6296            json!({
6297                "RequestItems": {
6298                    "test-table": [
6299                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd1"}}}},
6300                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd2"}}}},
6301                    ]
6302                }
6303            }),
6304        );
6305        svc.batch_write_item(&req).unwrap();
6306
6307        // bwd3 should still exist
6308        let req = make_request(
6309            "GetItem",
6310            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd3"}}}),
6311        );
6312        let resp = svc.get_item(&req).unwrap();
6313        let b = body_json(&resp);
6314        assert!(b["Item"].is_object());
6315
6316        // bwd1 should be gone
6317        let req = make_request(
6318            "GetItem",
6319            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd1"}}}),
6320        );
6321        let resp = svc.get_item(&req).unwrap();
6322        let b = body_json(&resp);
6323        assert!(b.get("Item").is_none() || b["Item"].is_null());
6324    }
6325
6326    // ── Query with sort key condition ──
6327
6328    #[test]
6329    fn query_with_sort_key_begins_with() {
6330        let svc = make_service();
6331        // Table with hash+range
6332        let req = make_request(
6333            "CreateTable",
6334            json!({
6335                "TableName": "sk-table",
6336                "KeySchema": [
6337                    {"AttributeName": "pk", "KeyType": "HASH"},
6338                    {"AttributeName": "sk", "KeyType": "RANGE"},
6339                ],
6340                "AttributeDefinitions": [
6341                    {"AttributeName": "pk", "AttributeType": "S"},
6342                    {"AttributeName": "sk", "AttributeType": "S"},
6343                ],
6344                "BillingMode": "PAY_PER_REQUEST",
6345            }),
6346        );
6347        svc.create_table(&req).unwrap();
6348
6349        for sk in &["order#001", "order#002", "profile#main"] {
6350            let req = make_request(
6351                "PutItem",
6352                json!({"TableName": "sk-table", "Item": {"pk": {"S": "u1"}, "sk": {"S": sk}}}),
6353            );
6354            svc.put_item(&req).unwrap();
6355        }
6356
6357        let req = make_request(
6358            "Query",
6359            json!({
6360                "TableName": "sk-table",
6361                "KeyConditionExpression": "pk = :pk AND begins_with(sk, :prefix)",
6362                "ExpressionAttributeValues": {":pk": {"S": "u1"}, ":prefix": {"S": "order#"}},
6363            }),
6364        );
6365        let resp = svc.query(&req).unwrap();
6366        let b = body_json(&resp);
6367        assert_eq!(b["Count"], 2);
6368    }
6369
6370    // ── Scan with limit ──
6371
6372    #[test]
6373    fn scan_with_limit() {
6374        let svc = make_service();
6375        create_test_table(&svc);
6376
6377        for i in 0..10 {
6378            let req = make_request(
6379                "PutItem",
6380                json!({"TableName": "test-table", "Item": {"pk": {"S": format!("lim{i}")}}}),
6381            );
6382            svc.put_item(&req).unwrap();
6383        }
6384
6385        let req = make_request("Scan", json!({"TableName": "test-table", "Limit": 3}));
6386        let resp = svc.scan(&req).unwrap();
6387        let b = body_json(&resp);
6388        assert_eq!(b["Count"], 3);
6389        assert!(b["LastEvaluatedKey"].is_object());
6390    }
6391
6392    // ── Error branches ──
6393
6394    #[test]
6395    fn batch_get_item_table_not_found() {
6396        let svc = make_service();
6397        let req = make_request(
6398            "BatchGetItem",
6399            json!({"RequestItems": {"ghost": {"Keys": [{"pk": {"S": "k"}}]}}}),
6400        );
6401        assert!(svc.batch_get_item(&req).is_err());
6402    }
6403
6404    #[test]
6405    fn batch_write_item_table_not_found() {
6406        let svc = make_service();
6407        let req = make_request(
6408            "BatchWriteItem",
6409            json!({"RequestItems": {"ghost": [{"PutRequest": {"Item": {"pk": {"S": "k"}}}}]}}),
6410        );
6411        assert!(svc.batch_write_item(&req).is_err());
6412    }
6413
6414    // ── Global tables ──
6415
6416    #[test]
6417    fn create_and_describe_global_table() {
6418        let svc = make_service();
6419        create_test_table(&svc);
6420
6421        let req = make_request(
6422            "CreateGlobalTable",
6423            json!({
6424                "GlobalTableName": "test-table",
6425                "ReplicationGroup": [{"RegionName": "us-east-1"}, {"RegionName": "eu-west-1"}],
6426            }),
6427        );
6428        svc.create_global_table(&req).unwrap();
6429
6430        let req = make_request(
6431            "DescribeGlobalTable",
6432            json!({"GlobalTableName": "test-table"}),
6433        );
6434        let resp = svc.describe_global_table(&req).unwrap();
6435        let b = body_json(&resp);
6436        assert!(b["GlobalTableDescription"].is_object());
6437    }
6438
6439    #[test]
6440    fn list_global_tables() {
6441        let svc = make_service();
6442        let req = make_request("ListGlobalTables", json!({}));
6443        let resp = svc.list_global_tables(&req).unwrap();
6444        let b = body_json(&resp);
6445        assert!(b["GlobalTables"].as_array().is_some());
6446    }
6447
6448    // ── Backup operations ──
6449
6450    #[test]
6451    fn create_and_list_backups() {
6452        let svc = make_service();
6453        create_test_table(&svc);
6454
6455        let req = make_request(
6456            "CreateBackup",
6457            json!({"TableName": "test-table", "BackupName": "bak1"}),
6458        );
6459        let resp = svc.create_backup(&req).unwrap();
6460        let b = body_json(&resp);
6461        assert!(b["BackupDetails"]["BackupArn"].as_str().is_some());
6462
6463        let req = make_request("ListBackups", json!({}));
6464        let resp = svc.list_backups(&req).unwrap();
6465        let b = body_json(&resp);
6466        assert!(!b["BackupSummaries"].as_array().unwrap().is_empty());
6467    }
6468
6469    // ── Import/Export ──
6470
6471    #[test]
6472    fn describe_import_not_found() {
6473        let svc = make_service();
6474        let req = make_request(
6475            "DescribeImport",
6476            json!({"ImportArn": "arn:aws:dynamodb:us-east-1:123:table/t/import/ghost"}),
6477        );
6478        assert!(svc.describe_import(&req).is_err());
6479    }
6480
6481    #[test]
6482    fn describe_export_not_found() {
6483        let svc = make_service();
6484        let req = make_request(
6485            "DescribeExport",
6486            json!({"ExportArn": "arn:aws:dynamodb:us-east-1:123:table/t/export/ghost"}),
6487        );
6488        assert!(svc.describe_export(&req).is_err());
6489    }
6490
6491    // ── tables.rs error branches ──
6492
6493    #[test]
6494    fn create_table_missing_name_errors() {
6495        let svc = make_service();
6496        let req = make_request(
6497            "CreateTable",
6498            json!({
6499                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6500                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6501                "BillingMode": "PAY_PER_REQUEST"
6502            }),
6503        );
6504        assert!(svc.create_table(&req).is_err());
6505    }
6506
6507    #[test]
6508    fn create_table_duplicate_errors() {
6509        let svc = make_service();
6510        let req = make_request(
6511            "CreateTable",
6512            json!({
6513                "TableName": "dup",
6514                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6515                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6516                "BillingMode": "PAY_PER_REQUEST"
6517            }),
6518        );
6519        svc.create_table(&req).unwrap();
6520        assert!(svc.create_table(&req).is_err());
6521    }
6522
6523    #[test]
6524    fn delete_table_missing_name_errors() {
6525        let svc = make_service();
6526        let req = make_request("DeleteTable", json!({}));
6527        assert!(svc.delete_table(&req).is_err());
6528    }
6529
6530    #[test]
6531    fn delete_table_not_found_errors() {
6532        let svc = make_service();
6533        let req = make_request("DeleteTable", json!({"TableName": "ghost"}));
6534        assert!(svc.delete_table(&req).is_err());
6535    }
6536
6537    #[test]
6538    fn describe_table_missing_name_errors() {
6539        let svc = make_service();
6540        let req = make_request("DescribeTable", json!({}));
6541        assert!(svc.describe_table(&req).is_err());
6542    }
6543
6544    #[test]
6545    fn describe_table_not_found_errors() {
6546        let svc = make_service();
6547        let req = make_request("DescribeTable", json!({"TableName": "ghost"}));
6548        assert!(svc.describe_table(&req).is_err());
6549    }
6550
6551    #[test]
6552    fn update_table_missing_name_errors() {
6553        let svc = make_service();
6554        let req = make_request("UpdateTable", json!({}));
6555        assert!(svc.update_table(&req).is_err());
6556    }
6557
6558    #[test]
6559    fn update_table_not_found_errors() {
6560        let svc = make_service();
6561        let req = make_request("UpdateTable", json!({"TableName": "ghost"}));
6562        assert!(svc.update_table(&req).is_err());
6563    }
6564
6565    #[test]
6566    fn list_tables_pagination() {
6567        let svc = make_service();
6568        for i in 0..5 {
6569            let req = make_request(
6570                "CreateTable",
6571                json!({
6572                    "TableName": format!("pt{i}"),
6573                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6574                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6575                    "BillingMode": "PAY_PER_REQUEST"
6576                }),
6577            );
6578            svc.create_table(&req).unwrap();
6579        }
6580        let req = make_request("ListTables", json!({"Limit": 2}));
6581        let resp = svc.list_tables(&req).unwrap();
6582        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6583        assert_eq!(body["TableNames"].as_array().unwrap().len(), 2);
6584        assert!(body["LastEvaluatedTableName"].is_string());
6585    }
6586
6587    #[test]
6588    fn list_tables_start_exclusive() {
6589        let svc = make_service();
6590        for i in 0..3 {
6591            let req = make_request(
6592                "CreateTable",
6593                json!({
6594                    "TableName": format!("pt{i}"),
6595                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6596                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6597                    "BillingMode": "PAY_PER_REQUEST"
6598                }),
6599            );
6600            svc.create_table(&req).unwrap();
6601        }
6602        let req = make_request("ListTables", json!({"ExclusiveStartTableName": "pt0"}));
6603        let resp = svc.list_tables(&req).unwrap();
6604        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6605        let names = body["TableNames"].as_array().unwrap();
6606        assert!(!names.iter().any(|n| n == "pt0"));
6607    }
6608
6609    #[test]
6610    fn update_time_to_live_unknown_table_errors() {
6611        let svc = make_service();
6612        let req = make_request(
6613            "UpdateTimeToLive",
6614            json!({
6615                "TableName": "ghost",
6616                "TimeToLiveSpecification": {"Enabled": true, "AttributeName": "ttl"}
6617            }),
6618        );
6619        assert!(svc.update_time_to_live(&req).is_err());
6620    }
6621
6622    #[test]
6623    fn describe_time_to_live_unknown_table_errors() {
6624        let svc = make_service();
6625        let req = make_request("DescribeTimeToLive", json!({"TableName": "ghost"}));
6626        assert!(svc.describe_time_to_live(&req).is_err());
6627    }
6628
6629    // ── resource policy ──
6630
6631    #[test]
6632    fn put_resource_policy_missing_policy_errors() {
6633        let svc = make_service();
6634        let req = make_request(
6635            "CreateTable",
6636            json!({
6637                "TableName": "rp",
6638                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6639                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6640                "BillingMode": "PAY_PER_REQUEST"
6641            }),
6642        );
6643        svc.create_table(&req).unwrap();
6644        let req = make_request(
6645            "PutResourcePolicy",
6646            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/rp"}),
6647        );
6648        assert!(svc.put_resource_policy(&req).is_err());
6649    }
6650
6651    #[test]
6652    fn get_resource_policy_unknown_table_errors() {
6653        let svc = make_service();
6654        let req = make_request(
6655            "GetResourcePolicy",
6656            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6657        );
6658        assert!(svc.get_resource_policy(&req).is_err());
6659    }
6660
6661    // ── tags ──
6662
6663    #[test]
6664    fn tag_resource_unknown_table_errors() {
6665        let svc = make_service();
6666        let req = make_request(
6667            "TagResource",
6668            json!({
6669                "ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost",
6670                "Tags": [{"Key": "k", "Value": "v"}]
6671            }),
6672        );
6673        assert!(svc.tag_resource(&req).is_err());
6674    }
6675
6676    #[test]
6677    fn list_tags_unknown_table_errors() {
6678        let svc = make_service();
6679        let req = make_request(
6680            "ListTagsOfResource",
6681            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6682        );
6683        assert!(svc.list_tags_of_resource(&req).is_err());
6684    }
6685
6686    // ── backups ──
6687
6688    #[test]
6689    fn create_backup_unknown_table_errors() {
6690        let svc = make_service();
6691        let req = make_request(
6692            "CreateBackup",
6693            json!({"TableName": "ghost", "BackupName": "b1"}),
6694        );
6695        assert!(svc.create_backup(&req).is_err());
6696    }
6697
6698    #[test]
6699    fn delete_backup_not_found_errors() {
6700        let svc = make_service();
6701        let req = make_request(
6702            "DeleteBackup",
6703            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6704        );
6705        assert!(svc.delete_backup(&req).is_err());
6706    }
6707
6708    #[test]
6709    fn describe_backup_not_found_errors() {
6710        let svc = make_service();
6711        let req = make_request(
6712            "DescribeBackup",
6713            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6714        );
6715        assert!(svc.describe_backup(&req).is_err());
6716    }
6717
6718    #[test]
6719    fn restore_table_from_backup_not_found_errors() {
6720        let svc = make_service();
6721        let req = make_request(
6722            "RestoreTableFromBackup",
6723            json!({
6724                "TargetTableName": "restored",
6725                "BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"
6726            }),
6727        );
6728        assert!(svc.restore_table_from_backup(&req).is_err());
6729    }
6730
6731    #[test]
6732    fn update_continuous_backups_unknown_table_errors() {
6733        let svc = make_service();
6734        let req = make_request(
6735            "UpdateContinuousBackups",
6736            json!({
6737                "TableName": "ghost",
6738                "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": true}
6739            }),
6740        );
6741        assert!(svc.update_continuous_backups(&req).is_err());
6742    }
6743
6744    // ── items.rs: put_item error branches ──
6745
6746    #[test]
6747    fn put_item_unknown_table_errors() {
6748        let svc = make_service();
6749        let req = make_request(
6750            "PutItem",
6751            json!({
6752                "TableName": "ghost",
6753                "Item": {"k": {"S": "v"}}
6754            }),
6755        );
6756        assert!(svc.put_item(&req).is_err());
6757    }
6758
6759    #[test]
6760    fn put_item_missing_key_attribute_errors() {
6761        let svc = make_service();
6762        svc.create_table(&make_request(
6763            "CreateTable",
6764            json!({
6765                "TableName": "pmk",
6766                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6767                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6768                "BillingMode": "PAY_PER_REQUEST"
6769            }),
6770        ))
6771        .unwrap();
6772        let req = make_request(
6773            "PutItem",
6774            json!({
6775                "TableName": "pmk",
6776                "Item": {"other": {"S": "v"}}
6777            }),
6778        );
6779        assert!(svc.put_item(&req).is_err());
6780    }
6781
6782    #[test]
6783    fn get_item_unknown_table_errors() {
6784        let svc = make_service();
6785        let req = make_request(
6786            "GetItem",
6787            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6788        );
6789        assert!(svc.get_item(&req).is_err());
6790    }
6791
6792    #[test]
6793    fn delete_item_unknown_table_errors() {
6794        let svc = make_service();
6795        let req = make_request(
6796            "DeleteItem",
6797            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6798        );
6799        assert!(svc.delete_item(&req).is_err());
6800    }
6801
6802    #[test]
6803    fn update_item_unknown_table_errors() {
6804        let svc = make_service();
6805        let req = make_request(
6806            "UpdateItem",
6807            json!({
6808                "TableName": "ghost",
6809                "Key": {"k": {"S": "1"}},
6810                "UpdateExpression": "SET x = :v",
6811                "ExpressionAttributeValues": {":v": {"S": "val"}}
6812            }),
6813        );
6814        assert!(svc.update_item(&req).is_err());
6815    }
6816
6817    #[test]
6818    fn query_unknown_table_errors() {
6819        let svc = make_service();
6820        let req = make_request(
6821            "Query",
6822            json!({
6823                "TableName": "ghost",
6824                "KeyConditionExpression": "k = :v",
6825                "ExpressionAttributeValues": {":v": {"S": "x"}}
6826            }),
6827        );
6828        assert!(svc.query(&req).is_err());
6829    }
6830
6831    #[test]
6832    fn scan_unknown_table_errors() {
6833        let svc = make_service();
6834        let req = make_request("Scan", json!({"TableName": "ghost"}));
6835        assert!(svc.scan(&req).is_err());
6836    }
6837
6838    #[test]
6839    fn scan_with_limit_returns_ok() {
6840        let svc = make_service();
6841        svc.create_table(&make_request(
6842            "CreateTable",
6843            json!({
6844                "TableName": "slt",
6845                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6846                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6847                "BillingMode": "PAY_PER_REQUEST"
6848            }),
6849        ))
6850        .unwrap();
6851        for i in 0..5 {
6852            svc.put_item(&make_request(
6853                "PutItem",
6854                json!({
6855                    "TableName": "slt",
6856                    "Item": {"k": {"S": format!("key-{i}")}}
6857                }),
6858            ))
6859            .unwrap();
6860        }
6861        let req = make_request("Scan", json!({"TableName": "slt", "Limit": 2}));
6862        let resp = svc.scan(&req).unwrap();
6863        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6864        assert_eq!(body["Count"], 2);
6865    }
6866
6867    #[test]
6868    fn batch_get_item_unknown_table_errors() {
6869        let svc = make_service();
6870        let req = make_request(
6871            "BatchGetItem",
6872            json!({
6873                "RequestItems": {
6874                    "ghost": {"Keys": [{"k": {"S": "1"}}]}
6875                }
6876            }),
6877        );
6878        assert!(svc.batch_get_item(&req).is_err());
6879    }
6880
6881    #[test]
6882    fn batch_write_item_unknown_table_errors() {
6883        let svc = make_service();
6884        let req = make_request(
6885            "BatchWriteItem",
6886            json!({
6887                "RequestItems": {
6888                    "ghost": [{"PutRequest": {"Item": {"k": {"S": "1"}}}}]
6889                }
6890            }),
6891        );
6892        assert!(svc.batch_write_item(&req).is_err());
6893    }
6894
6895    #[test]
6896    fn transact_write_items_unknown_table_errors() {
6897        let svc = make_service();
6898        let req = make_request(
6899            "TransactWriteItems",
6900            json!({
6901                "TransactItems": [{
6902                    "Put": {"TableName": "ghost", "Item": {"k": {"S": "1"}}}
6903                }]
6904            }),
6905        );
6906        assert!(svc.transact_write_items(&req).is_err());
6907    }
6908
6909    #[test]
6910    fn transact_get_items_unknown_table_errors() {
6911        let svc = make_service();
6912        let req = make_request(
6913            "TransactGetItems",
6914            json!({
6915                "TransactItems": [{
6916                    "Get": {"TableName": "ghost", "Key": {"k": {"S": "1"}}}
6917                }]
6918            }),
6919        );
6920        assert!(svc.transact_get_items(&req).is_err());
6921    }
6922
6923    #[test]
6924    fn describe_global_table_not_found_b() {
6925        let svc = make_service();
6926        let req = make_request("DescribeGlobalTable", json!({"GlobalTableName": "ghost"}));
6927        assert!(svc.describe_global_table(&req).is_err());
6928    }
6929
6930    #[test]
6931    fn list_global_tables_empty_ok() {
6932        let svc = make_service();
6933        let req = make_request("ListGlobalTables", json!({}));
6934        let resp = svc.list_global_tables(&req).unwrap();
6935        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6936        assert!(body["GlobalTables"].is_array());
6937    }
6938
6939    #[test]
6940    fn split_on_top_level_keyword_between_swallows_inner_and() {
6941        let parts = split_on_top_level_keyword("x = :a AND y BETWEEN :lo AND :hi", "AND");
6942        assert_eq!(
6943            parts.len(),
6944            2,
6945            "BETWEEN's inner AND must not split; got parts = {parts:?}"
6946        );
6947    }
6948
6949    #[test]
6950    fn split_on_top_level_keyword_between_nested_parens() {
6951        let parts = split_on_top_level_keyword("(x = :a) AND (y BETWEEN :lo AND :hi)", "AND");
6952        assert_eq!(parts.len(), 2);
6953    }
6954
6955    #[test]
6956    fn split_on_top_level_keyword_whitespace_variants() {
6957        for expr in [
6958            "x = :a AND y = :b",
6959            "x=:a AND y=:b",
6960            "  x = :a   AND   y = :b  ",
6961            "x\t=\t:a\tAND\ty\t=\t:b",
6962            "x = :a\nAND\ny = :b",
6963        ] {
6964            let parts = split_on_top_level_keyword(expr, "AND");
6965            assert_eq!(parts.len(), 2, "whitespace variant failed: {expr:?}");
6966        }
6967    }
6968
6969    #[test]
6970    fn split_on_top_level_keyword_case_insensitive() {
6971        let parts = split_on_top_level_keyword("x = :a and y = :b", "AND");
6972        assert_eq!(parts.len(), 2);
6973        let parts = split_on_top_level_keyword("x = :a OR y = :b", "OR");
6974        assert_eq!(parts.len(), 2);
6975    }
6976
6977    #[test]
6978    fn split_on_top_level_keyword_does_not_match_inside_identifiers() {
6979        // `land` contains "AND" but isn't word-bounded — must not split.
6980        let parts = split_on_top_level_keyword("land = :a", "AND");
6981        assert_eq!(parts.len(), 1);
6982    }
6983}