Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::state::SharedS3State;
23
24use crate::state::{
25    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoDbSnapshot, DynamoTable,
26    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
27    ProvisionedThroughput, SharedDynamoDbState, DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
/// Minimal subset of a `DynamoTable` that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
pub(super) struct KinesisDeliveryTarget {
    /// Copy of the table's Kinesis streaming destinations. The delivery path
    /// filters this list down to the entries whose status is `"ACTIVE"`.
    pub destinations: Vec<KinesisDestination>,
    /// ARN of the source table; emitted as `eventSourceARN` and used to
    /// derive the record's `awsRegion`.
    pub arn: String,
    /// Name of the source table; emitted as `tableName`.
    pub name: String,
}
41
/// Operation flavor for the per-item KMS audit-trail emitter. Reads
/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
/// `GenerateDataKey`, mirroring AWS's audit shape.
///
/// The enum is a plain tag, so it derives the usual value-type traits
/// (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) to make it loggable,
/// comparable and freely passable by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableKmsOp {
    /// A read-path operation: `GenerateDataKey` followed by `Decrypt`.
    Read,
    /// A write-path operation: `GenerateDataKey` only.
    Write,
}
49
/// DynamoDB service front-end: owns the shared table state plus the optional
/// collaborators (S3, delivery bus, snapshot store, KMS hook) that the
/// `with_*` builders attach.
pub struct DynamoDbService {
    /// Shared in-memory DynamoDB state; cloned under a read lock when a
    /// snapshot is written.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, set via `with_s3`.
    // NOTE(review): presumably used by the import/export paths — confirm in the submodules.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 store, set via `with_s3_store`.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Optional delivery bus; when present, change records are pushed to
    /// Kinesis streaming destinations through it.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional snapshot store; when `None`, `save_snapshot` is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Optional KMS hook used to emit audit-trail records for SSE-KMS tables.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook; defaults to `us-east-1` in `new`.
    pub(crate) region: String,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
64
65impl DynamoDbService {
66    pub fn new(state: SharedDynamoDbState) -> Self {
67        Self {
68            state,
69            s3_state: None,
70            s3_store: None,
71            delivery: None,
72            snapshot_store: None,
73            kms_hook: None,
74            region: "us-east-1".to_string(),
75            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
76        }
77    }
78
79    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
80        self.s3_state = Some(s3_state);
81        self
82    }
83
84    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
85        self.s3_store = Some(store);
86        self
87    }
88
89    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
90        self.delivery = Some(delivery);
91        self
92    }
93
94    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
95        self.snapshot_store = Some(store);
96        self
97    }
98
99    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
100        self.kms_hook = Some(hook);
101        self
102    }
103
104    pub fn with_region(mut self, region: impl Into<String>) -> Self {
105        self.region = region.into();
106        self
107    }
108
109    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
110    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
111    /// item bodies are nested attribute maps — encrypting them in
112    /// fakecloud would balloon scope without adding test coverage that
113    /// users actually want, so we just emit the audit-trail records the
114    /// AWS API produces and let callers assert KMS usage via
115    /// `/_fakecloud/kms/usage`.
116    pub(crate) fn record_table_kms_usage(
117        &self,
118        account_id: &str,
119        table_arn: &str,
120        kms_key_arn: Option<&str>,
121        operation: TableKmsOp,
122    ) {
123        let Some(hook) = &self.kms_hook else { return };
124        let key = kms_key_arn
125            .filter(|k| !k.is_empty())
126            .unwrap_or("aws/dynamodb");
127        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
128        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
129        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
130        // ends with `:table/<name>`, so derive the name from it.
131        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
132        let mut ctx = std::collections::HashMap::new();
133        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
134        ctx.insert(
135            "aws:dynamodb:subscriberId".to_string(),
136            account_id.to_string(),
137        );
138        let envelope = match hook.encrypt(
139            account_id,
140            &self.region,
141            key,
142            b"ddb-item",
143            "dynamodb.amazonaws.com",
144            ctx.clone(),
145        ) {
146            Ok(env) => env,
147            Err(_) => return,
148        };
149        if matches!(operation, TableKmsOp::Read) {
150            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
151        }
152    }
153
154    /// Persist the current in-memory state as a snapshot. Called after
155    /// every state-mutating action. A noop when no snapshot store is
156    /// configured (i.e. `StorageMode::Memory`).
157    ///
158    /// The snapshot lock serializes the full clone + serialize + write
159    /// so concurrent mutators cannot leave older bytes on disk, and
160    /// serialization + the blocking file write are offloaded to the
161    /// blocking pool to keep Tokio workers responsive.
162    async fn save_snapshot(&self) {
163        let Some(store) = self.snapshot_store.clone() else {
164            return;
165        };
166        let _guard = self.snapshot_lock.lock().await;
167        let snapshot = DynamoDbSnapshot {
168            schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
169            accounts: Some(self.state.read().clone()),
170            state: None,
171        };
172        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
173            let bytes = serde_json::to_vec(&snapshot)
174                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
175            store.save(&bytes)
176        })
177        .await;
178        match join {
179            Ok(Ok(())) => {}
180            Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
181            Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
182        }
183    }
184
185    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
186        if table
187            .kinesis_destinations
188            .iter()
189            .any(|d| d.destination_status == "ACTIVE")
190        {
191            Some(KinesisDeliveryTarget {
192                destinations: table.kinesis_destinations.clone(),
193                arn: table.arn.clone(),
194                name: table.name.clone(),
195            })
196        } else {
197            None
198        }
199    }
200
201    /// Deliver a change record to all active Kinesis streaming destinations for a table.
202    pub(super) fn deliver_to_kinesis_destinations(
203        &self,
204        target: &KinesisDeliveryTarget,
205        event_name: &str,
206        keys: &HashMap<String, AttributeValue>,
207        old_image: Option<&HashMap<String, AttributeValue>>,
208        new_image: Option<&HashMap<String, AttributeValue>>,
209    ) {
210        let delivery = match &self.delivery {
211            Some(d) => d,
212            None => return,
213        };
214
215        let active_destinations: Vec<_> = target
216            .destinations
217            .iter()
218            .filter(|d| d.destination_status == "ACTIVE")
219            .collect();
220
221        if active_destinations.is_empty() {
222            return;
223        }
224
225        let mut record = json!({
226            "eventID": uuid::Uuid::new_v4().to_string(),
227            "eventName": event_name,
228            "eventVersion": "1.1",
229            "eventSource": "aws:dynamodb",
230            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
231            "dynamodb": {
232                "Keys": keys,
233                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
234                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
235                "StreamViewType": "NEW_AND_OLD_IMAGES",
236            },
237            "eventSourceARN": &target.arn,
238            "tableName": &target.name,
239        });
240
241        if let Some(old) = old_image {
242            record["dynamodb"]["OldImage"] = json!(old);
243        }
244        if let Some(new) = new_image {
245            record["dynamodb"]["NewImage"] = json!(new);
246        }
247
248        let record_str = serde_json::to_string(&record).unwrap_or_default();
249        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
250        let partition_key = serde_json::to_string(keys).unwrap_or_default();
251
252        for dest in active_destinations {
253            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
254        }
255    }
256
257    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
258        serde_json::from_slice(&req.body).map_err(|e| {
259            AwsServiceError::aws_error(
260                StatusCode::BAD_REQUEST,
261                "SerializationException",
262                format!("Invalid JSON: {e}"),
263            )
264        })
265    }
266
267    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
268        Ok(AwsResponse::ok_json(body))
269    }
270}
271
272#[async_trait]
273impl AwsService for DynamoDbService {
274    fn service_name(&self) -> &str {
275        "dynamodb"
276    }
277
278    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
279        let mutates = is_mutating_action(req.action.as_str());
280        let result = match req.action.as_str() {
281            "CreateTable" => self.create_table(&req),
282            "DeleteTable" => self.delete_table(&req),
283            "DescribeTable" => self.describe_table(&req),
284            "ListTables" => self.list_tables(&req),
285            "UpdateTable" => self.update_table(&req),
286            "PutItem" => self.put_item(&req),
287            "GetItem" => self.get_item(&req),
288            "DeleteItem" => self.delete_item(&req),
289            "UpdateItem" => self.update_item(&req),
290            "Query" => self.query(&req),
291            "Scan" => self.scan(&req),
292            "BatchGetItem" => self.batch_get_item(&req),
293            "BatchWriteItem" => self.batch_write_item(&req),
294            "TagResource" => self.tag_resource(&req),
295            "UntagResource" => self.untag_resource(&req),
296            "ListTagsOfResource" => self.list_tags_of_resource(&req),
297            "TransactGetItems" => self.transact_get_items(&req),
298            "TransactWriteItems" => self.transact_write_items(&req),
299            "ExecuteStatement" => self.execute_statement(&req),
300            "BatchExecuteStatement" => self.batch_execute_statement(&req),
301            "ExecuteTransaction" => self.execute_transaction(&req),
302            "UpdateTimeToLive" => self.update_time_to_live(&req),
303            "DescribeTimeToLive" => self.describe_time_to_live(&req),
304            "PutResourcePolicy" => self.put_resource_policy(&req),
305            "GetResourcePolicy" => self.get_resource_policy(&req),
306            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
307            // Synthetic defaults (no DAX endpoint discovery / no real per-account quotas tracked)
308            "DescribeEndpoints" => self.describe_endpoints(&req),
309            "DescribeLimits" => self.describe_limits(&req),
310            // Backups
311            "CreateBackup" => self.create_backup(&req),
312            "DeleteBackup" => self.delete_backup(&req),
313            "DescribeBackup" => self.describe_backup(&req),
314            "ListBackups" => self.list_backups(&req),
315            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
316            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
317            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
318            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
319            // Global tables
320            "CreateGlobalTable" => self.create_global_table(&req),
321            "DescribeGlobalTable" => self.describe_global_table(&req),
322            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
323            "ListGlobalTables" => self.list_global_tables(&req),
324            "UpdateGlobalTable" => self.update_global_table(&req),
325            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
326            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
327            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
328            // Kinesis streaming
329            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
330            "DisableKinesisStreamingDestination" => {
331                self.disable_kinesis_streaming_destination(&req)
332            }
333            "DescribeKinesisStreamingDestination" => {
334                self.describe_kinesis_streaming_destination(&req)
335            }
336            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
337            // Contributor insights
338            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
339            "UpdateContributorInsights" => self.update_contributor_insights(&req),
340            "ListContributorInsights" => self.list_contributor_insights(&req),
341            // Import/Export
342            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
343            "DescribeExport" => self.describe_export(&req),
344            "ListExports" => self.list_exports(&req),
345            "ImportTable" => self.import_table(&req),
346            "DescribeImport" => self.describe_import(&req),
347            "ListImports" => self.list_imports(&req),
348            _ => Err(AwsServiceError::action_not_implemented(
349                "dynamodb",
350                &req.action,
351            )),
352        };
353        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
354            self.save_snapshot().await;
355        }
356        result
357    }
358
359    fn supported_actions(&self) -> &[&str] {
360        &[
361            "CreateTable",
362            "DeleteTable",
363            "DescribeTable",
364            "ListTables",
365            "UpdateTable",
366            "PutItem",
367            "GetItem",
368            "DeleteItem",
369            "UpdateItem",
370            "Query",
371            "Scan",
372            "BatchGetItem",
373            "BatchWriteItem",
374            "TagResource",
375            "UntagResource",
376            "ListTagsOfResource",
377            "TransactGetItems",
378            "TransactWriteItems",
379            "ExecuteStatement",
380            "BatchExecuteStatement",
381            "ExecuteTransaction",
382            "UpdateTimeToLive",
383            "DescribeTimeToLive",
384            "PutResourcePolicy",
385            "GetResourcePolicy",
386            "DeleteResourcePolicy",
387            "DescribeEndpoints",
388            "DescribeLimits",
389            "CreateBackup",
390            "DeleteBackup",
391            "DescribeBackup",
392            "ListBackups",
393            "RestoreTableFromBackup",
394            "RestoreTableToPointInTime",
395            "UpdateContinuousBackups",
396            "DescribeContinuousBackups",
397            "CreateGlobalTable",
398            "DescribeGlobalTable",
399            "DescribeGlobalTableSettings",
400            "ListGlobalTables",
401            "UpdateGlobalTable",
402            "UpdateGlobalTableSettings",
403            "DescribeTableReplicaAutoScaling",
404            "UpdateTableReplicaAutoScaling",
405            "EnableKinesisStreamingDestination",
406            "DisableKinesisStreamingDestination",
407            "DescribeKinesisStreamingDestination",
408            "UpdateKinesisStreamingDestination",
409            "DescribeContributorInsights",
410            "UpdateContributorInsights",
411            "ListContributorInsights",
412            "ExportTableToPointInTime",
413            "DescribeExport",
414            "ListExports",
415            "ImportTable",
416            "DescribeImport",
417            "ListImports",
418        ]
419    }
420}
/// Actions that mutate DynamoDB state and therefore require a snapshot
/// write after success. Kept in sync with the dispatch table above.
///
/// Matching is exact and case-sensitive; any action not listed here is
/// treated as read-only.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateTable",
        "DeleteTable",
        "UpdateTable",
        "PutItem",
        "DeleteItem",
        "UpdateItem",
        "BatchWriteItem",
        "TagResource",
        "UntagResource",
        "TransactWriteItems",
        "ExecuteStatement",
        "BatchExecuteStatement",
        "ExecuteTransaction",
        "UpdateTimeToLive",
        "PutResourcePolicy",
        "DeleteResourcePolicy",
        "CreateBackup",
        "DeleteBackup",
        "RestoreTableFromBackup",
        "RestoreTableToPointInTime",
        "UpdateContinuousBackups",
        "CreateGlobalTable",
        "UpdateGlobalTable",
        "UpdateGlobalTableSettings",
        "UpdateTableReplicaAutoScaling",
        "EnableKinesisStreamingDestination",
        "DisableKinesisStreamingDestination",
        "UpdateKinesisStreamingDestination",
        "UpdateContributorInsights",
        "ExportTableToPointInTime",
        "ImportTable",
    ];
    MUTATING_ACTIONS.contains(&action)
}
459
460// ── Helper functions ────────────────────────────────────────────────────
461
462fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
463    body[field].as_str().ok_or_else(|| {
464        AwsServiceError::aws_error(
465            StatusCode::BAD_REQUEST,
466            "ValidationException",
467            format!("{field} is required"),
468        )
469    })
470}
471
472fn require_object(
473    body: &Value,
474    field: &str,
475) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
476    let obj = body[field].as_object().ok_or_else(|| {
477        AwsServiceError::aws_error(
478            StatusCode::BAD_REQUEST,
479            "ValidationException",
480            format!("{field} is required"),
481        )
482    })?;
483    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
484}
485
486fn get_table<'a>(
487    tables: &'a HashMap<String, DynamoTable>,
488    name: &str,
489) -> Result<&'a DynamoTable, AwsServiceError> {
490    tables.get(name).ok_or_else(|| {
491        AwsServiceError::aws_error(
492            StatusCode::BAD_REQUEST,
493            "ResourceNotFoundException",
494            format!("Requested resource not found: Table: {name} not found"),
495        )
496    })
497}
498
499fn get_table_mut<'a>(
500    tables: &'a mut HashMap<String, DynamoTable>,
501    name: &str,
502) -> Result<&'a mut DynamoTable, AwsServiceError> {
503    tables.get_mut(name).ok_or_else(|| {
504        AwsServiceError::aws_error(
505            StatusCode::BAD_REQUEST,
506            "ResourceNotFoundException",
507            format!("Requested resource not found: Table: {name} not found"),
508        )
509    })
510}
511
512fn find_table_by_arn<'a>(
513    tables: &'a HashMap<String, DynamoTable>,
514    arn: &str,
515) -> Result<&'a DynamoTable, AwsServiceError> {
516    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
517        AwsServiceError::aws_error(
518            StatusCode::BAD_REQUEST,
519            "ResourceNotFoundException",
520            format!("Requested resource not found: {arn}"),
521        )
522    })
523}
524
525fn find_table_by_arn_mut<'a>(
526    tables: &'a mut HashMap<String, DynamoTable>,
527    arn: &str,
528) -> Result<&'a mut DynamoTable, AwsServiceError> {
529    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
530        AwsServiceError::aws_error(
531            StatusCode::BAD_REQUEST,
532            "ResourceNotFoundException",
533            format!("Requested resource not found: {arn}"),
534        )
535    })
536}
537
538fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
539    let arr = val.as_array().ok_or_else(|| {
540        AwsServiceError::aws_error(
541            StatusCode::BAD_REQUEST,
542            "ValidationException",
543            "KeySchema is required",
544        )
545    })?;
546    Ok(arr
547        .iter()
548        .map(|elem| KeySchemaElement {
549            attribute_name: elem["AttributeName"]
550                .as_str()
551                .unwrap_or_default()
552                .to_string(),
553            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
554        })
555        .collect())
556}
557
558fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
559    let arr = val.as_array().ok_or_else(|| {
560        AwsServiceError::aws_error(
561            StatusCode::BAD_REQUEST,
562            "ValidationException",
563            "AttributeDefinitions is required",
564        )
565    })?;
566    Ok(arr
567        .iter()
568        .map(|elem| AttributeDefinition {
569            attribute_name: elem["AttributeName"]
570                .as_str()
571                .unwrap_or_default()
572                .to_string(),
573            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
574        })
575        .collect())
576}
577
578fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
579    Ok(ProvisionedThroughput {
580        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
581        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
582    })
583}
584
585fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
586    let Some(arr) = val.as_array() else {
587        return Vec::new();
588    };
589    arr.iter()
590        .filter_map(|g| {
591            Some(GlobalSecondaryIndex {
592                index_name: g["IndexName"].as_str()?.to_string(),
593                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
594                projection: parse_projection(&g["Projection"]),
595                provisioned_throughput: Some(parse_gsi_throughput(
596                    &g["ProvisionedThroughput"],
597                    billing_mode,
598                )),
599                on_demand_throughput: parse_on_demand_throughput(&g["OnDemandThroughput"]),
600            })
601        })
602        .collect()
603}
604
605/// Parse an `OnDemandThroughput` block. Absent fields default to `-1`,
606/// which is AWS's sentinel for "no cap" — and the value real AWS echoes
607/// back on DescribeTable when the caller omitted either axis.
608pub(super) fn parse_on_demand_throughput(val: &Value) -> Option<crate::state::OnDemandThroughput> {
609    if !val.is_object() {
610        return None;
611    }
612    Some(crate::state::OnDemandThroughput {
613        max_read_request_units: val["MaxReadRequestUnits"].as_i64().unwrap_or(-1),
614        max_write_request_units: val["MaxWriteRequestUnits"].as_i64().unwrap_or(-1),
615    })
616}
617
618/// Resolve the provisioned-throughput slot for a GSI on a CreateTable or
619/// UpdateTable Create action. Real DynamoDB returns `{0, 0}` for GSIs on
620/// PAY_PER_REQUEST tables regardless of whether the caller sent a
621/// `ProvisionedThroughput` block, and the Terraform provider's `flatten`
622/// code keys `name`/`read_capacity`/`write_capacity` off the presence of
623/// that field — returning `None` would desynchronise state.
624fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
625    if billing_mode == "PAY_PER_REQUEST" {
626        return ProvisionedThroughput {
627            read_capacity_units: 0,
628            write_capacity_units: 0,
629        };
630    }
631    ProvisionedThroughput {
632        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
633        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
634    }
635}
636
637fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
638    let Some(arr) = val.as_array() else {
639        return Vec::new();
640    };
641    arr.iter()
642        .filter_map(|l| {
643            Some(LocalSecondaryIndex {
644                index_name: l["IndexName"].as_str()?.to_string(),
645                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
646                projection: parse_projection(&l["Projection"]),
647            })
648        })
649        .collect()
650}
651
652pub(super) fn parse_projection(val: &Value) -> Projection {
653    Projection {
654        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
655        non_key_attributes: val["NonKeyAttributes"]
656            .as_array()
657            .map(|arr| {
658                arr.iter()
659                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
660                    .collect()
661            })
662            .unwrap_or_default(),
663    }
664}
665
666fn parse_tags(val: &Value) -> HashMap<String, String> {
667    let mut tags = HashMap::new();
668    if let Some(arr) = val.as_array() {
669        for tag in arr {
670            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
671                tags.insert(k.to_string(), v.to_string());
672            }
673        }
674    }
675    tags
676}
677
678fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
679    let mut names = HashMap::new();
680    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
681        for (k, v) in obj {
682            if let Some(s) = v.as_str() {
683                names.insert(k.clone(), s.to_string());
684            }
685        }
686    }
687    names
688}
689
690fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
691    let mut values = HashMap::new();
692    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
693        for (k, v) in obj {
694            values.insert(k.clone(), v.clone());
695        }
696    }
697    values
698}
699
/// Resolve an expression token to an attribute name.
///
/// Tokens starting with `#` are looked up in `expr_attr_names`; if there is
/// no mapping the token is returned unchanged. Tokens without a leading `#`
/// are never looked up and are returned as-is.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let resolved = if name.starts_with('#') {
        expr_attr_names
            .get(name)
            .map(String::as_str)
            .unwrap_or(name)
    } else {
        name
    };
    resolved.to_string()
}
710
711/// Resolve a (possibly dotted, possibly `#name`-containing) document path to
712/// the leaf `AttributeValue` inside `item`. Single-segment paths (`foo`,
713/// `#foo`) resolve to a top-level attribute. Dotted paths (`profile.email`,
714/// `#p.#e`, `items[0].sku`) walk into `M`/`L` containers. Returns `None` if
715/// any segment is missing or the intermediate value isn't a map/list.
716fn resolve_path(
717    path: &str,
718    item: &HashMap<String, AttributeValue>,
719    expr_attr_names: &HashMap<String, String>,
720) -> Option<Value> {
721    // Fast path: a single-segment expression (no `.` and no `[` in the raw
722    // input) refers to a top-level attribute by its literal name, even if the
723    // resolved alias contains a `.`. Without this, `#sw` -> `Safety.Warning`
724    // would be misread as the nested path `Safety` -> `Warning`.
725    if !path.contains('.') && !path.contains('[') {
726        return item.get(&resolve_attr_name(path, expr_attr_names)).cloned();
727    }
728    let resolved = resolve_projection_path(path, expr_attr_names);
729    resolve_nested_path(item, &resolved)
730}
731
732fn extract_key(
733    table: &DynamoTable,
734    item: &HashMap<String, AttributeValue>,
735) -> HashMap<String, AttributeValue> {
736    let mut key = HashMap::new();
737    let hash_key = table.hash_key_name();
738    if let Some(v) = item.get(hash_key) {
739        key.insert(hash_key.to_string(), v.clone());
740    }
741    if let Some(range_key) = table.range_key_name() {
742        if let Some(v) = item.get(range_key) {
743            key.insert(range_key.to_string(), v.clone());
744        }
745    }
746    key
747}
748
749/// Parse a JSON object into a key map (used for ExclusiveStartKey).
750fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
751    let obj = value.as_object()?;
752    if obj.is_empty() {
753        return None;
754    }
755    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
756}
757
758/// Check whether an item's key attributes match the given key map.
759fn item_matches_key(
760    item: &HashMap<String, AttributeValue>,
761    key: &HashMap<String, AttributeValue>,
762    hash_key_name: &str,
763    range_key_name: Option<&str>,
764) -> bool {
765    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
766        (Some(a), Some(b)) => a == b,
767        _ => false,
768    };
769    if !hash_match {
770        return false;
771    }
772    match range_key_name {
773        Some(rk) => match (item.get(rk), key.get(rk)) {
774            (Some(a), Some(b)) => a == b,
775            (None, None) => true,
776            _ => false,
777        },
778        None => true,
779    }
780}
781
782/// Extract the primary key from an item given explicit key attribute names.
783fn extract_key_for_schema(
784    item: &HashMap<String, AttributeValue>,
785    hash_key_name: &str,
786    range_key_name: Option<&str>,
787) -> HashMap<String, AttributeValue> {
788    let mut key = HashMap::new();
789    if let Some(v) = item.get(hash_key_name) {
790        key.insert(hash_key_name.to_string(), v.clone());
791    }
792    if let Some(rk) = range_key_name {
793        if let Some(v) = item.get(rk) {
794            key.insert(rk.to_string(), v.clone());
795        }
796    }
797    key
798}
799
800fn validate_key_in_item(
801    table: &DynamoTable,
802    item: &HashMap<String, AttributeValue>,
803) -> Result<(), AwsServiceError> {
804    let hash_key = table.hash_key_name();
805    if !item.contains_key(hash_key) {
806        return Err(AwsServiceError::aws_error(
807            StatusCode::BAD_REQUEST,
808            "ValidationException",
809            format!("Missing the key {hash_key} in the item"),
810        ));
811    }
812    if let Some(range_key) = table.range_key_name() {
813        if !item.contains_key(range_key) {
814            return Err(AwsServiceError::aws_error(
815                StatusCode::BAD_REQUEST,
816                "ValidationException",
817                format!("Missing the key {range_key} in the item"),
818            ));
819        }
820    }
821    Ok(())
822}
823
824fn validate_key_attributes_in_key(
825    table: &DynamoTable,
826    key: &HashMap<String, AttributeValue>,
827) -> Result<(), AwsServiceError> {
828    let hash_key = table.hash_key_name();
829    if !key.contains_key(hash_key) {
830        return Err(AwsServiceError::aws_error(
831            StatusCode::BAD_REQUEST,
832            "ValidationException",
833            format!("Missing the key {hash_key} in the item"),
834        ));
835    }
836    Ok(())
837}
838
839fn project_item(
840    item: &HashMap<String, AttributeValue>,
841    body: &Value,
842) -> HashMap<String, AttributeValue> {
843    let projection = body["ProjectionExpression"].as_str();
844    match projection {
845        Some(proj) if !proj.is_empty() => {
846            let expr_attr_names = parse_expression_attribute_names(body);
847            let mut result = HashMap::new();
848            for raw in proj.split(',') {
849                let raw = raw.trim();
850                // Single-segment: treat as literal top-level attribute even if
851                // the alias resolves to a name containing `.` (e.g. `#sw` ->
852                // `Safety.Warning`).
853                if !raw.contains('.') && !raw.contains('[') {
854                    let key = resolve_attr_name(raw, &expr_attr_names);
855                    if let Some(v) = item.get(&key) {
856                        result.insert(key, v.clone());
857                    }
858                } else {
859                    let resolved = resolve_projection_path(raw, &expr_attr_names);
860                    if let Some(v) = resolve_nested_path(item, &resolved) {
861                        insert_nested_value(&mut result, &resolved, v);
862                    }
863                }
864            }
865            result
866        }
867        _ => item.clone(),
868    }
869}
870
871/// Resolve expression attribute names within each segment of a projection path.
872/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
873fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
874    // Split on dots, resolve each part, rejoin
875    let mut result = String::new();
876    for (i, segment) in path.split('.').enumerate() {
877        if i > 0 {
878            result.push('.');
879        }
880        // A segment might be like "#n" or "people[0]" or "#attr[0]"
881        if let Some(bracket_pos) = segment.find('[') {
882            let key_part = &segment[..bracket_pos];
883            let index_part = &segment[bracket_pos..];
884            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
885            result.push_str(index_part);
886        } else {
887            result.push_str(&resolve_attr_name(segment, expr_attr_names));
888        }
889    }
890    result
891}
892
893/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
894fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
895    let segments = parse_path_segments(path);
896    if segments.is_empty() {
897        return None;
898    }
899
900    let first = &segments[0];
901    let top_key = match first {
902        PathSegment::Key(k) => k.as_str(),
903        _ => return None,
904    };
905
906    let mut current = item.get(top_key)?.clone();
907
908    for segment in &segments[1..] {
909        match segment {
910            PathSegment::Key(k) => {
911                // Navigate into a Map: {"M": {"key": ...}}
912                current = current.get("M")?.get(k)?.clone();
913            }
914            PathSegment::Index(idx) => {
915                // Navigate into a List: {"L": [...]}
916                current = current.get("L")?.get(*idx)?.clone();
917            }
918        }
919    }
920
921    Some(current)
922}
923
/// One step of a parsed document path.
#[derive(Debug)]
enum PathSegment {
    /// An attribute or map-member name, e.g. the `a` and `b` in `a.b`.
    Key(String),
    /// A list position, e.g. the `0` in `a[0]`.
    Index(usize),
}

/// Tokenise a document path such as `a.b[0].c` into
/// `[Key("a"), Key("b"), Index(0), Key("c")]`.
///
/// Empty names (for example from `a..b`) produce no segment. The text between
/// `[` and the next `]` (or the end of the input when the bracket is never
/// closed) is parsed as a `usize`; bracket contents that do not parse are
/// dropped without producing a segment.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut parsed = Vec::new();
    let mut name = String::new();
    let mut rest = path.chars();

    while let Some(ch) = rest.next() {
        match ch {
            '.' | '[' => {
                // Both separators terminate the name collected so far.
                if !name.is_empty() {
                    parsed.push(PathSegment::Key(std::mem::take(&mut name)));
                }
                if ch == '[' {
                    // `take_while` also consumes the closing `]`.
                    let digits: String = rest.by_ref().take_while(|c| *c != ']').collect();
                    if let Ok(position) = digits.parse::<usize>() {
                        parsed.push(PathSegment::Index(position));
                    }
                }
            }
            other => name.push(other),
        }
    }

    if !name.is_empty() {
        parsed.push(PathSegment::Key(name));
    }
    parsed
}
972
973/// Insert a value at a nested path in the result HashMap.
974/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
975fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
976    // Simple case: no nesting
977    if !path.contains('.') && !path.contains('[') {
978        result.insert(path.to_string(), value);
979        return;
980    }
981
982    let segments = parse_path_segments(path);
983    if segments.is_empty() {
984        return;
985    }
986
987    let top_key = match &segments[0] {
988        PathSegment::Key(k) => k.clone(),
989        _ => return,
990    };
991
992    if segments.len() == 1 {
993        result.insert(top_key, value);
994        return;
995    }
996
997    // For nested paths, wrap the value back into the nested structure
998    let wrapped = wrap_value_in_path(&segments[1..], value);
999    // Merge into existing value if present
1000    let existing = result.remove(&top_key);
1001    let merged = match existing {
1002        Some(existing) => merge_attribute_values(existing, wrapped),
1003        None => wrapped,
1004    };
1005    result.insert(top_key, merged);
1006}
1007
1008/// Wrap a value in the nested path structure.
1009fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
1010    if segments.is_empty() {
1011        return value;
1012    }
1013    let inner = wrap_value_in_path(&segments[1..], value);
1014    match &segments[0] {
1015        PathSegment::Key(k) => {
1016            json!({"M": {k.clone(): inner}})
1017        }
1018        PathSegment::Index(idx) => {
1019            let mut arr = vec![Value::Null; idx + 1];
1020            arr[*idx] = inner;
1021            json!({"L": arr})
1022        }
1023    }
1024}
1025
1026/// Merge two attribute values (for overlapping projections).
1027fn merge_attribute_values(a: Value, b: Value) -> Value {
1028    if let (Some(a_map), Some(b_map)) = (
1029        a.get("M").and_then(|v| v.as_object()),
1030        b.get("M").and_then(|v| v.as_object()),
1031    ) {
1032        let mut merged = a_map.clone();
1033        for (k, v) in b_map {
1034            if let Some(existing) = merged.get(k) {
1035                merged.insert(
1036                    k.clone(),
1037                    merge_attribute_values(existing.clone(), v.clone()),
1038                );
1039            } else {
1040                merged.insert(k.clone(), v.clone());
1041            }
1042        }
1043        json!({"M": merged})
1044    } else {
1045        b
1046    }
1047}
1048
1049fn evaluate_condition(
1050    condition: &str,
1051    existing: Option<&HashMap<String, AttributeValue>>,
1052    expr_attr_names: &HashMap<String, String>,
1053    expr_attr_values: &HashMap<String, Value>,
1054) -> Result<(), AwsServiceError> {
1055    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
1056    // so we delegate to evaluate_filter_expression. An empty map models "item
1057    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
1058    // → true, comparisons against missing attributes → None vs Some(val).
1059    let empty = HashMap::new();
1060    let item = existing.unwrap_or(&empty);
1061    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
1062        Ok(())
1063    } else {
1064        Err(AwsServiceError::aws_error(
1065            StatusCode::BAD_REQUEST,
1066            "ConditionalCheckFailedException",
1067            "The conditional request failed",
1068        ))
1069    }
1070}
1071
/// Return the trimmed argument text of an expression that consists of a
/// single call to `func_name`, e.g. `attribute_exists(#a)` -> `#a`.
///
/// Any amount of whitespace is accepted between the function name and the
/// opening parenthesis: aws-sdk-go v2's expression builder emits
/// `attribute_exists (#0)`, while hand-written expressions usually have no
/// gap. The expression must start with `func_name` and end with `)`;
/// otherwise `None` is returned. No allocation is performed.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    // Requiring `(` right after the optional whitespace keeps longer
    // identifiers that merely start with `func_name` from matching.
    let inner = after_name
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    Some(inner.trim())
}
1084
1085fn evaluate_key_condition(
1086    expr: &str,
1087    item: &HashMap<String, AttributeValue>,
1088    expr_attr_names: &HashMap<String, String>,
1089    expr_attr_values: &HashMap<String, Value>,
1090) -> bool {
1091    let trimmed = expr.trim();
1092
1093    let parts = split_on_and(trimmed);
1094    if parts.len() > 1 {
1095        return parts.iter().all(|part| {
1096            evaluate_key_condition(part.trim(), item, expr_attr_names, expr_attr_values)
1097        });
1098    }
1099
1100    let stripped = strip_outer_parens(trimmed);
1101    if stripped != trimmed {
1102        return evaluate_key_condition(stripped, item, expr_attr_names, expr_attr_values);
1103    }
1104
1105    evaluate_single_key_condition(trimmed, item, expr_attr_names, expr_attr_values)
1106}
1107
1108/// Split a DynamoDB condition expression on a top-level keyword (``AND`` /
1109/// ``OR``), case-insensitive, with ASCII-whitespace word boundaries so
1110/// ``:s\tAND\t:o`` and ``:s\nAND\n:o`` split the same as ``:s AND :o``.
1111///
1112/// Parenthesised groups are skipped so only unparenthesised occurrences of the
1113/// keyword act as separators. When splitting on ``AND``, each top-level
1114/// ``BETWEEN`` keyword consumes the next top-level ``AND`` as its own inner
1115/// separator (``x BETWEEN :lo AND :hi``) rather than letting it split the
1116/// expression.
1117fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
1118    let bytes = expr.as_bytes();
1119    let len = bytes.len();
1120    let kw = keyword.as_bytes();
1121    let is_and = keyword.eq_ignore_ascii_case("AND");
1122
1123    let mut parts: Vec<&str> = Vec::new();
1124    let mut start = 0usize;
1125    let mut depth: i32 = 0;
1126    let mut between_skip: u32 = 0;
1127    let mut i = 0usize;
1128
1129    while i < len {
1130        let ch = bytes[i];
1131        if ch == b'(' {
1132            depth += 1;
1133            i += 1;
1134            continue;
1135        }
1136        if ch == b')' {
1137            if depth > 0 {
1138                depth -= 1;
1139            }
1140            i += 1;
1141            continue;
1142        }
1143        if depth == 0 {
1144            if is_and {
1145                if let Some(end) = match_keyword(bytes, i, b"BETWEEN") {
1146                    between_skip = between_skip.saturating_add(1);
1147                    i = end;
1148                    continue;
1149                }
1150            }
1151            if let Some(end) = match_keyword(bytes, i, kw) {
1152                if is_and && between_skip > 0 {
1153                    between_skip -= 1;
1154                    i = end;
1155                    continue;
1156                }
1157                parts.push(&expr[start..i]);
1158                start = end;
1159                i = end;
1160                continue;
1161            }
1162        }
1163        i += 1;
1164    }
1165    parts.push(&expr[start..]);
1166    parts
1167}
1168
/// Case-insensitive keyword match at byte offset `i` of `bytes`.
///
/// Returns the offset just past the keyword on success, `None` otherwise
/// (including when the keyword would run past the end of `bytes`).
///
/// For alphanumeric keywords (``AND``, ``OR``, ``BETWEEN``) the match also
/// requires word boundaries, so substrings of identifiers and placeholders
/// (``brand``, ``:and``) are not mistaken for keywords:
///
/// * on the left: start of input, ASCII whitespace, or a closing `)`;
/// * on the right: end of input, ASCII whitespace, or an opening `(`.
///
/// Parentheses count as boundaries so that compact expressions such as
/// `(a = :x)AND(b = :y)` are recognised. Punctuation keywords (``,``) match
/// literally with no boundary requirement.
fn match_keyword(bytes: &[u8], i: usize, keyword: &[u8]) -> Option<usize> {
    let end = i.checked_add(keyword.len())?;
    let candidate = bytes.get(i..end)?;
    if !candidate.eq_ignore_ascii_case(keyword) {
        return None;
    }

    let needs_word_boundary = keyword.iter().all(u8::is_ascii_alphanumeric);
    if needs_word_boundary {
        let left_ok = i == 0 || {
            let prev = bytes[i - 1];
            prev.is_ascii_whitespace() || prev == b')'
        };
        let right_ok = bytes
            .get(end)
            .map_or(true, |next| next.is_ascii_whitespace() || *next == b'(');
        if !(left_ok && right_ok) {
            return None;
        }
    }
    Some(end)
}
1196
1197fn split_on_and(expr: &str) -> Vec<&str> {
1198    split_on_top_level_keyword(expr, "AND")
1199}
1200
1201fn split_on_or(expr: &str) -> Vec<&str> {
1202    split_on_top_level_keyword(expr, "OR")
1203}
1204
1205fn evaluate_single_key_condition(
1206    part: &str,
1207    item: &HashMap<String, AttributeValue>,
1208    expr_attr_names: &HashMap<String, String>,
1209    expr_attr_values: &HashMap<String, Value>,
1210) -> bool {
1211    let part = part.trim();
1212
1213    if let Some(rest) = part
1214        .strip_prefix("begins_with(")
1215        .or_else(|| part.strip_prefix("begins_with ("))
1216    {
1217        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
1218    }
1219
1220    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
1221        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
1222    }
1223
1224    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
1225}
1226
1227/// `begins_with(attr, :val)` — KeyCondition variant: supports only
1228/// S-typed attributes (mirrors AWS's behavior of returning false for
1229/// type mismatches). The filter-expression evaluator has its own
1230/// `eval_begins_with` because it operates on filter-grammar inputs.
1231fn key_cond_begins_with(
1232    rest: &str,
1233    item: &HashMap<String, AttributeValue>,
1234    expr_attr_names: &HashMap<String, String>,
1235    expr_attr_values: &HashMap<String, Value>,
1236) -> bool {
1237    let Some(inner) = rest.strip_suffix(')') else {
1238        return false;
1239    };
1240    let mut split = inner.splitn(2, ',');
1241    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1242        return false;
1243    };
1244    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1245    let expected = expr_attr_values.get(val_ref.trim());
1246    let actual = item.get(&attr_name);
1247    match (actual, expected) {
1248        (Some(a), Some(e)) => {
1249            let a_str = a.get("S").and_then(|v| v.as_str());
1250            let e_str = e.get("S").and_then(|v| v.as_str());
1251            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1252        }
1253        _ => false,
1254    }
1255}
1256
1257/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
1258/// shared `compare_attribute_values` ordering.
1259fn key_cond_between(
1260    part: &str,
1261    between_pos: usize,
1262    item: &HashMap<String, AttributeValue>,
1263    expr_attr_names: &HashMap<String, String>,
1264    expr_attr_values: &HashMap<String, Value>,
1265) -> bool {
1266    let attr_part = part[..between_pos].trim();
1267    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
1268    let range_part = &part[between_pos + 7..];
1269    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
1270        return false;
1271    };
1272    let lo_ref = range_part[..and_pos].trim();
1273    let hi_ref = range_part[and_pos + 5..].trim();
1274    let lo = expr_attr_values.get(lo_ref);
1275    let hi = expr_attr_values.get(hi_ref);
1276    let actual = item.get(&attr_name);
1277    match (actual, lo, hi) {
1278        (Some(a), Some(l), Some(h)) => {
1279            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1280                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1281        }
1282        _ => false,
1283    }
1284}
1285
1286/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1287/// Multi-character operators come first in the search list so that `<=`
1288/// is not mistakenly matched as `<`.
1289fn key_cond_simple_comparison(
1290    part: &str,
1291    item: &HashMap<String, AttributeValue>,
1292    expr_attr_names: &HashMap<String, String>,
1293    expr_attr_values: &HashMap<String, Value>,
1294) -> bool {
1295    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1296        let Some(pos) = part.find(op) else {
1297            continue;
1298        };
1299        let left = part[..pos].trim();
1300        let right = part[pos + op.len()..].trim();
1301        let actual_owned = resolve_path(left, item, expr_attr_names);
1302        let actual = actual_owned.as_ref();
1303        let expected = expr_attr_values.get(right);
1304
1305        return match *op {
1306            "=" => actual == expected,
1307            "<>" => actual != expected,
1308            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1309            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1310            "<=" => {
1311                let cmp = compare_attribute_values(actual, expected);
1312                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1313            }
1314            ">=" => {
1315                let cmp = compare_attribute_values(actual, expected);
1316                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1317            }
1318            _ => false,
1319        };
1320    }
1321    false
1322}
1323
1324/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1325/// - S → character count
1326/// - B → decoded byte count
1327/// - SS/NS/BS → element count
1328/// - L → element count
1329/// - M → element count
1330///
1331/// `size()` is not valid on N, BOOL, or NULL per AWS; returns None for those so
1332/// the enclosing comparison evaluates to false (matching AWS's behavior of
1333/// silently filtering type-mismatched rows in FilterExpression context).
1334fn attribute_size(val: &Value) -> Option<usize> {
1335    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1336        return Some(s.len());
1337    }
1338    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1339        // B is base64-encoded — return decoded byte count
1340        let decoded_len = base64::engine::general_purpose::STANDARD
1341            .decode(b)
1342            .map(|v| v.len())
1343            .unwrap_or(b.len());
1344        return Some(decoded_len);
1345    }
1346    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1347        return Some(arr.len());
1348    }
1349    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1350        return Some(arr.len());
1351    }
1352    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1353        return Some(arr.len());
1354    }
1355    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1356        return Some(arr.len());
1357    }
1358    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1359        return Some(obj.len());
1360    }
1361    None
1362}
1363
1364/// Evaluate a `size(path) op :val` comparison expression.
1365fn evaluate_size_comparison(
1366    part: &str,
1367    item: &HashMap<String, AttributeValue>,
1368    expr_attr_names: &HashMap<String, String>,
1369    expr_attr_values: &HashMap<String, Value>,
1370) -> Option<bool> {
1371    // Find the closing paren of size(...)
1372    let open = part.find('(')?;
1373    let close = part[open..].find(')')? + open;
1374    let path = part[open + 1..close].trim();
1375    let remainder = part[close + 1..].trim();
1376
1377    // Parse operator and value ref
1378    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1379        ("<=", rest.trim())
1380    } else if let Some(rest) = remainder.strip_prefix(">=") {
1381        (">=", rest.trim())
1382    } else if let Some(rest) = remainder.strip_prefix("<>") {
1383        ("<>", rest.trim())
1384    } else if let Some(rest) = remainder.strip_prefix('<') {
1385        ("<", rest.trim())
1386    } else if let Some(rest) = remainder.strip_prefix('>') {
1387        (">", rest.trim())
1388    } else if let Some(rest) = remainder.strip_prefix('=') {
1389        ("=", rest.trim())
1390    } else {
1391        return None;
1392    };
1393
1394    let actual_owned = resolve_path(path, item, expr_attr_names)?;
1395    let size = attribute_size(&actual_owned)? as f64;
1396
1397    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1398
1399    Some(match op {
1400        "=" => (size - expected).abs() < f64::EPSILON,
1401        "<>" => (size - expected).abs() >= f64::EPSILON,
1402        "<" => size < expected,
1403        ">" => size > expected,
1404        "<=" => size <= expected,
1405        ">=" => size >= expected,
1406        _ => false,
1407    })
1408}
1409
1410fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1411    match (a, b) {
1412        (None, None) => std::cmp::Ordering::Equal,
1413        (None, Some(_)) => std::cmp::Ordering::Less,
1414        (Some(_), None) => std::cmp::Ordering::Greater,
1415        (Some(a), Some(b)) => {
1416            let a_type = attribute_type_and_value(a);
1417            let b_type = attribute_type_and_value(b);
1418            match (a_type, b_type) {
1419                (Some(("S", a_val)), Some(("S", b_val))) => {
1420                    let a_str = a_val.as_str().unwrap_or("");
1421                    let b_str = b_val.as_str().unwrap_or("");
1422                    a_str.cmp(b_str)
1423                }
1424                (Some(("N", a_val)), Some(("N", b_val))) => {
1425                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1426                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1427                    a_num
1428                        .partial_cmp(&b_num)
1429                        .unwrap_or(std::cmp::Ordering::Equal)
1430                }
1431                (Some(("B", a_val)), Some(("B", b_val))) => {
1432                    let a_str = a_val.as_str().unwrap_or("");
1433                    let b_str = b_val.as_str().unwrap_or("");
1434                    a_str.cmp(b_str)
1435                }
1436                _ => std::cmp::Ordering::Equal,
1437            }
1438        }
1439    }
1440}
1441
1442fn evaluate_filter_expression(
1443    expr: &str,
1444    item: &HashMap<String, AttributeValue>,
1445    expr_attr_names: &HashMap<String, String>,
1446    expr_attr_values: &HashMap<String, Value>,
1447) -> bool {
1448    let trimmed = expr.trim();
1449
1450    // Split on OR first (lower precedence), respecting parentheses
1451    let or_parts = split_on_or(trimmed);
1452    if or_parts.len() > 1 {
1453        return or_parts.iter().any(|part| {
1454            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1455        });
1456    }
1457
1458    // Then split on AND (higher precedence), respecting parentheses
1459    let and_parts = split_on_and(trimmed);
1460    if and_parts.len() > 1 {
1461        return and_parts.iter().all(|part| {
1462            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1463        });
1464    }
1465
1466    // Strip outer parentheses if present
1467    let stripped = strip_outer_parens(trimmed);
1468    if stripped != trimmed {
1469        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1470    }
1471
1472    // Handle NOT prefix (case-insensitive)
1473    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1474        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1475    }
1476
1477    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1478}
1479
/// Remove one pair of enclosing parentheses when they match each other.
///
/// The input is trimmed first. If it starts with `(` and ends with `)` and
/// those two characters form a pair (the text between them is balanced and
/// never closes more than it opened), the inner text is returned. In every
/// other case the trimmed input is returned unchanged, e.g. for
/// `(a) AND (b)`, where the first `(` is closed before the end.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let Some(body) = candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return candidate;
    };

    let mut open = 0usize;
    for byte in body.bytes() {
        match byte {
            b'(' => open += 1,
            b')' => match open.checked_sub(1) {
                Some(remaining) => open = remaining,
                // This `)` closes the leading `(`, so the outer pair does
                // not wrap the whole expression.
                None => return candidate,
            },
            _ => {}
        }
    }

    if open == 0 {
        body
    } else {
        candidate
    }
}
1507
1508fn evaluate_single_filter_condition(
1509    part: &str,
1510    item: &HashMap<String, AttributeValue>,
1511    expr_attr_names: &HashMap<String, String>,
1512    expr_attr_values: &HashMap<String, Value>,
1513) -> bool {
1514    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1515        return resolve_path(inner, item, expr_attr_names).is_some();
1516    }
1517
1518    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1519        return resolve_path(inner, item, expr_attr_names).is_none();
1520    }
1521
1522    if let Some(rest) = part
1523        .strip_prefix("begins_with(")
1524        .or_else(|| part.strip_prefix("begins_with ("))
1525    {
1526        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1527    }
1528
1529    if let Some(rest) = part
1530        .strip_prefix("contains(")
1531        .or_else(|| part.strip_prefix("contains ("))
1532    {
1533        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1534    }
1535
1536    if part.starts_with("size(") || part.starts_with("size (") {
1537        if let Some(result) =
1538            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1539        {
1540            return result;
1541        }
1542    }
1543
1544    if let Some(rest) = part
1545        .strip_prefix("attribute_type(")
1546        .or_else(|| part.strip_prefix("attribute_type ("))
1547    {
1548        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1549    }
1550
1551    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1552        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1553        let actual = item.get(&attr_name);
1554        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1555    }
1556
1557    evaluate_single_key_condition(part, item, expr_attr_names, expr_attr_values)
1558}
1559
1560/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1561/// any parse failure or type mismatch (this is the same shape DynamoDB
1562/// returns: a malformed predicate is silently false rather than an error).
1563fn eval_begins_with(
1564    rest: &str,
1565    item: &HashMap<String, AttributeValue>,
1566    expr_attr_names: &HashMap<String, String>,
1567    expr_attr_values: &HashMap<String, Value>,
1568) -> bool {
1569    let Some(inner) = rest.strip_suffix(')') else {
1570        return false;
1571    };
1572    let mut split = inner.splitn(2, ',');
1573    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1574        return false;
1575    };
1576    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1577    let expected = expr_attr_values.get(val_ref.trim());
1578    match (actual.as_ref(), expected) {
1579        (Some(a), Some(e)) => {
1580            let a_str = a.get("S").and_then(|v| v.as_str());
1581            let e_str = e.get("S").and_then(|v| v.as_str());
1582            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1583        }
1584        _ => false,
1585    }
1586}
1587
1588/// `contains(path, :val)` — substring check on S, set membership on
1589/// SS/NS/BS, and element membership on L. Other type pairings return
1590/// false.
1591fn eval_contains(
1592    rest: &str,
1593    item: &HashMap<String, AttributeValue>,
1594    expr_attr_names: &HashMap<String, String>,
1595    expr_attr_values: &HashMap<String, Value>,
1596) -> bool {
1597    let Some(inner) = rest.strip_suffix(')') else {
1598        return false;
1599    };
1600    let mut split = inner.splitn(2, ',');
1601    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1602        return false;
1603    };
1604    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1605    let expected = expr_attr_values.get(val_ref.trim());
1606    let (Some(a), Some(e)) = (actual.as_ref(), expected) else {
1607        return false;
1608    };
1609
1610    if let (Some(a_s), Some(e_s)) = (
1611        a.get("S").and_then(|v| v.as_str()),
1612        e.get("S").and_then(|v| v.as_str()),
1613    ) {
1614        return a_s.contains(e_s);
1615    }
1616    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1617        if let Some(val) = e.get("S") {
1618            return set.contains(val);
1619        }
1620    }
1621    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1622        if let Some(val) = e.get("N") {
1623            return set.contains(val);
1624        }
1625    }
1626    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1627        if let Some(val) = e.get("B") {
1628            return set.contains(val);
1629        }
1630    }
1631    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1632        return list.contains(e);
1633    }
1634    false
1635}
1636
1637/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1638/// is stored under the wire type identified by `:type` (one of the
1639/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1640fn eval_attribute_type(
1641    rest: &str,
1642    item: &HashMap<String, AttributeValue>,
1643    expr_attr_names: &HashMap<String, String>,
1644    expr_attr_values: &HashMap<String, Value>,
1645) -> bool {
1646    let Some(inner) = rest.strip_suffix(')') else {
1647        return false;
1648    };
1649    let mut split = inner.splitn(2, ',');
1650    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1651        return false;
1652    };
1653    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1654    let expected_type = expr_attr_values
1655        .get(val_ref.trim())
1656        .and_then(|v| v.get("S"))
1657        .and_then(|v| v.as_str());
1658    let (Some(val), Some(t)) = (actual.as_ref(), expected_type) else {
1659        return false;
1660    };
1661    match t {
1662        "S" => val.get("S").is_some(),
1663        "N" => val.get("N").is_some(),
1664        "B" => val.get("B").is_some(),
1665        "BOOL" => val.get("BOOL").is_some(),
1666        "NULL" => val.get("NULL").is_some(),
1667        "SS" => val.get("SS").is_some(),
1668        "NS" => val.get("NS").is_some(),
1669        "BS" => val.get("BS").is_some(),
1670        "L" => val.get("L").is_some(),
1671        "M" => val.get("M").is_some(),
1672        _ => false,
1673    }
1674}
1675
/// Splits an `attr IN (:v1, :v2, ...)` expression into its left-hand operand
/// and the list of value references.
///
/// The ` IN ` keyword is located case-insensitively (the first occurrence
/// wins). The right-hand side must be a parenthesized, comma-separated list;
/// whitespace around entries is trimmed and empty entries are dropped, so
/// both `", "` and `","` separators are accepted.
///
/// Returns `None` when the input is not an IN expression: no ` IN ` keyword,
/// an empty left operand, missing parentheses, or an empty value list. That
/// lets callers fall through to other grammar branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing keeps byte offsets aligned with `expr`.
    let keyword_at = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (lhs, tail) = expr.split_at(keyword_at);

    let attr_ref = lhs.trim();
    if attr_ref.is_empty() {
        return None;
    }

    let list = tail[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let values: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .collect();

    if values.is_empty() {
        None
    } else {
        Some((attr_ref, values))
    }
}
1702
1703/// Return true iff `actual` equals any of the `value_refs` resolved through
1704/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1705/// evaluates `IN` against undefined attributes as false).
1706fn evaluate_in_match(
1707    actual: Option<&AttributeValue>,
1708    value_refs: &[&str],
1709    expr_attr_values: &HashMap<String, Value>,
1710) -> bool {
1711    value_refs.iter().any(|v_ref| {
1712        let expected = expr_attr_values.get(*v_ref);
1713        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1714    })
1715}
1716
/// The clause kinds an ``UpdateExpression`` can contain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateAction {
    /// `SET` clause.
    Set,
    /// `REMOVE` clause.
    Remove,
    /// `ADD` clause.
    Add,
    /// `DELETE` clause.
    Delete,
}

impl UpdateAction {
    /// Wire keyword paired with its action, for every clause kind.
    /// ``parse_update_clauses`` iterates this table to locate clause starts.
    const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
        ("SET", Self::Set),
        ("REMOVE", Self::Remove),
        ("ADD", Self::Add),
        ("DELETE", Self::Delete),
    ];

    /// The upper-case keyword that introduces this clause kind.
    fn keyword(self) -> &'static str {
        match self {
            Self::Set => "SET",
            Self::Remove => "REMOVE",
            Self::Add => "ADD",
            Self::Delete => "DELETE",
        }
    }
}
1745
1746fn apply_update_expression(
1747    item: &mut HashMap<String, AttributeValue>,
1748    expr: &str,
1749    expr_attr_names: &HashMap<String, String>,
1750    expr_attr_values: &HashMap<String, Value>,
1751) -> Result<(), AwsServiceError> {
1752    let clauses = parse_update_clauses(expr);
1753    if clauses.is_empty() && !expr.trim().is_empty() {
1754        return Err(AwsServiceError::aws_error(
1755            StatusCode::BAD_REQUEST,
1756            "ValidationException",
1757            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1758        ));
1759    }
1760    for (action, assignments) in &clauses {
1761        match action {
1762            UpdateAction::Set => {
1763                for assignment in assignments {
1764                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1765                }
1766            }
1767            UpdateAction::Remove => {
1768                for attr_ref in assignments {
1769                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1770                    item.remove(&attr);
1771                }
1772            }
1773            UpdateAction::Add => {
1774                for assignment in assignments {
1775                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1776                }
1777            }
1778            UpdateAction::Delete => {
1779                for assignment in assignments {
1780                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1781                }
1782            }
1783        }
1784    }
1785    Ok(())
1786}
1787
1788fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1789    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1790    let upper = expr.to_ascii_uppercase();
1791    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1792
1793    for &(kw, action) in UpdateAction::KEYWORDS {
1794        let mut search_from = 0;
1795        while let Some(pos) = upper[search_from..].find(kw) {
1796            let abs_pos = search_from + pos;
1797            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1798            let after_pos = abs_pos + kw.len();
1799            let after_ok =
1800                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1801            if before_ok && after_ok {
1802                positions.push((abs_pos, action));
1803            }
1804            search_from = abs_pos + kw.len();
1805        }
1806    }
1807
1808    positions.sort_by_key(|(pos, _)| *pos);
1809
1810    for (i, &(pos, action)) in positions.iter().enumerate() {
1811        let start = pos + action.keyword().len();
1812        let end = if i + 1 < positions.len() {
1813            positions[i + 1].0
1814        } else {
1815            expr.len()
1816        };
1817        let content = expr[start..end].trim();
1818        // Use a paren-aware split so that function-call arguments such as
1819        // `list_append(#a, :b)` are kept as a single assignment rather than
1820        // being torn apart at the inner comma.
1821        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
1822            .into_iter()
1823            .map(|s| s.trim().to_string())
1824            .collect();
1825        clauses.push((action, assignments));
1826    }
1827
1828    clauses
1829}
1830
1831fn apply_set_assignment(
1832    item: &mut HashMap<String, AttributeValue>,
1833    assignment: &str,
1834    expr_attr_names: &HashMap<String, String>,
1835    expr_attr_values: &HashMap<String, Value>,
1836) -> Result<(), AwsServiceError> {
1837    let Some((left, right)) = assignment.split_once('=') else {
1838        return Ok(());
1839    };
1840
1841    let left_trimmed = left.trim();
1842    let right = right.trim();
1843
1844    // One RHS evaluator used for every LHS shape so `SET a.b = a.b + :d`,
1845    // `SET a.b = list_append(a.b, :list)`, and `SET a.b = if_not_exists(a.b, :v)`
1846    // all work against nested paths, not just top-level attributes. The evaluator
1847    // returns Ok(None) when the RHS is a no-op (if_not_exists where the target
1848    // already has a value, or an unresolvable plain reference).
1849    let new_value = evaluate_set_rhs(right, item, expr_attr_names, expr_attr_values)?;
1850
1851    if is_dotted_path(left_trimmed) {
1852        // A None value is a no-op (if_not_exists skip, or unresolvable plain
1853        // ref) — matches top-level SET's silent-skip behavior for the same
1854        // shapes. Structural errors (missing parent map, non-map intermediate)
1855        // surface from assign_nested_path itself.
1856        let Some(v) = new_value else {
1857            return Ok(());
1858        };
1859        return assign_nested_path(item, left_trimmed, expr_attr_names, v);
1860    }
1861
1862    // Split off a trailing `[N]` list-index suffix so we can resolve the
1863    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1864    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1865    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1866        Some((name, idx)) => (name, Some(idx)),
1867        None => (left_trimmed, None),
1868    };
1869    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1870
1871    let Some(v) = new_value else {
1872        return Ok(());
1873    };
1874    match list_index {
1875        Some(idx) => assign_list_index(item, &attr, idx, v),
1876        None => {
1877            item.insert(attr, v);
1878            Ok(())
1879        }
1880    }
1881}
1882
1883/// Evaluate the RHS of a `SET` assignment without writing it anywhere.
1884/// Returns `Ok(Some(value))` with the computed value, `Ok(None)` for
1885/// no-op cases (if_not_exists where the target already has a value, or
1886/// an unresolvable plain reference in dotted-path context), or
1887/// `Err(ValidationException)` for type-mismatched arithmetic.
1888fn evaluate_set_rhs(
1889    right: &str,
1890    item: &HashMap<String, AttributeValue>,
1891    expr_attr_names: &HashMap<String, String>,
1892    expr_attr_values: &HashMap<String, Value>,
1893) -> Result<Option<Value>, AwsServiceError> {
1894    if let Some(rest) = right
1895        .strip_prefix("if_not_exists(")
1896        .or_else(|| right.strip_prefix("if_not_exists ("))
1897    {
1898        return Ok(evaluate_if_not_exists_rhs(
1899            rest,
1900            item,
1901            expr_attr_names,
1902            expr_attr_values,
1903        ));
1904    }
1905
1906    if let Some(rest) = right
1907        .strip_prefix("list_append(")
1908        .or_else(|| right.strip_prefix("list_append ("))
1909    {
1910        return Ok(evaluate_list_append_rhs(
1911            rest,
1912            item,
1913            expr_attr_names,
1914            expr_attr_values,
1915        ));
1916    }
1917
1918    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1919        return evaluate_arithmetic_rhs(
1920            arith_left,
1921            arith_right,
1922            is_add,
1923            item,
1924            expr_attr_names,
1925            expr_attr_values,
1926        );
1927    }
1928
1929    Ok(resolve_ref_or_path(
1930        right,
1931        item,
1932        expr_attr_names,
1933        expr_attr_values,
1934    ))
1935}
1936
1937/// `if_not_exists(path, :val)` — evaluates to nothing when `path` already
1938/// resolves to a value, and to the default ref otherwise. `path` may be a
1939/// top-level attribute, a placeholder, or a dotted path inside an M-typed
1940/// attribute.
1941fn evaluate_if_not_exists_rhs(
1942    rest: &str,
1943    item: &HashMap<String, AttributeValue>,
1944    expr_attr_names: &HashMap<String, String>,
1945    expr_attr_values: &HashMap<String, Value>,
1946) -> Option<Value> {
1947    let inner = rest.strip_suffix(')')?;
1948    let mut split = inner.splitn(2, ',');
1949    let (check, default) = (split.next()?, split.next()?);
1950    if resolve_ref_or_path(check.trim(), item, expr_attr_names, expr_attr_values).is_some() {
1951        return None;
1952    }
1953    resolve_ref_or_path(default.trim(), item, expr_attr_names, expr_attr_values)
1954}
1955
1956/// `list_append(a, b)` — concatenate the L arrays of two list operands.
1957/// Either operand may be missing or non-list, in which case it contributes
1958/// nothing. Both operands may be value refs (`:list`) or document paths
1959/// (top-level or dotted).
1960fn evaluate_list_append_rhs(
1961    rest: &str,
1962    item: &HashMap<String, AttributeValue>,
1963    expr_attr_names: &HashMap<String, String>,
1964    expr_attr_values: &HashMap<String, Value>,
1965) -> Option<Value> {
1966    let inner = rest.strip_suffix(')')?;
1967    let mut split = inner.splitn(2, ',');
1968    let (a_ref, b_ref) = (split.next()?, split.next()?);
1969    let a_val = resolve_ref_or_path(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1970    let b_val = resolve_ref_or_path(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1971
1972    let mut merged = Vec::new();
1973    for v in [&a_val, &b_val].iter().copied().flatten() {
1974        if let Value::Object(obj) = v {
1975            if let Some(Value::Array(arr)) = obj.get("L") {
1976                merged.extend(arr.clone());
1977            }
1978        }
1979    }
1980    Some(json!({ "L": merged }))
1981}
1982
1983/// `<arith_left> +/- <arith_right>` — both operands must resolve to N values
1984/// (or the LHS may be missing, in which case it's treated as 0). Anything
1985/// else is rejected with the same `ValidationException` AWS returns.
1986fn evaluate_arithmetic_rhs(
1987    arith_left: &str,
1988    arith_right: &str,
1989    is_add: bool,
1990    item: &HashMap<String, AttributeValue>,
1991    expr_attr_names: &HashMap<String, String>,
1992    expr_attr_values: &HashMap<String, Value>,
1993) -> Result<Option<Value>, AwsServiceError> {
1994    let left_val = resolve_ref_or_path(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1995    let right_val =
1996        resolve_ref_or_path(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1997
1998    let left_num = match extract_number(&left_val) {
1999        Some(n) => n,
2000        None if left_val.is_some() => {
2001            return Err(AwsServiceError::aws_error(
2002                StatusCode::BAD_REQUEST,
2003                "ValidationException",
2004                "An operand in the update expression has an incorrect data type",
2005            ));
2006        }
2007        None => 0.0,
2008    };
2009    let right_num = extract_number(&right_val).ok_or_else(|| {
2010        AwsServiceError::aws_error(
2011            StatusCode::BAD_REQUEST,
2012            "ValidationException",
2013            "An operand in the update expression has an incorrect data type",
2014        )
2015    })?;
2016
2017    let result = if is_add {
2018        left_num + right_num
2019    } else {
2020        left_num - right_num
2021    };
2022
2023    let num_str = if result == result.trunc() {
2024        format!("{}", result as i64)
2025    } else {
2026        format!("{result}")
2027    };
2028
2029    Ok(Some(json!({ "N": num_str })))
2030}
2031
/// Splits a `name[N]` target into the attribute reference and the index.
///
/// Only the single-index shape is recognized: surrounding whitespace is
/// trimmed, the text must end with `]`, the bracket content must parse as a
/// `usize`, and the name part must be non-empty and free of `.`, `[` and `]`.
/// Anything else — plain attributes, nested shapes like `a.b[0]` or
/// `a[0][1]` — yields `None`.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let unclosed = path.trim().strip_suffix(']')?;
    // The last `[` opens the index; everything before it is the name.
    let (name, digits) = unclosed.rsplit_once('[')?;
    let idx = digits.parse::<usize>().ok()?;

    let has_path_syntax = name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    if name.is_empty() || has_path_syntax {
        None
    } else {
        Some((name, idx))
    }
}
2052
2053/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
2054/// within the current list, replaces that slot; if it's at the end, appends.
2055/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
2056/// out-of-range indices and non-list attributes.
2057fn assign_list_index(
2058    item: &mut HashMap<String, AttributeValue>,
2059    attr: &str,
2060    idx: usize,
2061    value: Value,
2062) -> Result<(), AwsServiceError> {
2063    let Some(existing) = item.get_mut(attr) else {
2064        return Err(invalid_document_path());
2065    };
2066    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
2067        return Err(invalid_document_path());
2068    };
2069    if idx < list.len() {
2070        list[idx] = value;
2071    } else if idx == list.len() {
2072        list.push(value);
2073    } else {
2074        return Err(invalid_document_path());
2075    }
2076    Ok(())
2077}
2078
2079fn invalid_document_path() -> AwsServiceError {
2080    AwsServiceError::aws_error(
2081        StatusCode::BAD_REQUEST,
2082        "ValidationException",
2083        "The document path provided in the update expression is invalid for update",
2084    )
2085}
2086
2087/// Resolve a SET-RHS operand that may be either a value placeholder
2088/// (``:foo``) or a document path (top-level attribute, ``#name``, or a
2089/// dotted path like ``profile.email`` / ``#web.#count``).
2090fn resolve_ref_or_path(
2091    reference: &str,
2092    item: &HashMap<String, AttributeValue>,
2093    expr_attr_names: &HashMap<String, String>,
2094    expr_attr_values: &HashMap<String, Value>,
2095) -> Option<Value> {
2096    let reference = reference.trim();
2097    if reference.starts_with(':') {
2098        return expr_attr_values.get(reference).cloned();
2099    }
2100    resolve_path(reference, item, expr_attr_names)
2101}
2102
/// Reports whether `path` is a dotted path the nested-SET writer can handle:
/// it contains at least one `.` and no `[`. Paths with bracketed list indices
/// (`a[0]`, `a.b[0]`) are excluded.
fn is_dotted_path(path: &str) -> bool {
    let mut saw_dot = false;
    for byte in path.bytes() {
        match byte {
            // Any bracket disqualifies the path, wherever it appears.
            b'[' => return false,
            b'.' => saw_dot = true,
            _ => {}
        }
    }
    saw_dot
}
2108
2109/// Write `value` at a dotted path inside an M-typed attribute.
2110///
2111/// Resolves each `#name` segment through `expr_attr_names`. The top-level
2112/// attribute and every intermediate segment must already exist as a Map —
2113/// DynamoDB rejects writes through missing parents with ValidationException.
2114fn assign_nested_path(
2115    item: &mut HashMap<String, AttributeValue>,
2116    path: &str,
2117    expr_attr_names: &HashMap<String, String>,
2118    value: Value,
2119) -> Result<(), AwsServiceError> {
2120    let mut segments: Vec<String> = path
2121        .split('.')
2122        .map(|seg| resolve_attr_name(seg.trim(), expr_attr_names))
2123        .collect();
2124    if segments.len() < 2 {
2125        return Err(invalid_document_path());
2126    }
2127
2128    let leaf = segments.pop().expect("len >= 2");
2129    let top = segments.remove(0);
2130
2131    let top_attr = item.get_mut(&top).ok_or_else(invalid_document_path)?;
2132    let mut current = top_attr
2133        .get_mut("M")
2134        .and_then(|m| m.as_object_mut())
2135        .ok_or_else(invalid_document_path)?;
2136
2137    for seg in &segments {
2138        current = current
2139            .get_mut(seg)
2140            .and_then(|v| v.get_mut("M"))
2141            .and_then(|m| m.as_object_mut())
2142            .ok_or_else(invalid_document_path)?;
2143    }
2144
2145    current.insert(leaf, value);
2146    Ok(())
2147}
2148
2149fn extract_number(val: &Option<Value>) -> Option<f64> {
2150    val.as_ref()
2151        .and_then(|v| v.get("N"))
2152        .and_then(|n| n.as_str())
2153        .and_then(|s| s.parse().ok())
2154}
2155
/// Finds the first `+` or `-` that sits outside all parentheses and is not
/// the very first character of `expr`.
///
/// Returns the untrimmed text on either side of the operator plus a flag that
/// is `true` for `+` and `false` for `-`, or `None` when no such operator
/// exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    // All bytes of interest are ASCII, so byte offsets are valid slice
    // boundaries even when `expr` contains multi-byte characters.
    for (idx, byte) in expr.bytes().enumerate() {
        let splittable = nesting == 0 && idx > 0;
        match byte {
            b'(' => nesting += 1,
            b')' => nesting -= 1,
            b'+' | b'-' if splittable => {
                return Some((&expr[..idx], &expr[idx + 1..], byte == b'+'));
            }
            _ => {}
        }
    }
    None
}
2173
2174fn apply_add_assignment(
2175    item: &mut HashMap<String, AttributeValue>,
2176    assignment: &str,
2177    expr_attr_names: &HashMap<String, String>,
2178    expr_attr_values: &HashMap<String, Value>,
2179) -> Result<(), AwsServiceError> {
2180    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2181    if parts.len() != 2 {
2182        return Ok(());
2183    }
2184
2185    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2186    let val_ref = parts[1].trim();
2187    let add_val = expr_attr_values.get(val_ref);
2188
2189    if let Some(add_val) = add_val {
2190        if let Some(existing) = item.get(&attr) {
2191            if let (Some(existing_num), Some(add_num)) = (
2192                extract_number(&Some(existing.clone())),
2193                extract_number(&Some(add_val.clone())),
2194            ) {
2195                let result = existing_num + add_num;
2196                let num_str = if result == result.trunc() {
2197                    format!("{}", result as i64)
2198                } else {
2199                    format!("{result}")
2200                };
2201                item.insert(attr, json!({"N": num_str}));
2202            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
2203                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
2204                    let mut merged: Vec<Value> = existing_set.clone();
2205                    for v in add_set {
2206                        if !merged.contains(v) {
2207                            merged.push(v.clone());
2208                        }
2209                    }
2210                    item.insert(attr, json!({"SS": merged}));
2211                }
2212            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
2213                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
2214                    let mut merged: Vec<Value> = existing_set.clone();
2215                    for v in add_set {
2216                        if !merged.contains(v) {
2217                            merged.push(v.clone());
2218                        }
2219                    }
2220                    item.insert(attr, json!({"NS": merged}));
2221                }
2222            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
2223                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
2224                    let mut merged: Vec<Value> = existing_set.clone();
2225                    for v in add_set {
2226                        if !merged.contains(v) {
2227                            merged.push(v.clone());
2228                        }
2229                    }
2230                    item.insert(attr, json!({"BS": merged}));
2231                }
2232            }
2233        } else {
2234            item.insert(attr, add_val.clone());
2235        }
2236    }
2237
2238    Ok(())
2239}
2240
2241fn apply_delete_assignment(
2242    item: &mut HashMap<String, AttributeValue>,
2243    assignment: &str,
2244    expr_attr_names: &HashMap<String, String>,
2245    expr_attr_values: &HashMap<String, Value>,
2246) -> Result<(), AwsServiceError> {
2247    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2248    if parts.len() != 2 {
2249        return Ok(());
2250    }
2251
2252    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2253    let val_ref = parts[1].trim();
2254    let del_val = expr_attr_values.get(val_ref);
2255
2256    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
2257        if let (Some(existing_set), Some(del_set)) = (
2258            existing.get("SS").and_then(|v| v.as_array()),
2259            del_val.get("SS").and_then(|v| v.as_array()),
2260        ) {
2261            let filtered: Vec<Value> = existing_set
2262                .iter()
2263                .filter(|v| !del_set.contains(v))
2264                .cloned()
2265                .collect();
2266            if filtered.is_empty() {
2267                item.remove(&attr);
2268            } else {
2269                item.insert(attr, json!({"SS": filtered}));
2270            }
2271        } else if let (Some(existing_set), Some(del_set)) = (
2272            existing.get("NS").and_then(|v| v.as_array()),
2273            del_val.get("NS").and_then(|v| v.as_array()),
2274        ) {
2275            let filtered: Vec<Value> = existing_set
2276                .iter()
2277                .filter(|v| !del_set.contains(v))
2278                .cloned()
2279                .collect();
2280            if filtered.is_empty() {
2281                item.remove(&attr);
2282            } else {
2283                item.insert(attr, json!({"NS": filtered}));
2284            }
2285        } else if let (Some(existing_set), Some(del_set)) = (
2286            existing.get("BS").and_then(|v| v.as_array()),
2287            del_val.get("BS").and_then(|v| v.as_array()),
2288        ) {
2289            let filtered: Vec<Value> = existing_set
2290                .iter()
2291                .filter(|v| !del_set.contains(v))
2292                .cloned()
2293                .collect();
2294            if filtered.is_empty() {
2295                item.remove(&attr);
2296            } else {
2297                item.insert(attr, json!({"BS": filtered}));
2298            }
2299        }
2300    }
2301
2302    Ok(())
2303}
2304
2305pub(super) struct TableDescriptionInput<'a> {
2306    pub arn: &'a str,
2307    pub table_id: &'a str,
2308    pub key_schema: &'a [KeySchemaElement],
2309    pub attribute_definitions: &'a [AttributeDefinition],
2310    pub provisioned_throughput: &'a ProvisionedThroughput,
2311    pub gsi: &'a [GlobalSecondaryIndex],
2312    pub lsi: &'a [LocalSecondaryIndex],
2313    pub billing_mode: &'a str,
2314    pub created_at: chrono::DateTime<chrono::Utc>,
2315    pub item_count: i64,
2316    pub size_bytes: i64,
2317    pub status: &'a str,
2318    pub deletion_protection_enabled: bool,
2319    pub on_demand_throughput: Option<&'a crate::state::OnDemandThroughput>,
2320}
2321
2322fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
2323    let TableDescriptionInput {
2324        arn,
2325        table_id,
2326        key_schema,
2327        attribute_definitions,
2328        provisioned_throughput,
2329        gsi,
2330        lsi,
2331        billing_mode,
2332        created_at,
2333        item_count,
2334        size_bytes,
2335        status,
2336        deletion_protection_enabled,
2337        on_demand_throughput,
2338    } = *input;
2339    let table_name = arn.rsplit('/').next().unwrap_or("");
2340    let creation_timestamp =
2341        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
2342
2343    let ks: Vec<Value> = key_schema
2344        .iter()
2345        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2346        .collect();
2347
2348    let ad: Vec<Value> = attribute_definitions
2349        .iter()
2350        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2351        .collect();
2352
2353    let mut desc = json!({
2354        "TableName": table_name,
2355        "TableArn": arn,
2356        "TableId": table_id,
2357        "TableStatus": status,
2358        "KeySchema": ks,
2359        "AttributeDefinitions": ad,
2360        "CreationDateTime": creation_timestamp,
2361        "ItemCount": item_count,
2362        "TableSizeBytes": size_bytes,
2363        "BillingModeSummary": { "BillingMode": billing_mode },
2364        "DeletionProtectionEnabled": deletion_protection_enabled,
2365    });
2366
2367    if billing_mode != "PAY_PER_REQUEST" {
2368        desc["ProvisionedThroughput"] = json!({
2369            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2370            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2371            "NumberOfDecreasesToday": 0,
2372        });
2373    } else {
2374        desc["ProvisionedThroughput"] = json!({
2375            "ReadCapacityUnits": 0,
2376            "WriteCapacityUnits": 0,
2377            "NumberOfDecreasesToday": 0,
2378        });
2379    }
2380
2381    if let Some(odt) = on_demand_throughput {
2382        desc["OnDemandThroughput"] = json!({
2383            "MaxReadRequestUnits": odt.max_read_request_units,
2384            "MaxWriteRequestUnits": odt.max_write_request_units,
2385        });
2386    }
2387
2388    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2389    // Real AWS returns an ACTIVE warm throughput object for active tables,
2390    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2391    // perpetual "still creating" loop.
2392    if status == "ACTIVE" {
2393        desc["WarmThroughput"] = json!({
2394            "ReadUnitsPerSecond": 0,
2395            "WriteUnitsPerSecond": 0,
2396            "Status": "ACTIVE",
2397        });
2398    }
2399
2400    if !gsi.is_empty() {
2401        let gsi_json: Vec<Value> = gsi
2402            .iter()
2403            .map(|g| {
2404                let gks: Vec<Value> = g
2405                    .key_schema
2406                    .iter()
2407                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2408                    .collect();
2409                let mut idx = json!({
2410                    "IndexName": g.index_name,
2411                    "KeySchema": gks,
2412                    "Projection": { "ProjectionType": g.projection.projection_type },
2413                    "IndexStatus": "ACTIVE",
2414                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2415                    "ItemCount": 0,
2416                    "IndexSizeBytes": 0,
2417                });
2418                if !g.projection.non_key_attributes.is_empty() {
2419                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2420                }
2421                if let Some(ref pt) = g.provisioned_throughput {
2422                    idx["ProvisionedThroughput"] = json!({
2423                        "ReadCapacityUnits": pt.read_capacity_units,
2424                        "WriteCapacityUnits": pt.write_capacity_units,
2425                        "NumberOfDecreasesToday": 0,
2426                    });
2427                }
2428                if let Some(ref odt) = g.on_demand_throughput {
2429                    idx["OnDemandThroughput"] = json!({
2430                        "MaxReadRequestUnits": odt.max_read_request_units,
2431                        "MaxWriteRequestUnits": odt.max_write_request_units,
2432                    });
2433                }
2434                idx
2435            })
2436            .collect();
2437        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2438    }
2439
2440    if !lsi.is_empty() {
2441        let lsi_json: Vec<Value> = lsi
2442            .iter()
2443            .map(|l| {
2444                let lks: Vec<Value> = l
2445                    .key_schema
2446                    .iter()
2447                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2448                    .collect();
2449                let mut idx = json!({
2450                    "IndexName": l.index_name,
2451                    "KeySchema": lks,
2452                    "Projection": { "ProjectionType": l.projection.projection_type },
2453                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2454                    "ItemCount": 0,
2455                    "IndexSizeBytes": 0,
2456                });
2457                if !l.projection.non_key_attributes.is_empty() {
2458                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2459                }
2460                idx
2461            })
2462            .collect();
2463        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2464    }
2465
2466    desc
2467}
2468
2469fn build_table_description(table: &DynamoTable) -> Value {
2470    let mut desc = build_table_description_json(&TableDescriptionInput {
2471        arn: &table.arn,
2472        table_id: &table.table_id,
2473        key_schema: &table.key_schema,
2474        attribute_definitions: &table.attribute_definitions,
2475        provisioned_throughput: &table.provisioned_throughput,
2476        gsi: &table.gsi,
2477        lsi: &table.lsi,
2478        billing_mode: &table.billing_mode,
2479        created_at: table.created_at,
2480        item_count: table.item_count,
2481        size_bytes: table.size_bytes,
2482        status: &table.status,
2483        deletion_protection_enabled: table.deletion_protection_enabled,
2484        on_demand_throughput: table.on_demand_throughput.as_ref(),
2485    });
2486
2487    // `LatestStreamArn` / `LatestStreamLabel` persist after a stream has
2488    // been created, even if streams are currently disabled — real AWS
2489    // keeps them for ~24h post-disable so DescribeTable callers can still
2490    // observe the last active stream. fakecloud keeps them for the
2491    // table's lifetime, which is sufficient for any single test run.
2492    if let Some(ref stream_arn) = table.stream_arn {
2493        desc["LatestStreamArn"] = json!(stream_arn);
2494        desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2495    }
2496    // The `StreamSpecification` block is only present while streams are
2497    // actively enabled. When absent, the Terraform provider Read falls
2498    // through to the prior `stream_view_type` from its own state rather
2499    // than clearing it, which matches the diff behaviour the upstream
2500    // acceptance tests assert on.
2501    if table.stream_enabled {
2502        if let Some(ref view_type) = table.stream_view_type {
2503            desc["StreamSpecification"] = json!({
2504                "StreamEnabled": true,
2505                "StreamViewType": view_type,
2506            });
2507        }
2508    }
2509
2510    // SSEDescription is only returned when the customer explicitly enabled
2511    // a KMS-backed SSE. Real AWS tables using the default AWS-owned key omit
2512    // this field entirely, and the Terraform provider's Read asserts
2513    // `server_side_encryption.#` == 0 in that case.
2514    if let Some(ref sse_type) = table.sse_type {
2515        let mut sse_desc = json!({
2516            "Status": "ENABLED",
2517            "SSEType": sse_type,
2518        });
2519        if let Some(ref key_arn) = table.sse_kms_key_arn {
2520            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2521        }
2522        desc["SSEDescription"] = sse_desc;
2523    }
2524
2525    desc
2526}
2527
2528fn execute_partiql_statement(
2529    state: &SharedDynamoDbState,
2530    statement: &str,
2531    parameters: &[Value],
2532) -> Result<AwsResponse, AwsServiceError> {
2533    let trimmed = statement.trim();
2534    let upper = trimmed.to_ascii_uppercase();
2535
2536    if upper.starts_with("SELECT") {
2537        execute_partiql_select(state, trimmed, parameters)
2538    } else if upper.starts_with("INSERT") {
2539        execute_partiql_insert(state, trimmed, parameters)
2540    } else if upper.starts_with("UPDATE") {
2541        execute_partiql_update(state, trimmed, parameters)
2542    } else if upper.starts_with("DELETE") {
2543        execute_partiql_delete(state, trimmed, parameters)
2544    } else {
2545        Err(AwsServiceError::aws_error(
2546            StatusCode::BAD_REQUEST,
2547            "ValidationException",
2548            format!("Unsupported PartiQL statement: {trimmed}"),
2549        ))
2550    }
2551}
2552
2553/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2554fn execute_partiql_select(
2555    state: &SharedDynamoDbState,
2556    statement: &str,
2557    parameters: &[Value],
2558) -> Result<AwsResponse, AwsServiceError> {
2559    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2560    let upper = statement.to_ascii_uppercase();
2561    let from_pos = upper.find("FROM").ok_or_else(|| {
2562        AwsServiceError::aws_error(
2563            StatusCode::BAD_REQUEST,
2564            "ValidationException",
2565            "Invalid SELECT statement: missing FROM",
2566        )
2567    })?;
2568
2569    let after_from = statement[from_pos + 4..].trim();
2570    let (table_name, rest) = parse_partiql_table_name(after_from);
2571
2572    let __mas = state.read();
2573    let state = __mas.default_ref();
2574    let table = get_table(&state.tables, &table_name)?;
2575
2576    let rest_upper = rest.trim().to_ascii_uppercase();
2577    if rest_upper.starts_with("WHERE") {
2578        let where_clause = rest.trim()[5..].trim();
2579        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2580        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2581        DynamoDbService::ok_json(json!({ "Items": items }))
2582    } else {
2583        // No WHERE, return all items
2584        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2585        DynamoDbService::ok_json(json!({ "Items": items }))
2586    }
2587}
2588
2589fn execute_partiql_insert(
2590    state: &SharedDynamoDbState,
2591    statement: &str,
2592    parameters: &[Value],
2593) -> Result<AwsResponse, AwsServiceError> {
2594    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2595    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2596    let upper = statement.to_ascii_uppercase();
2597    let into_pos = upper.find("INTO").ok_or_else(|| {
2598        AwsServiceError::aws_error(
2599            StatusCode::BAD_REQUEST,
2600            "ValidationException",
2601            "Invalid INSERT statement: missing INTO",
2602        )
2603    })?;
2604
2605    let after_into = statement[into_pos + 4..].trim();
2606    let (table_name, rest) = parse_partiql_table_name(after_into);
2607
2608    let rest_upper = rest.trim().to_ascii_uppercase();
2609    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2610        AwsServiceError::aws_error(
2611            StatusCode::BAD_REQUEST,
2612            "ValidationException",
2613            "Invalid INSERT statement: missing VALUE",
2614        )
2615    })?;
2616
2617    let value_str = rest.trim()[value_pos + 5..].trim();
2618    let item = parse_partiql_value_object(value_str, parameters)?;
2619
2620    let mut __mas = state.write();
2621    let state = __mas.default_mut();
2622    let table = get_table_mut(&mut state.tables, &table_name)?;
2623    let key = extract_key(table, &item);
2624    if table.find_item_index(&key).is_some() {
2625        // DynamoDB PartiQL INSERT fails if item exists
2626        return Err(AwsServiceError::aws_error(
2627            StatusCode::BAD_REQUEST,
2628            "DuplicateItemException",
2629            "Duplicate primary key exists in table",
2630        ));
2631    } else {
2632        table.items.push(item);
2633    }
2634    table.recalculate_stats();
2635
2636    DynamoDbService::ok_json(json!({}))
2637}
2638
2639fn execute_partiql_update(
2640    state: &SharedDynamoDbState,
2641    statement: &str,
2642    parameters: &[Value],
2643) -> Result<AwsResponse, AwsServiceError> {
2644    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2645    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2646    let after_update = statement[6..].trim(); // skip "UPDATE"
2647    let (table_name, rest) = parse_partiql_table_name(after_update);
2648
2649    let rest_upper = rest.trim().to_ascii_uppercase();
2650    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2651        AwsServiceError::aws_error(
2652            StatusCode::BAD_REQUEST,
2653            "ValidationException",
2654            "Invalid UPDATE statement: missing SET",
2655        )
2656    })?;
2657
2658    let after_set = rest.trim()[set_pos + 3..].trim();
2659
2660    // Split on WHERE
2661    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2662    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2663        (&after_set[..wp], after_set[wp + 5..].trim())
2664    } else {
2665        (after_set, "")
2666    };
2667
2668    let mut __mas = state.write();
2669    let state = __mas.default_mut();
2670    let table = get_table_mut(&mut state.tables, &table_name)?;
2671
2672    let matched_indices = if !where_clause.is_empty() {
2673        find_partiql_where_indices(table, where_clause, parameters)?
2674    } else {
2675        (0..table.items.len()).collect()
2676    };
2677
2678    // Parse SET assignments: attr=value, attr2=value2
2679    let param_offset = count_params_in_str(where_clause);
2680    let assignments: Vec<&str> = set_clause.split(',').collect();
2681    for idx in &matched_indices {
2682        let mut local_offset = param_offset;
2683        for assignment in &assignments {
2684            let assignment = assignment.trim();
2685            if let Some((attr, val_str)) = assignment.split_once('=') {
2686                let attr = attr.trim().trim_matches('"');
2687                let val_str = val_str.trim();
2688                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2689                if let Some(v) = value {
2690                    table.items[*idx].insert(attr.to_string(), v);
2691                }
2692            }
2693        }
2694    }
2695    table.recalculate_stats();
2696
2697    DynamoDbService::ok_json(json!({}))
2698}
2699
2700fn execute_partiql_delete(
2701    state: &SharedDynamoDbState,
2702    statement: &str,
2703    parameters: &[Value],
2704) -> Result<AwsResponse, AwsServiceError> {
2705    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2706    let upper = statement.to_ascii_uppercase();
2707    let from_pos = upper.find("FROM").ok_or_else(|| {
2708        AwsServiceError::aws_error(
2709            StatusCode::BAD_REQUEST,
2710            "ValidationException",
2711            "Invalid DELETE statement: missing FROM",
2712        )
2713    })?;
2714
2715    let after_from = statement[from_pos + 4..].trim();
2716    let (table_name, rest) = parse_partiql_table_name(after_from);
2717
2718    let rest_upper = rest.trim().to_ascii_uppercase();
2719    if !rest_upper.starts_with("WHERE") {
2720        return Err(AwsServiceError::aws_error(
2721            StatusCode::BAD_REQUEST,
2722            "ValidationException",
2723            "DELETE requires a WHERE clause",
2724        ));
2725    }
2726    let where_clause = rest.trim()[5..].trim();
2727
2728    let mut __mas = state.write();
2729    let state = __mas.default_mut();
2730    let table = get_table_mut(&mut state.tables, &table_name)?;
2731
2732    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2733    // Remove from highest index first to avoid invalidating lower indices
2734    indices.sort_unstable();
2735    indices.reverse();
2736    for idx in indices {
2737        table.items.remove(idx);
2738    }
2739    table.recalculate_stats();
2740
2741    DynamoDbService::ok_json(json!({}))
2742}
2743
/// Extracts the leading table name from `s`, which may be double-quoted.
///
/// Returns `(table_name, rest)` where `rest` is the unconsumed remainder of
/// the (trimmed) input:
/// - `"name" ...`: the name is the text between the quotes and `rest`
///   starts right after the closing quote.
/// - an opening quote with no closing quote: the name runs up to the first
///   space, with any `"` stripped from both ends.
/// - unquoted: the name runs up to the first whitespace character.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let input = s.trim();
    match input.strip_prefix('"') {
        Some(after_quote) => match after_quote.split_once('"') {
            // Properly terminated quoted name.
            Some((name, rest)) => (name.to_owned(), rest),
            // Unterminated quote: fall back to splitting at the first space.
            None => {
                let cut = input.find(' ').unwrap_or(input.len());
                let (head, tail) = input.split_at(cut);
                (head.trim_matches('"').to_owned(), tail)
            }
        },
        None => {
            let cut = input.find(char::is_whitespace).unwrap_or(input.len());
            let (head, tail) = input.split_at(cut);
            (head.to_owned(), tail)
        }
    }
}
2763
2764/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2765/// Returns matching items.
2766fn evaluate_partiql_where<'a>(
2767    table: &'a DynamoTable,
2768    where_clause: &str,
2769    parameters: &[Value],
2770) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2771    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2772    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2773}
2774
2775fn find_partiql_where_indices(
2776    table: &DynamoTable,
2777    where_clause: &str,
2778    parameters: &[Value],
2779) -> Result<Vec<usize>, AwsServiceError> {
2780    let conditions = split_partiql_and_clauses(where_clause);
2781    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2782
2783    let mut indices = Vec::new();
2784    for (i, item) in table.items.iter().enumerate() {
2785        let all_match = parsed_conditions
2786            .iter()
2787            .all(|(attr, expected)| item.get(attr) == Some(expected));
2788        if all_match {
2789            indices.push(i);
2790        }
2791    }
2792
2793    Ok(indices)
2794}
2795
/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
///
/// Each returned piece is trimmed; a clause without any separator yields a
/// single piece. The separator is matched with ASCII case folding directly
/// on the original bytes, so offsets always refer to `where_clause` itself
/// (Unicode-aware upper-casing can change byte lengths and would shift
/// them). A ` AND ` that occurs inside a single-quoted string literal is
/// part of the literal and does not split the clause.
fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";

    let bytes = where_clause.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut in_quote = false;
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] == b'\'' {
            // A doubled quote (`''`) toggles twice and so stays inside the literal.
            in_quote = !in_quote;
        } else if !in_quote
            && matches!(
                bytes.get(i..i + SEPARATOR.len()),
                Some(window) if window.eq_ignore_ascii_case(SEPARATOR)
            )
        {
            // The separator is pure ASCII, so `i` and `i + len` are both
            // valid char boundaries of `where_clause`.
            parts.push(where_clause[start..i].trim());
            i += SEPARATOR.len();
            start = i;
            continue;
        }
        i += 1;
    }

    parts.push(where_clause[start..].trim());
    parts
}
2811
2812/// Parse each `col = literal` (or `col = ?`) condition into an
2813/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2814/// don't parse as equality, or whose RHS literal can't be resolved, are
2815/// silently dropped — that mirrors the prior inline behavior.
2816fn parse_partiql_equality_conditions(
2817    conditions: &[&str],
2818    parameters: &[Value],
2819) -> Vec<(String, Value)> {
2820    let mut param_idx = 0usize;
2821    let mut parsed = Vec::new();
2822    for cond in conditions {
2823        let cond = cond.trim();
2824        if let Some((left, right)) = cond.split_once('=') {
2825            let attr = left.trim().trim_matches('"').to_string();
2826            let val_str = right.trim();
2827            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2828                parsed.push((attr, value));
2829            }
2830        }
2831    }
2832    parsed
2833}
2834
2835/// Parse a PartiQL literal value. Supports:
2836/// - 'string' -> {"S": "string"}
2837/// - 123 -> {"N": "123"}
2838/// - ? -> parameter from list
2839fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2840    let s = s.trim();
2841    if s == "?" {
2842        let idx = *param_idx;
2843        *param_idx += 1;
2844        parameters.get(idx).cloned()
2845    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2846        let inner = &s[1..s.len() - 1];
2847        Some(json!({"S": inner}))
2848    } else if let Ok(n) = s.parse::<f64>() {
2849        let num_str = if n == n.trunc() {
2850            format!("{}", n as i64)
2851        } else {
2852            format!("{n}")
2853        };
2854        Some(json!({"N": num_str}))
2855    } else {
2856        None
2857    }
2858}
2859
2860/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2861fn parse_partiql_value_object(
2862    s: &str,
2863    parameters: &[Value],
2864) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2865    let s = s.trim();
2866    let inner = s
2867        .strip_prefix('{')
2868        .and_then(|s| s.strip_suffix('}'))
2869        .ok_or_else(|| {
2870            AwsServiceError::aws_error(
2871                StatusCode::BAD_REQUEST,
2872                "ValidationException",
2873                "Invalid VALUE: expected object literal",
2874            )
2875        })?;
2876
2877    let mut item = HashMap::new();
2878    let mut param_idx = 0usize;
2879
2880    // Simple comma-separated key:value parsing
2881    for pair in split_partiql_pairs(inner) {
2882        let pair = pair.trim();
2883        if pair.is_empty() {
2884            continue;
2885        }
2886        if let Some((key_part, val_part)) = pair.split_once(':') {
2887            let key = key_part
2888                .trim()
2889                .trim_matches('\'')
2890                .trim_matches('"')
2891                .to_string();
2892            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2893                item.insert(key, val);
2894            }
2895        }
2896    }
2897
2898    Ok(item)
2899}
2900
/// Splits the inside of a PartiQL object literal into its comma-separated
/// pairs, leaving commas inside single-quoted strings or nested braces
/// alone.
///
/// The pieces are returned untrimmed, and the result always contains at
/// least one element (the whole input when nothing splits).
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut piece_start = 0usize;
    let mut brace_depth = 0i32;
    let mut quoted = false;

    // All delimiters are ASCII, so scanning bytes is equivalent to scanning
    // chars: UTF-8 continuation bytes never collide with ASCII values.
    for (pos, byte) in s.bytes().enumerate() {
        if byte == b'\'' {
            quoted = !quoted;
        } else if quoted {
            continue;
        } else if byte == b'{' {
            brace_depth += 1;
        } else if byte == b'}' {
            brace_depth -= 1;
        } else if byte == b',' && brace_depth == 0 {
            pieces.push(&s[piece_start..pos]);
            piece_start = pos + 1;
        }
    }

    pieces.push(&s[piece_start..]);
    pieces
}
2924
/// Returns how many `?` characters `s` contains.
///
/// Every `?` is counted, wherever it appears in the string.
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
2929
2930#[cfg(test)]
2931mod tests {
2932    use super::*;
2933    use serde_json::json;
2934
2935    #[test]
2936    fn test_parse_update_clauses_set() {
2937        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2938        assert_eq!(clauses.len(), 1);
2939        assert_eq!(clauses[0].0, UpdateAction::Set);
2940        assert_eq!(clauses[0].1.len(), 2);
2941    }
2942
2943    #[test]
2944    fn test_parse_update_clauses_set_and_remove() {
2945        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2946        assert_eq!(clauses.len(), 2);
2947        assert_eq!(clauses[0].0, UpdateAction::Set);
2948        assert_eq!(clauses[1].0, UpdateAction::Remove);
2949    }
2950
2951    #[test]
2952    fn test_parse_update_clauses_list_append_single_assignment() {
2953        // Before fix: naive comma split tore list_append(#0, :0) at the
2954        // inner comma, producing two bogus assignments instead of one.
2955        let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
2956        assert_eq!(clauses.len(), 1);
2957        assert_eq!(clauses[0].0, UpdateAction::Set);
2958        assert_eq!(
2959            clauses[0].1.len(),
2960            1,
2961            "list_append(a, b) must be kept as a single assignment, not split at the inner comma"
2962        );
2963    }
2964
2965    #[test]
2966    fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
2967        // list_append assignment followed by a plain SET — the comma between
2968        // the two assignments must still split them, while the comma inside
2969        // the list_append call must not.
2970        let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
2971        assert_eq!(clauses.len(), 1);
2972        assert_eq!(clauses[0].0, UpdateAction::Set);
2973        assert_eq!(
2974            clauses[0].1.len(),
2975            2,
2976            "two SET assignments: one list_append and one plain"
2977        );
2978    }
2979
2980    #[test]
2981    fn test_evaluate_key_condition_simple() {
2982        let mut item = HashMap::new();
2983        item.insert("pk".to_string(), json!({"S": "user1"}));
2984        item.insert("sk".to_string(), json!({"S": "order1"}));
2985
2986        let mut expr_values = HashMap::new();
2987        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2988
2989        assert!(evaluate_key_condition(
2990            "pk = :pk",
2991            &item,
2992            &HashMap::new(),
2993            &expr_values,
2994        ));
2995    }
2996
2997    #[test]
2998    fn test_compare_attribute_values_numbers() {
2999        let a = json!({"N": "10"});
3000        let b = json!({"N": "20"});
3001        assert_eq!(
3002            compare_attribute_values(Some(&a), Some(&b)),
3003            std::cmp::Ordering::Less
3004        );
3005    }
3006
3007    #[test]
3008    fn test_compare_attribute_values_strings() {
3009        let a = json!({"S": "apple"});
3010        let b = json!({"S": "banana"});
3011        assert_eq!(
3012            compare_attribute_values(Some(&a), Some(&b)),
3013            std::cmp::Ordering::Less
3014        );
3015    }
3016
3017    #[test]
3018    fn test_split_on_and() {
3019        let parts = split_on_and("pk = :pk AND sk > :sk");
3020        assert_eq!(parts.len(), 2);
3021        assert_eq!(parts[0].trim(), "pk = :pk");
3022        assert_eq!(parts[1].trim(), "sk > :sk");
3023    }
3024
3025    #[test]
3026    fn test_split_on_and_respects_parentheses() {
3027        // Before fix: split_on_and would split inside the parens
3028        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
3029        // Should NOT split on the AND inside parentheses
3030        assert_eq!(parts.len(), 1);
3031        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
3032    }
3033
3034    #[test]
3035    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
3036        // (a AND b) OR c — should match when c is true but a is false
3037        let mut item = HashMap::new();
3038        item.insert("x".to_string(), json!({"S": "no"}));
3039        item.insert("y".to_string(), json!({"S": "no"}));
3040        item.insert("z".to_string(), json!({"S": "yes"}));
3041
3042        let mut expr_values = HashMap::new();
3043        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
3044
3045        // x=yes AND y=yes => false, but z=yes => true => overall true
3046        let result = evaluate_filter_expression(
3047            "(x = :yes AND y = :yes) OR z = :yes",
3048            &item,
3049            &HashMap::new(),
3050            &expr_values,
3051        );
3052        assert!(result, "should match because z = :yes is true");
3053
3054        // x=yes AND y=yes => false, z=yes => false => overall false
3055        let mut item2 = HashMap::new();
3056        item2.insert("x".to_string(), json!({"S": "no"}));
3057        item2.insert("y".to_string(), json!({"S": "no"}));
3058        item2.insert("z".to_string(), json!({"S": "no"}));
3059
3060        let result2 = evaluate_filter_expression(
3061            "(x = :yes AND y = :yes) OR z = :yes",
3062            &item2,
3063            &HashMap::new(),
3064            &expr_values,
3065        );
3066        assert!(!result2, "should not match because nothing is true");
3067    }
3068
3069    #[test]
3070    fn test_project_item_nested_path() {
3071        // Item with a list attribute containing maps
3072        let mut item = HashMap::new();
3073        item.insert("pk".to_string(), json!({"S": "key1"}));
3074        item.insert(
3075            "data".to_string(),
3076            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
3077        );
3078
3079        let body = json!({
3080            "ProjectionExpression": "data[0].name"
3081        });
3082
3083        let projected = project_item(&item, &body);
3084        // Should contain data[0].name = "Alice", not the entire data[0] element
3085        let name = projected
3086            .get("data")
3087            .and_then(|v| v.get("L"))
3088            .and_then(|v| v.get(0))
3089            .and_then(|v| v.get("M"))
3090            .and_then(|v| v.get("name"))
3091            .and_then(|v| v.get("S"))
3092            .and_then(|v| v.as_str());
3093        assert_eq!(name, Some("Alice"));
3094
3095        // Should NOT contain the "age" field
3096        let age = projected
3097            .get("data")
3098            .and_then(|v| v.get("L"))
3099            .and_then(|v| v.get(0))
3100            .and_then(|v| v.get("M"))
3101            .and_then(|v| v.get("age"));
3102        assert!(age.is_none(), "age should not be present in projection");
3103    }
3104
3105    #[test]
3106    fn test_resolve_nested_path_map() {
3107        let mut item = HashMap::new();
3108        item.insert(
3109            "info".to_string(),
3110            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
3111        );
3112
3113        let result = resolve_nested_path(&item, "info.address.city");
3114        assert_eq!(result, Some(json!({"S": "NYC"})));
3115    }
3116
3117    #[test]
3118    fn test_resolve_nested_path_list_then_map() {
3119        let mut item = HashMap::new();
3120        item.insert(
3121            "items".to_string(),
3122            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
3123        );
3124
3125        let result = resolve_nested_path(&item, "items[0].sku");
3126        assert_eq!(result, Some(json!({"S": "ABC"})));
3127    }
3128
3129    #[test]
3130    fn test_resolve_path_alias_with_dot_is_top_level_attr() {
3131        // Top-level attribute name literally contains a dot; user aliases it
3132        // via ExpressionAttributeNames and references the alias. Must resolve
3133        // to the top-level attribute, NOT be walked as a nested path.
3134        let mut item = HashMap::new();
3135        item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
3136        let mut names = HashMap::new();
3137        names.insert("#sw".to_string(), "Safety.Warning".to_string());
3138
3139        let result = resolve_path("#sw", &item, &names);
3140        assert_eq!(result, Some(json!({"S": "high"})));
3141    }
3142
3143    #[test]
3144    fn test_resolve_path_dotted_expression_still_walks_nested() {
3145        // When the expression itself contains `.`, we still walk the nested
3146        // path (the dot is a path separator, not part of an attribute name).
3147        let mut item = HashMap::new();
3148        item.insert("profile".to_string(), json!({"M": {"email": {"S": "x@y"}}}));
3149        let names = HashMap::new();
3150
3151        let result = resolve_path("profile.email", &item, &names);
3152        assert_eq!(result, Some(json!({"S": "x@y"})));
3153    }
3154
3155    #[test]
3156    fn test_project_item_alias_with_dot_is_top_level_attr() {
3157        // Same invariant must hold for ProjectionExpression.
3158        let mut item = HashMap::new();
3159        item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
3160        item.insert("other".to_string(), json!({"S": "ignored"}));
3161        let body = json!({
3162            "ProjectionExpression": "#sw",
3163            "ExpressionAttributeNames": {"#sw": "Safety.Warning"},
3164        });
3165
3166        let projected = project_item(&item, &body);
3167        assert_eq!(projected.get("Safety.Warning"), Some(&json!({"S": "high"})));
3168        assert!(!projected.contains_key("other"));
3169    }
3170
3171    // -- Integration-style tests using DynamoDbService --
3172
3173    use crate::state::SharedDynamoDbState;
3174    use parking_lot::RwLock;
3175    use std::sync::Arc;
3176
3177    fn make_service() -> DynamoDbService {
3178        let state: SharedDynamoDbState = Arc::new(RwLock::new(
3179            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3180        ));
3181        DynamoDbService::new(state)
3182    }
3183
3184    fn make_request(action: &str, body: Value) -> AwsRequest {
3185        AwsRequest {
3186            service: "dynamodb".to_string(),
3187            action: action.to_string(),
3188            region: "us-east-1".to_string(),
3189            account_id: "123456789012".to_string(),
3190            request_id: "test-id".to_string(),
3191            headers: http::HeaderMap::new(),
3192            query_params: HashMap::new(),
3193            body: serde_json::to_vec(&body).unwrap().into(),
3194            body_stream: parking_lot::Mutex::new(None),
3195            path_segments: vec![],
3196            raw_path: "/".to_string(),
3197            raw_query: String::new(),
3198            method: http::Method::POST,
3199            is_query_protocol: false,
3200            access_key_id: None,
3201            principal: None,
3202        }
3203    }
3204
3205    fn create_test_table(svc: &DynamoDbService) {
3206        let req = make_request(
3207            "CreateTable",
3208            json!({
3209                "TableName": "test-table",
3210                "KeySchema": [
3211                    { "AttributeName": "pk", "KeyType": "HASH" }
3212                ],
3213                "AttributeDefinitions": [
3214                    { "AttributeName": "pk", "AttributeType": "S" }
3215                ],
3216                "BillingMode": "PAY_PER_REQUEST"
3217            }),
3218        );
3219        svc.create_table(&req).unwrap();
3220    }
3221
3222    #[test]
3223    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
3224        let svc = make_service();
3225        let req = make_request(
3226            "CreateTable",
3227            json!({
3228                "TableName": "warm-throughput-table",
3229                "KeySchema": [
3230                    { "AttributeName": "pk", "KeyType": "HASH" }
3231                ],
3232                "AttributeDefinitions": [
3233                    { "AttributeName": "pk", "AttributeType": "S" }
3234                ],
3235                "BillingMode": "PAY_PER_REQUEST"
3236            }),
3237        );
3238        let create_resp = svc.create_table(&req).unwrap();
3239        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
3240        let create_table = &create_body["TableDescription"];
3241
3242        assert_eq!(create_table["TableStatus"], "ACTIVE");
3243        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
3244        let table_id = create_table["TableId"].as_str().unwrap().to_string();
3245        assert!(!table_id.is_empty());
3246
3247        let describe_req = make_request(
3248            "DescribeTable",
3249            json!({ "TableName": "warm-throughput-table" }),
3250        );
3251        let describe_resp = svc.describe_table(&describe_req).unwrap();
3252        let describe_body: Value =
3253            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
3254        let described_table = &describe_body["Table"];
3255
3256        assert_eq!(described_table["TableStatus"], "ACTIVE");
3257        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
3258        assert_eq!(described_table["TableId"], table_id);
3259
3260        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
3261        let describe_body_again: Value =
3262            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
3263        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
3264    }
3265
3266    #[test]
3267    fn delete_item_return_values_all_old() {
3268        let svc = make_service();
3269        create_test_table(&svc);
3270
3271        // Put an item
3272        let req = make_request(
3273            "PutItem",
3274            json!({
3275                "TableName": "test-table",
3276                "Item": {
3277                    "pk": { "S": "key1" },
3278                    "name": { "S": "Alice" },
3279                    "age": { "N": "30" }
3280                }
3281            }),
3282        );
3283        svc.put_item(&req).unwrap();
3284
3285        // Delete with ReturnValues=ALL_OLD
3286        let req = make_request(
3287            "DeleteItem",
3288            json!({
3289                "TableName": "test-table",
3290                "Key": { "pk": { "S": "key1" } },
3291                "ReturnValues": "ALL_OLD"
3292            }),
3293        );
3294        let resp = svc.delete_item(&req).unwrap();
3295        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3296
3297        // Verify the old item is returned
3298        let attrs = &body["Attributes"];
3299        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
3300        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
3301        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
3302
3303        // Verify the item is actually deleted
3304        let req = make_request(
3305            "GetItem",
3306            json!({
3307                "TableName": "test-table",
3308                "Key": { "pk": { "S": "key1" } }
3309            }),
3310        );
3311        let resp = svc.get_item(&req).unwrap();
3312        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3313        assert!(body.get("Item").is_none(), "item should be deleted");
3314    }
3315
3316    #[test]
3317    fn transact_get_items_returns_existing_and_missing() {
3318        let svc = make_service();
3319        create_test_table(&svc);
3320
3321        // Put one item
3322        let req = make_request(
3323            "PutItem",
3324            json!({
3325                "TableName": "test-table",
3326                "Item": {
3327                    "pk": { "S": "exists" },
3328                    "val": { "S": "hello" }
3329                }
3330            }),
3331        );
3332        svc.put_item(&req).unwrap();
3333
3334        let req = make_request(
3335            "TransactGetItems",
3336            json!({
3337                "TransactItems": [
3338                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
3339                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
3340                ]
3341            }),
3342        );
3343        let resp = svc.transact_get_items(&req).unwrap();
3344        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3345        let responses = body["Responses"].as_array().unwrap();
3346        assert_eq!(responses.len(), 2);
3347        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
3348        assert!(responses[1].get("Item").is_none());
3349    }
3350
3351    #[test]
3352    fn transact_write_items_put_and_delete() {
3353        let svc = make_service();
3354        create_test_table(&svc);
3355
3356        // Put initial item
3357        let req = make_request(
3358            "PutItem",
3359            json!({
3360                "TableName": "test-table",
3361                "Item": {
3362                    "pk": { "S": "to-delete" },
3363                    "val": { "S": "bye" }
3364                }
3365            }),
3366        );
3367        svc.put_item(&req).unwrap();
3368
3369        // TransactWrite: put new + delete existing
3370        let req = make_request(
3371            "TransactWriteItems",
3372            json!({
3373                "TransactItems": [
3374                    {
3375                        "Put": {
3376                            "TableName": "test-table",
3377                            "Item": {
3378                                "pk": { "S": "new-item" },
3379                                "val": { "S": "hi" }
3380                            }
3381                        }
3382                    },
3383                    {
3384                        "Delete": {
3385                            "TableName": "test-table",
3386                            "Key": { "pk": { "S": "to-delete" } }
3387                        }
3388                    }
3389                ]
3390            }),
3391        );
3392        let resp = svc.transact_write_items(&req).unwrap();
3393        assert_eq!(resp.status, StatusCode::OK);
3394
3395        // Verify new item exists
3396        let req = make_request(
3397            "GetItem",
3398            json!({
3399                "TableName": "test-table",
3400                "Key": { "pk": { "S": "new-item" } }
3401            }),
3402        );
3403        let resp = svc.get_item(&req).unwrap();
3404        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3405        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
3406
3407        // Verify deleted item is gone
3408        let req = make_request(
3409            "GetItem",
3410            json!({
3411                "TableName": "test-table",
3412                "Key": { "pk": { "S": "to-delete" } }
3413            }),
3414        );
3415        let resp = svc.get_item(&req).unwrap();
3416        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3417        assert!(body.get("Item").is_none());
3418    }
3419
3420    #[test]
3421    fn transact_write_items_condition_check_failure() {
3422        let svc = make_service();
3423        create_test_table(&svc);
3424
3425        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
3426        let req = make_request(
3427            "TransactWriteItems",
3428            json!({
3429                "TransactItems": [
3430                    {
3431                        "ConditionCheck": {
3432                            "TableName": "test-table",
3433                            "Key": { "pk": { "S": "nonexistent" } },
3434                            "ConditionExpression": "attribute_exists(pk)"
3435                        }
3436                    }
3437                ]
3438            }),
3439        );
3440        let resp = svc.transact_write_items(&req).unwrap();
3441        // Should be a 400 error response
3442        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3443        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3444        assert_eq!(
3445            body["__type"].as_str().unwrap(),
3446            "TransactionCanceledException"
3447        );
3448        assert!(body["CancellationReasons"].as_array().is_some());
3449    }
3450
3451    #[test]
3452    fn update_and_describe_time_to_live() {
3453        let svc = make_service();
3454        create_test_table(&svc);
3455
3456        // Enable TTL
3457        let req = make_request(
3458            "UpdateTimeToLive",
3459            json!({
3460                "TableName": "test-table",
3461                "TimeToLiveSpecification": {
3462                    "AttributeName": "ttl",
3463                    "Enabled": true
3464                }
3465            }),
3466        );
3467        let resp = svc.update_time_to_live(&req).unwrap();
3468        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3469        assert_eq!(
3470            body["TimeToLiveSpecification"]["AttributeName"]
3471                .as_str()
3472                .unwrap(),
3473            "ttl"
3474        );
3475        assert!(body["TimeToLiveSpecification"]["Enabled"]
3476            .as_bool()
3477            .unwrap());
3478
3479        // Describe TTL
3480        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3481        let resp = svc.describe_time_to_live(&req).unwrap();
3482        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3483        assert_eq!(
3484            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3485                .as_str()
3486                .unwrap(),
3487            "ENABLED"
3488        );
3489        assert_eq!(
3490            body["TimeToLiveDescription"]["AttributeName"]
3491                .as_str()
3492                .unwrap(),
3493            "ttl"
3494        );
3495
3496        // Disable TTL
3497        let req = make_request(
3498            "UpdateTimeToLive",
3499            json!({
3500                "TableName": "test-table",
3501                "TimeToLiveSpecification": {
3502                    "AttributeName": "ttl",
3503                    "Enabled": false
3504                }
3505            }),
3506        );
3507        svc.update_time_to_live(&req).unwrap();
3508
3509        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3510        let resp = svc.describe_time_to_live(&req).unwrap();
3511        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3512        assert_eq!(
3513            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3514                .as_str()
3515                .unwrap(),
3516            "DISABLED"
3517        );
3518    }
3519
3520    #[test]
3521    fn resource_policy_lifecycle() {
3522        let svc = make_service();
3523        create_test_table(&svc);
3524
3525        let table_arn = {
3526            let __mas = svc.state.read();
3527            let state = __mas.default_ref();
3528            state.tables.get("test-table").unwrap().arn.clone()
3529        };
3530
3531        // Put policy
3532        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3533        let req = make_request(
3534            "PutResourcePolicy",
3535            json!({
3536                "ResourceArn": table_arn,
3537                "Policy": policy_doc
3538            }),
3539        );
3540        let resp = svc.put_resource_policy(&req).unwrap();
3541        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3542        assert!(body["RevisionId"].as_str().is_some());
3543
3544        // Get policy
3545        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3546        let resp = svc.get_resource_policy(&req).unwrap();
3547        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3548        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3549
3550        // Delete policy
3551        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3552        svc.delete_resource_policy(&req).unwrap();
3553
3554        // Get should return null now
3555        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3556        let resp = svc.get_resource_policy(&req).unwrap();
3557        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3558        assert!(body["Policy"].is_null());
3559    }
3560
3561    #[test]
3562    fn describe_endpoints() {
3563        let svc = make_service();
3564        let req = make_request("DescribeEndpoints", json!({}));
3565        let resp = svc.describe_endpoints(&req).unwrap();
3566        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3567        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3568    }
3569
3570    #[test]
3571    fn describe_limits() {
3572        let svc = make_service();
3573        let req = make_request("DescribeLimits", json!({}));
3574        let resp = svc.describe_limits(&req).unwrap();
3575        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3576        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3577    }
3578
3579    #[test]
3580    fn backup_lifecycle() {
3581        let svc = make_service();
3582        create_test_table(&svc);
3583
3584        // Create backup
3585        let req = make_request(
3586            "CreateBackup",
3587            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3588        );
3589        let resp = svc.create_backup(&req).unwrap();
3590        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3591        let backup_arn = body["BackupDetails"]["BackupArn"]
3592            .as_str()
3593            .unwrap()
3594            .to_string();
3595        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3596
3597        // Describe backup
3598        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3599        let resp = svc.describe_backup(&req).unwrap();
3600        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3601        assert_eq!(
3602            body["BackupDescription"]["BackupDetails"]["BackupName"],
3603            "my-backup"
3604        );
3605
3606        // List backups
3607        let req = make_request("ListBackups", json!({}));
3608        let resp = svc.list_backups(&req).unwrap();
3609        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3610        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3611
3612        // Restore from backup
3613        let req = make_request(
3614            "RestoreTableFromBackup",
3615            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3616        );
3617        svc.restore_table_from_backup(&req).unwrap();
3618
3619        // Verify restored table exists
3620        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3621        let resp = svc.describe_table(&req).unwrap();
3622        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3623        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3624
3625        // Delete backup
3626        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3627        svc.delete_backup(&req).unwrap();
3628
3629        // List should be empty
3630        let req = make_request("ListBackups", json!({}));
3631        let resp = svc.list_backups(&req).unwrap();
3632        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3633        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3634    }
3635
3636    #[test]
3637    fn continuous_backups() {
3638        let svc = make_service();
3639        create_test_table(&svc);
3640
3641        // Initially disabled
3642        let req = make_request(
3643            "DescribeContinuousBackups",
3644            json!({ "TableName": "test-table" }),
3645        );
3646        let resp = svc.describe_continuous_backups(&req).unwrap();
3647        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3648        assert_eq!(
3649            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3650                ["PointInTimeRecoveryStatus"],
3651            "DISABLED"
3652        );
3653
3654        // Enable
3655        let req = make_request(
3656            "UpdateContinuousBackups",
3657            json!({
3658                "TableName": "test-table",
3659                "PointInTimeRecoverySpecification": {
3660                    "PointInTimeRecoveryEnabled": true
3661                }
3662            }),
3663        );
3664        svc.update_continuous_backups(&req).unwrap();
3665
3666        // Verify
3667        let req = make_request(
3668            "DescribeContinuousBackups",
3669            json!({ "TableName": "test-table" }),
3670        );
3671        let resp = svc.describe_continuous_backups(&req).unwrap();
3672        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3673        assert_eq!(
3674            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3675                ["PointInTimeRecoveryStatus"],
3676            "ENABLED"
3677        );
3678    }
3679
3680    #[test]
3681    fn restore_table_to_point_in_time() {
3682        let svc = make_service();
3683        create_test_table(&svc);
3684
3685        let req = make_request(
3686            "RestoreTableToPointInTime",
3687            json!({
3688                "SourceTableName": "test-table",
3689                "TargetTableName": "pitr-restored"
3690            }),
3691        );
3692        svc.restore_table_to_point_in_time(&req).unwrap();
3693
3694        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3695        let resp = svc.describe_table(&req).unwrap();
3696        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3697        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3698    }
3699
3700    #[test]
3701    fn global_table_lifecycle() {
3702        let svc = make_service();
3703
3704        // Create global table
3705        let req = make_request(
3706            "CreateGlobalTable",
3707            json!({
3708                "GlobalTableName": "my-global",
3709                "ReplicationGroup": [
3710                    { "RegionName": "us-east-1" },
3711                    { "RegionName": "eu-west-1" }
3712                ]
3713            }),
3714        );
3715        let resp = svc.create_global_table(&req).unwrap();
3716        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3717        assert_eq!(
3718            body["GlobalTableDescription"]["GlobalTableStatus"],
3719            "ACTIVE"
3720        );
3721
3722        // Describe
3723        let req = make_request(
3724            "DescribeGlobalTable",
3725            json!({ "GlobalTableName": "my-global" }),
3726        );
3727        let resp = svc.describe_global_table(&req).unwrap();
3728        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3729        assert_eq!(
3730            body["GlobalTableDescription"]["ReplicationGroup"]
3731                .as_array()
3732                .unwrap()
3733                .len(),
3734            2
3735        );
3736
3737        // List
3738        let req = make_request("ListGlobalTables", json!({}));
3739        let resp = svc.list_global_tables(&req).unwrap();
3740        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3741        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3742
3743        // Update - add a region
3744        let req = make_request(
3745            "UpdateGlobalTable",
3746            json!({
3747                "GlobalTableName": "my-global",
3748                "ReplicaUpdates": [
3749                    { "Create": { "RegionName": "ap-southeast-1" } }
3750                ]
3751            }),
3752        );
3753        let resp = svc.update_global_table(&req).unwrap();
3754        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3755        assert_eq!(
3756            body["GlobalTableDescription"]["ReplicationGroup"]
3757                .as_array()
3758                .unwrap()
3759                .len(),
3760            3
3761        );
3762
3763        // Describe settings
3764        let req = make_request(
3765            "DescribeGlobalTableSettings",
3766            json!({ "GlobalTableName": "my-global" }),
3767        );
3768        let resp = svc.describe_global_table_settings(&req).unwrap();
3769        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3770        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3771
3772        // Update settings (no-op, just verify no error)
3773        let req = make_request(
3774            "UpdateGlobalTableSettings",
3775            json!({ "GlobalTableName": "my-global" }),
3776        );
3777        svc.update_global_table_settings(&req).unwrap();
3778    }
3779
3780    #[test]
3781    fn table_replica_auto_scaling() {
3782        let svc = make_service();
3783        create_test_table(&svc);
3784
3785        let req = make_request(
3786            "DescribeTableReplicaAutoScaling",
3787            json!({ "TableName": "test-table" }),
3788        );
3789        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3790        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3791        assert_eq!(
3792            body["TableAutoScalingDescription"]["TableName"],
3793            "test-table"
3794        );
3795
3796        let req = make_request(
3797            "UpdateTableReplicaAutoScaling",
3798            json!({ "TableName": "test-table" }),
3799        );
3800        svc.update_table_replica_auto_scaling(&req).unwrap();
3801    }
3802
3803    #[test]
3804    fn kinesis_streaming_lifecycle() {
3805        let svc = make_service();
3806        create_test_table(&svc);
3807
3808        // Enable
3809        let req = make_request(
3810            "EnableKinesisStreamingDestination",
3811            json!({
3812                "TableName": "test-table",
3813                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3814            }),
3815        );
3816        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3817        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3818        assert_eq!(body["DestinationStatus"], "ACTIVE");
3819
3820        // Describe
3821        let req = make_request(
3822            "DescribeKinesisStreamingDestination",
3823            json!({ "TableName": "test-table" }),
3824        );
3825        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3826        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3827        assert_eq!(
3828            body["KinesisDataStreamDestinations"]
3829                .as_array()
3830                .unwrap()
3831                .len(),
3832            1
3833        );
3834
3835        // Update
3836        let req = make_request(
3837            "UpdateKinesisStreamingDestination",
3838            json!({
3839                "TableName": "test-table",
3840                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3841                "UpdateKinesisStreamingConfiguration": {
3842                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
3843                }
3844            }),
3845        );
3846        svc.update_kinesis_streaming_destination(&req).unwrap();
3847
3848        // Disable
3849        let req = make_request(
3850            "DisableKinesisStreamingDestination",
3851            json!({
3852                "TableName": "test-table",
3853                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3854            }),
3855        );
3856        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3857        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3858        assert_eq!(body["DestinationStatus"], "DISABLED");
3859    }
3860
3861    #[test]
3862    fn contributor_insights_lifecycle() {
3863        let svc = make_service();
3864        create_test_table(&svc);
3865
3866        // Initially disabled
3867        let req = make_request(
3868            "DescribeContributorInsights",
3869            json!({ "TableName": "test-table" }),
3870        );
3871        let resp = svc.describe_contributor_insights(&req).unwrap();
3872        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3873        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3874
3875        // Enable
3876        let req = make_request(
3877            "UpdateContributorInsights",
3878            json!({
3879                "TableName": "test-table",
3880                "ContributorInsightsAction": "ENABLE"
3881            }),
3882        );
3883        let resp = svc.update_contributor_insights(&req).unwrap();
3884        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3885        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3886
3887        // List
3888        let req = make_request("ListContributorInsights", json!({}));
3889        let resp = svc.list_contributor_insights(&req).unwrap();
3890        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3891        assert_eq!(
3892            body["ContributorInsightsSummaries"]
3893                .as_array()
3894                .unwrap()
3895                .len(),
3896            1
3897        );
3898    }
3899
3900    #[test]
3901    fn export_lifecycle() {
3902        let svc = make_service();
3903        create_test_table(&svc);
3904
3905        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3906
3907        // Export
3908        let req = make_request(
3909            "ExportTableToPointInTime",
3910            json!({
3911                "TableArn": table_arn,
3912                "S3Bucket": "my-bucket"
3913            }),
3914        );
3915        let resp = svc.export_table_to_point_in_time(&req).unwrap();
3916        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3917        let export_arn = body["ExportDescription"]["ExportArn"]
3918            .as_str()
3919            .unwrap()
3920            .to_string();
3921        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3922
3923        // Describe
3924        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3925        let resp = svc.describe_export(&req).unwrap();
3926        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3927        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3928
3929        // List
3930        let req = make_request("ListExports", json!({}));
3931        let resp = svc.list_exports(&req).unwrap();
3932        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3933        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3934    }
3935
3936    #[test]
3937    fn import_lifecycle() {
3938        let svc = make_service();
3939
3940        let req = make_request(
3941            "ImportTable",
3942            json!({
3943                "InputFormat": "DYNAMODB_JSON",
3944                "S3BucketSource": { "S3Bucket": "import-bucket" },
3945                "TableCreationParameters": {
3946                    "TableName": "imported-table",
3947                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3948                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3949                }
3950            }),
3951        );
3952        let resp = svc.import_table(&req).unwrap();
3953        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3954        let import_arn = body["ImportTableDescription"]["ImportArn"]
3955            .as_str()
3956            .unwrap()
3957            .to_string();
3958        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3959
3960        // Describe import
3961        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3962        let resp = svc.describe_import(&req).unwrap();
3963        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3964        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3965
3966        // List imports
3967        let req = make_request("ListImports", json!({}));
3968        let resp = svc.list_imports(&req).unwrap();
3969        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3970        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3971
3972        // Verify the table was created
3973        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3974        let resp = svc.describe_table(&req).unwrap();
3975        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3976        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3977    }
3978
3979    #[test]
3980    fn backup_restore_preserves_items() {
3981        let svc = make_service();
3982        create_test_table(&svc);
3983
3984        // Put 3 items
3985        for i in 1..=3 {
3986            let req = make_request(
3987                "PutItem",
3988                json!({
3989                    "TableName": "test-table",
3990                    "Item": {
3991                        "pk": { "S": format!("key{i}") },
3992                        "data": { "S": format!("value{i}") }
3993                    }
3994                }),
3995            );
3996            svc.put_item(&req).unwrap();
3997        }
3998
3999        // Create backup
4000        let req = make_request(
4001            "CreateBackup",
4002            json!({
4003                "TableName": "test-table",
4004                "BackupName": "my-backup"
4005            }),
4006        );
4007        let resp = svc.create_backup(&req).unwrap();
4008        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4009        let backup_arn = body["BackupDetails"]["BackupArn"]
4010            .as_str()
4011            .unwrap()
4012            .to_string();
4013
4014        // Delete all items from the original table
4015        for i in 1..=3 {
4016            let req = make_request(
4017                "DeleteItem",
4018                json!({
4019                    "TableName": "test-table",
4020                    "Key": { "pk": { "S": format!("key{i}") } }
4021                }),
4022            );
4023            svc.delete_item(&req).unwrap();
4024        }
4025
4026        // Verify original table is empty
4027        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4028        let resp = svc.scan(&req).unwrap();
4029        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4030        assert_eq!(body["Count"], 0);
4031
4032        // Restore from backup
4033        let req = make_request(
4034            "RestoreTableFromBackup",
4035            json!({
4036                "BackupArn": backup_arn,
4037                "TargetTableName": "restored-table"
4038            }),
4039        );
4040        svc.restore_table_from_backup(&req).unwrap();
4041
4042        // Scan restored table — should have 3 items
4043        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
4044        let resp = svc.scan(&req).unwrap();
4045        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4046        assert_eq!(body["Count"], 3);
4047        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
4048    }
4049
/// A table promoted to a global table must report `ACTIVE` and keep serving
/// ordinary writes and reads.
#[test]
fn global_table_replicates_writes() {
    let service = make_service();
    create_test_table(&service);

    // Promote the table to a global table with two replica regions.
    let replication = json!({
        "GlobalTableName": "test-table",
        "ReplicationGroup": [
            { "RegionName": "us-east-1" },
            { "RegionName": "eu-west-1" }
        ]
    });
    let created = service
        .create_global_table(&make_request("CreateGlobalTable", replication))
        .unwrap();
    let created_body: Value = serde_json::from_slice(created.body.expect_bytes()).unwrap();
    assert_eq!(
        created_body["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    // Write a single item through the regular PutItem path.
    let put = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "replicated-key" },
                "data": { "S": "replicated-value" }
            }
        }),
    );
    service.put_item(&put).unwrap();

    // The item must be readable back (all replicas share the same table).
    let get = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "replicated-key" } }
        }),
    );
    let fetched = service.get_item(&get).unwrap();
    let fetched_body: Value = serde_json::from_slice(fetched.body.expect_bytes()).unwrap();
    let stored = &fetched_body["Item"];
    assert_eq!(stored["pk"]["S"], "replicated-key");
    assert_eq!(stored["data"]["S"], "replicated-value");
}
4099
/// With contributor insights enabled, item writes and reads must show up in
/// the `DescribeContributorInsights` response.
#[test]
fn contributor_insights_tracks_access() {
    let service = make_service();
    create_test_table(&service);

    // Switch contributor insights on for the table.
    let enable = json!({
        "TableName": "test-table",
        "ContributorInsightsAction": "ENABLE"
    });
    service
        .update_contributor_insights(&make_request("UpdateContributorInsights", enable))
        .unwrap();

    // Writes spread over two partition keys: "alpha" three times, "beta" twice.
    ["alpha", "beta", "alpha", "alpha", "beta"]
        .iter()
        .for_each(|partition| {
            let put = make_request(
                "PutItem",
                json!({
                    "TableName": "test-table",
                    "Item": {
                        "pk": { "S": partition },
                        "data": { "S": "value" }
                    }
                }),
            );
            service.put_item(&put).unwrap();
        });

    // Three reads of "alpha" so read access is exercised as well.
    (0..3).for_each(|_| {
        let get = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "alpha" } }
            }),
        );
        service.get_item(&get).unwrap();
    });

    // The describe call should now report the feature as enabled with data.
    let describe = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let described = service.describe_contributor_insights(&describe).unwrap();
    let report: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(report["ContributorInsightsStatus"], "ENABLED");

    let top_contributors = report["TopContributors"].as_array().unwrap();
    assert!(
        !top_contributors.is_empty(),
        "TopContributors should not be empty"
    );

    // "alpha" saw 3 puts + 3 gets and "beta" 2 puts, so "alpha" is expected to
    // lead the list; only a positive count on the first entry is asserted.
    let leading_count = top_contributors[0]["Count"].as_u64().unwrap();
    assert!(leading_count > 0);

    // The rule list must be populated too.
    let rule_list = report["ContributorInsightsRuleList"].as_array().unwrap();
    assert!(!rule_list.is_empty());
}
4166
/// Without enabling contributor insights, writes must not produce any
/// contributor entries and the status must read `DISABLED`.
#[test]
fn contributor_insights_not_tracked_when_disabled() {
    let service = make_service();
    create_test_table(&service);

    // Write one item while insights has never been enabled.
    let item_payload = json!({
        "TableName": "test-table",
        "Item": {
            "pk": { "S": "key1" },
            "data": { "S": "value" }
        }
    });
    service
        .put_item(&make_request("PutItem", item_payload))
        .unwrap();

    // Describe the table's insights state: disabled, with no contributors.
    let describe_payload = json!({ "TableName": "test-table" });
    let described = service
        .describe_contributor_insights(&make_request(
            "DescribeContributorInsights",
            describe_payload,
        ))
        .unwrap();
    let report: Value = serde_json::from_slice(described.body.expect_bytes()).unwrap();
    assert_eq!(report["ContributorInsightsStatus"], "DISABLED");

    let top_contributors = report["TopContributors"].as_array().unwrap();
    assert!(top_contributors.is_empty());
}
4197
/// Disabling contributor insights clears collected counters, and a later
/// scan must not repopulate them.
#[test]
fn contributor_insights_disabled_table_no_counters_after_scan() {
    let service = make_service();
    create_test_table(&service);

    // Seed two items.
    ["alpha", "beta"].iter().for_each(|partition| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": { "pk": { "S": partition } }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // Sequence under test: enable -> scan -> disable -> scan -> describe.
    let enable = json!({
        "TableName": "test-table",
        "ContributorInsightsAction": "ENABLE"
    });
    service
        .update_contributor_insights(&make_request("UpdateContributorInsights", enable))
        .unwrap();

    // A scan while enabled should feed the counters.
    service
        .scan(&make_request("Scan", json!({ "TableName": "test-table" })))
        .unwrap();

    // Confirm counters were actually collected.
    let describe_enabled = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let enabled_resp = service
        .describe_contributor_insights(&describe_enabled)
        .unwrap();
    let enabled_report: Value = serde_json::from_slice(enabled_resp.body.expect_bytes()).unwrap();
    let enabled_contributors = enabled_report["TopContributors"].as_array().unwrap();
    assert!(
        !enabled_contributors.is_empty(),
        "counters should be non-empty while enabled"
    );

    // Turn insights off again (this clears counters).
    let disable = json!({
        "TableName": "test-table",
        "ContributorInsightsAction": "DISABLE"
    });
    service
        .update_contributor_insights(&make_request("UpdateContributorInsights", disable))
        .unwrap();

    // A scan while disabled must NOT accumulate counters.
    service
        .scan(&make_request("Scan", json!({ "TableName": "test-table" })))
        .unwrap();

    // Counters must still be empty.
    let describe_disabled = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let disabled_resp = service
        .describe_contributor_insights(&describe_disabled)
        .unwrap();
    let disabled_report: Value =
        serde_json::from_slice(disabled_resp.body.expect_bytes()).unwrap();
    let disabled_contributors = disabled_report["TopContributors"].as_array().unwrap();
    assert!(
        disabled_contributors.is_empty(),
        "counters should be empty after disabling insights"
    );
}
4269
/// A `Scan` with `Limit` smaller than the table size must return a
/// `LastEvaluatedKey`, and following it must eventually yield every item.
#[test]
fn scan_pagination_with_limit() {
    let service = make_service();
    create_test_table(&service);

    // Seed five items: item0..item4.
    (0..5).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("item{i}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // First page, capped at two items.
    let first_req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
    let first_resp = service.scan(&first_req).unwrap();
    let first_page: Value = serde_json::from_slice(first_resp.body.expect_bytes()).unwrap();
    assert_eq!(first_page["Count"], 2);
    assert!(
        first_page["LastEvaluatedKey"].is_object(),
        "should have LastEvaluatedKey when limit < total items"
    );
    assert!(first_page["LastEvaluatedKey"]["pk"].is_object());

    // Follow the cursor until the service stops returning one.
    let mut collected: Vec<Value> = first_page["Items"].as_array().unwrap().to_vec();
    let mut cursor = first_page["LastEvaluatedKey"].clone();

    loop {
        if !cursor.is_object() {
            break;
        }
        let next_req = make_request(
            "Scan",
            json!({
                "TableName": "test-table",
                "Limit": 2,
                "ExclusiveStartKey": cursor
            }),
        );
        let next_resp = service.scan(&next_req).unwrap();
        let next_page: Value = serde_json::from_slice(next_resp.body.expect_bytes()).unwrap();
        collected.extend_from_slice(next_page["Items"].as_array().unwrap());
        cursor = next_page["LastEvaluatedKey"].clone();
    }

    assert_eq!(
        collected.len(),
        5,
        "should retrieve all 5 items via pagination"
    );
}
4326
/// When every item fits in one page, `Scan` must not return a
/// `LastEvaluatedKey`, with or without an explicit `Limit`.
#[test]
fn scan_no_pagination_when_all_fit() {
    let service = make_service();
    create_test_table(&service);

    // Seed three items.
    (0..3).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("item{i}") }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // Limit larger than the number of stored items.
    let limited_resp = service
        .scan(&make_request(
            "Scan",
            json!({ "TableName": "test-table", "Limit": 10 }),
        ))
        .unwrap();
    let limited: Value = serde_json::from_slice(limited_resp.body.expect_bytes()).unwrap();
    assert_eq!(limited["Count"], 3);
    assert!(
        limited["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );

    // No limit at all.
    let unlimited_resp = service
        .scan(&make_request("Scan", json!({ "TableName": "test-table" })))
        .unwrap();
    let unlimited: Value = serde_json::from_slice(unlimited_resp.body.expect_bytes()).unwrap();
    assert_eq!(unlimited["Count"], 3);
    assert!(unlimited["LastEvaluatedKey"].is_null());
}
4362
4363    fn create_composite_table(svc: &DynamoDbService) {
4364        let req = make_request(
4365            "CreateTable",
4366            json!({
4367                "TableName": "composite-table",
4368                "KeySchema": [
4369                    { "AttributeName": "pk", "KeyType": "HASH" },
4370                    { "AttributeName": "sk", "KeyType": "RANGE" }
4371                ],
4372                "AttributeDefinitions": [
4373                    { "AttributeName": "pk", "AttributeType": "S" },
4374                    { "AttributeName": "sk", "AttributeType": "S" }
4375                ],
4376                "BillingMode": "PAY_PER_REQUEST"
4377            }),
4378        );
4379        svc.create_table(&req).unwrap();
4380    }
4381
/// A paginated `Query` on a hash+range table must expose both key attributes
/// in `LastEvaluatedKey`, return every item across pages, and keep them in
/// sort-key order.
#[test]
fn query_pagination_with_composite_key() {
    let service = make_service();
    create_composite_table(&service);

    // Five items that all live under partition key "user1".
    (0..5).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": format!("item{i:03}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // First page, capped at two items.
    let first_req = make_request(
        "Query",
        json!({
            "TableName": "composite-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
            "Limit": 2
        }),
    );
    let first_resp = service.query(&first_req).unwrap();
    let first_page: Value = serde_json::from_slice(first_resp.body.expect_bytes()).unwrap();
    assert_eq!(first_page["Count"], 2);
    assert!(first_page["LastEvaluatedKey"].is_object());
    assert!(first_page["LastEvaluatedKey"]["pk"].is_object());
    assert!(first_page["LastEvaluatedKey"]["sk"].is_object());

    // Follow the cursor until the service stops returning one.
    let mut collected: Vec<Value> = first_page["Items"].as_array().unwrap().to_vec();
    let mut cursor = first_page["LastEvaluatedKey"].clone();

    loop {
        if !cursor.is_object() {
            break;
        }
        let next_req = make_request(
            "Query",
            json!({
                "TableName": "composite-table",
                "KeyConditionExpression": "pk = :pk",
                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
                "Limit": 2,
                "ExclusiveStartKey": cursor
            }),
        );
        let next_resp = service.query(&next_req).unwrap();
        let next_page: Value = serde_json::from_slice(next_resp.body.expect_bytes()).unwrap();
        collected.extend_from_slice(next_page["Items"].as_array().unwrap());
        cursor = next_page["LastEvaluatedKey"].clone();
    }

    assert_eq!(
        collected.len(),
        5,
        "should retrieve all 5 items via pagination"
    );

    // The concatenated pages must already be ordered by sort key.
    let mut returned_order: Vec<String> = Vec::with_capacity(collected.len());
    for entry in &collected {
        returned_order.push(entry["sk"]["S"].as_str().unwrap().to_string());
    }
    let mut expected_order = returned_order.clone();
    expected_order.sort();
    assert_eq!(
        returned_order, expected_order,
        "items should be sorted by sort key"
    );
}
4456
/// When the `Limit` exceeds the number of matching items, `Query` must not
/// return a `LastEvaluatedKey`.
#[test]
fn query_no_pagination_when_all_fit() {
    let service = make_service();
    create_composite_table(&service);

    // Two items under the same partition key.
    (0..2).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": format!("item{i}") }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    let query_payload = json!({
        "TableName": "composite-table",
        "KeyConditionExpression": "pk = :pk",
        "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
        "Limit": 10
    });
    let queried = service
        .query(&make_request("Query", query_payload))
        .unwrap();
    let page: Value = serde_json::from_slice(queried.body.expect_bytes()).unwrap();
    assert_eq!(page["Count"], 2);
    assert!(
        page["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );
}
4493
4494    fn create_gsi_table(svc: &DynamoDbService) {
4495        let req = make_request(
4496            "CreateTable",
4497            json!({
4498                "TableName": "gsi-table",
4499                "KeySchema": [
4500                    { "AttributeName": "pk", "KeyType": "HASH" }
4501                ],
4502                "AttributeDefinitions": [
4503                    { "AttributeName": "pk", "AttributeType": "S" },
4504                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4505                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4506                ],
4507                "BillingMode": "PAY_PER_REQUEST",
4508                "GlobalSecondaryIndexes": [
4509                    {
4510                        "IndexName": "gsi-index",
4511                        "KeySchema": [
4512                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4513                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4514                        ],
4515                        "Projection": { "ProjectionType": "ALL" }
4516                    }
4517                ]
4518            }),
4519        );
4520        svc.create_table(&req).unwrap();
4521    }
4522
/// For a paginated query against a GSI, `LastEvaluatedKey` must carry the
/// index key attributes and the base table's partition key.
#[test]
fn gsi_query_last_evaluated_key_includes_table_pk() {
    let service = make_service();
    create_gsi_table(&service);

    // Three items sharing one GSI key but with distinct table partition keys.
    (0..3).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "gsi-table",
                "Item": {
                    "pk": { "S": format!("item{i}") },
                    "gsi_pk": { "S": "shared" },
                    "gsi_sk": { "S": "sort" }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // Limit=1 forces the index query to paginate.
    let query_payload = json!({
        "TableName": "gsi-table",
        "IndexName": "gsi-index",
        "KeyConditionExpression": "gsi_pk = :v",
        "ExpressionAttributeValues": { ":v": { "S": "shared" } },
        "Limit": 1
    });
    let queried = service
        .query(&make_request("Query", query_payload))
        .unwrap();
    let page: Value = serde_json::from_slice(queried.body.expect_bytes()).unwrap();
    assert_eq!(page["Count"], 1);

    let last_key = &page["LastEvaluatedKey"];
    assert!(last_key.is_object(), "should have LastEvaluatedKey");
    // Index key attributes must be present...
    assert!(last_key["gsi_pk"].is_object(), "LEK must contain gsi_pk");
    assert!(last_key["gsi_sk"].is_object(), "LEK must contain gsi_sk");
    // ...and so must the table's own partition key.
    assert!(
        last_key["pk"].is_object(),
        "LEK must contain table PK for GSI queries"
    );
}
4569
/// Paginating a GSI query over items that share the same index key must
/// return each item exactly once.
#[test]
fn gsi_query_pagination_returns_all_items() {
    let service = make_service();
    create_gsi_table(&service);

    // Four items sharing one GSI key but with distinct table partition keys.
    (0..4).for_each(|i| {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "gsi-table",
                "Item": {
                    "pk": { "S": format!("item{i:03}") },
                    "gsi_pk": { "S": "shared" },
                    "gsi_sk": { "S": "sort" }
                }
            }),
        );
        service.put_item(&put).unwrap();
    });

    // Walk the index two items at a time, recording every table PK seen.
    let mut seen_pks: Vec<String> = Vec::new();
    let mut cursor: Option<Value> = None;

    loop {
        let mut payload = json!({
            "TableName": "gsi-table",
            "IndexName": "gsi-index",
            "KeyConditionExpression": "gsi_pk = :v",
            "ExpressionAttributeValues": { ":v": { "S": "shared" } },
            "Limit": 2
        });
        // Only pages after the first carry a start key.
        if let Some(start_key) = cursor.as_ref() {
            payload["ExclusiveStartKey"] = start_key.clone();
        }

        let queried = service.query(&make_request("Query", payload)).unwrap();
        let page: Value = serde_json::from_slice(queried.body.expect_bytes()).unwrap();

        seen_pks.extend(
            page["Items"]
                .as_array()
                .unwrap()
                .iter()
                .map(|entry| entry["pk"]["S"].as_str().unwrap().to_string()),
        );

        let next_key = &page["LastEvaluatedKey"];
        if !next_key.is_object() {
            break;
        }
        cursor = Some(next_key.clone());
    }

    seen_pks.sort();
    assert_eq!(
        seen_pks,
        vec!["item000", "item001", "item002", "item003"],
        "pagination should return all items without duplicates"
    );
}
4630
4631    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4632        pairs
4633            .iter()
4634            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4635            .collect()
4636    }
4637
/// Builds an expression-attribute-names map from `(placeholder, attribute
/// name)` pairs, copying both sides into owned strings.
fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    let mut names = HashMap::new();
    for &(placeholder, attr) in pairs {
        names.insert(placeholder.to_string(), attr.to_string());
    }
    names
}
4644
4645    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4646        pairs
4647            .iter()
4648            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4649            .collect()
4650    }
4651
/// A bare `<>` condition passes when the attribute differs from the operand
/// and fails when it equals it.
#[test]
fn test_evaluate_condition_bare_not_equal() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);

    // state = "active" differs from "complete".
    let differing = cond_item(&[("state", "active")]);
    let differing_outcome = evaluate_condition("#s <> :c", Some(&differing), &names, &values);
    assert!(differing_outcome.is_ok());

    // state = "complete" equals the operand, so `<>` is false.
    let matching = cond_item(&[("state", "complete")]);
    let matching_outcome = evaluate_condition("#s <> :c", Some(&matching), &names, &values);
    assert!(matching_outcome.is_err());
}
4663
/// Wrapping a `<>` comparison in parentheses must not change its outcome.
#[test]
fn test_evaluate_condition_parenthesized_not_equal() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let existing = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("(#s <> :c)", Some(&existing), &names, &values);
    assert!(outcome.is_ok());
}
4672
/// A parenthesized `=` comparison against a different value must fail.
#[test]
fn test_evaluate_condition_parenthesized_equal_mismatch() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let existing = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("(#s = :c)", Some(&existing), &names, &values);
    assert!(outcome.is_err());
}
4681
/// Two parenthesized `<>` comparisons joined by `AND` pass when both hold.
#[test]
fn test_evaluate_condition_compound_and() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
    let existing = cond_item(&[("state", "active")]);

    // active <> complete AND active <> failed => true
    let outcome = evaluate_condition(
        "(#s <> :c) AND (#s <> :f)",
        Some(&existing),
        &names,
        &values,
    );
    assert!(outcome.is_ok());
}
4693
/// Two parenthesized `=` comparisons joined by `AND` fail when neither holds.
#[test]
fn test_evaluate_condition_compound_and_mismatch() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":b", "active")]);
    let existing = cond_item(&[("state", "inactive")]);

    // inactive = active AND inactive = active => false
    let outcome = evaluate_condition(
        "(#s = :a) AND (#s = :b)",
        Some(&existing),
        &names,
        &values,
    );
    assert!(outcome.is_err());
}
4705
/// `OR` of two `=` comparisons fails when neither matches and passes when at
/// least one does.
#[test]
fn test_evaluate_condition_compound_or() {
    let expression = "(#s = :a) OR (#s = :b)";
    let names = cond_names(&[("#s", "state")]);
    let existing = cond_item(&[("state", "running")]);

    // running = active OR running = idle => false
    let no_match_values = cond_values(&[(":a", "active"), (":b", "idle")]);
    let no_match = evaluate_condition(expression, Some(&existing), &names, &no_match_values);
    assert!(no_match.is_err());

    // running = active OR running = running => true
    let one_match_values = cond_values(&[(":a", "active"), (":b", "running")]);
    let one_match = evaluate_condition(expression, Some(&existing), &names, &one_match_values);
    assert!(one_match.is_ok());
}
4723
/// `NOT` must invert comparisons and `attribute_exists`, both for an
/// existing item and when no item exists.
#[test]
fn test_evaluate_condition_not_operator() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let existing = cond_item(&[("state", "active")]);
    let present = Some(&existing);

    // NOT (active = complete) => NOT false => true
    let negated_eq = evaluate_condition("NOT (#s = :c)", present, &names, &values);
    assert!(negated_eq.is_ok());

    // NOT (active <> complete) => NOT true => false
    let negated_ne = evaluate_condition("NOT (#s <> :c)", present, &names, &values);
    assert!(negated_ne.is_err());

    // NOT attribute_exists(#s) on existing item => NOT true => false
    let negated_exists = evaluate_condition("NOT attribute_exists(#s)", present, &names, &values);
    assert!(negated_exists.is_err());

    // NOT attribute_exists(#s) on missing item => NOT false => true
    let negated_exists_missing =
        evaluate_condition("NOT attribute_exists(#s)", None, &names, &values);
    assert!(negated_exists_missing.is_ok());
}
4744
/// Condition expressions must support `begins_with`. Per the original note,
/// conditions gained it via `evaluate_single_filter_condition` after
/// unification; previously only filters had it.
#[test]
fn test_evaluate_condition_begins_with() {
    let expression = "begins_with(#n, :p)";
    let names = cond_names(&[("#n", "name")]);
    let existing = cond_item(&[("name", "fakecloud-dynamodb")]);

    // "fakecloud-dynamodb" starts with "fakecloud".
    let matching_prefix = cond_values(&[(":p", "fakecloud")]);
    let hit = evaluate_condition(expression, Some(&existing), &names, &matching_prefix);
    assert!(hit.is_ok());

    // ...but not with "realcloud".
    let other_prefix = cond_values(&[(":p", "realcloud")]);
    let miss = evaluate_condition(expression, Some(&existing), &names, &other_prefix);
    assert!(miss.is_err());
}
4758
/// `contains` on a string attribute passes for a substring that is present
/// and fails for one that is not.
#[test]
fn test_evaluate_condition_contains() {
    let expression = "contains(#t, :v)";
    let names = cond_names(&[("#t", "tags")]);
    let existing = cond_item(&[("tags", "alpha,beta,gamma")]);

    // "beta" occurs inside the tags string.
    let present_needle = cond_values(&[(":v", "beta")]);
    let hit = evaluate_condition(expression, Some(&existing), &names, &present_needle);
    assert!(hit.is_ok());

    // "delta" does not.
    let absent_needle = cond_values(&[(":v", "delta")]);
    let miss = evaluate_condition(expression, Some(&existing), &names, &absent_needle);
    assert!(miss.is_err());
}
4770
/// Conditions evaluated with no existing item (e.g. a conditional PutItem on
/// a new key): `attribute_not_exists` passes, `attribute_exists` fails, and
/// comparisons treat the attribute as absent.
#[test]
fn test_evaluate_condition_no_existing_item() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":v", "active")]);

    let not_exists = evaluate_condition("attribute_not_exists(#s)", None, &names, &values);
    assert!(not_exists.is_ok());

    let exists = evaluate_condition("attribute_exists(#s)", None, &names, &values);
    assert!(exists.is_err());

    // Comparison against missing item: None != Some(val) => true for <>
    let not_equal = evaluate_condition("#s <> :v", None, &names, &values);
    assert!(not_equal.is_ok());

    // None == Some(val) => false for =
    let equal = evaluate_condition("#s = :v", None, &names, &values);
    assert!(equal.is_err());
}
4785
/// `NOT` in a filter expression must invert the wrapped comparison.
#[test]
fn test_evaluate_filter_not_operator() {
    let names = cond_names(&[("#s", "status")]);
    let values = cond_values(&[(":v", "pending")]);
    let row = cond_item(&[("status", "pending")]);

    // pending = pending is true, so its negation filters the item out.
    let negated_eq = evaluate_filter_expression("NOT (#s = :v)", &row, &names, &values);
    assert!(!negated_eq);

    // pending <> pending is false, so its negation keeps the item.
    let negated_ne = evaluate_filter_expression("NOT (#s <> :v)", &row, &names, &values);
    assert!(negated_ne);
}
4805
/// An `IN` filter must match when the attribute equals one of the operands.
#[test]
fn test_evaluate_filter_expression_in_match() {
    // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
    // emits "#0 IN (:0, :1)". Regression guard: before the fix, neither
    // evaluate_single_filter_condition nor evaluate_single_key_condition
    // handled IN, so the leaf fell through to the simple-comparison loop,
    // found no operator, and returned `true` — every item matched every IN
    // filter regardless of value.
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let row = cond_item(&[("state", "active")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &row, &names, &values);
    assert!(matched, "state=active should match IN (active, pending)");
}
4822
/// An `IN` filter must reject an item whose attribute equals none of the
/// operands.
#[test]
fn test_evaluate_filter_expression_in_no_match() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let row = cond_item(&[("state", "complete")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &row, &names, &values);
    assert!(
        !matched,
        "state=complete should not match IN (active, pending)"
    );
}
4834
/// An `IN` operand list written without spaces after the commas must still
/// parse and match.
#[test]
fn test_evaluate_filter_expression_in_no_spaces() {
    // orderbot emits the raw form
    //     "#status IN (" + strings.Join(keys, ",") + ")"
    // which yields "IN (:v0,:v1,:v2)" with no spaces after commas.
    let names = cond_names(&[("#s", "status")]);
    let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
    let row = cond_item(&[("status", "shipped")]);

    let matched = evaluate_filter_expression("#s IN (:a,:b,:c)", &row, &names, &values);
    assert!(matched, "no-space IN list should still parse");
}
4849
/// An item lacking the attribute must never satisfy an `IN` filter.
#[test]
fn test_evaluate_filter_expression_in_missing_attribute() {
    // Regression guard: the silent-true fallthrough would wrongly accept
    // items that do not have the attribute at all.
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active")]);
    let empty_row: HashMap<String, AttributeValue> = HashMap::new();

    let matched = evaluate_filter_expression("#s IN (:a)", &empty_row, &names, &values);
    assert!(!matched, "missing attribute should not match any IN list");
}
4863
/// A parenthesized `IN` leaf combined with an `=` leaf via `AND` must honor
/// both leaves.
#[test]
fn test_evaluate_filter_expression_compound_in_and_eq() {
    // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
    //     "(#0 IN (:0, :1)) AND (#1 = :2)"
    // Regression guard: split_on_and handles the outer parens, but the IN leaf
    // used to fall through to a silent `true`, so any item with priority=high
    // matched regardless of state.
    let expression = "(#s IN (:a, :pe)) AND (#p = :h)";
    let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
    let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);

    let active_high = cond_item(&[("state", "active"), ("priority", "high")]);
    let active_matched = evaluate_filter_expression(expression, &active_high, &names, &values);
    assert!(
        active_matched,
        "(active IN (active, pending)) AND (high = high) should match"
    );

    let complete_high = cond_item(&[("state", "complete"), ("priority", "high")]);
    let complete_matched = evaluate_filter_expression(expression, &complete_high, &names, &values);
    assert!(
        !complete_matched,
        "(complete IN (active, pending)) AND (high = high) should not match"
    );
}
4886
#[test]
fn test_evaluate_condition_attribute_exists_with_space() {
    // aws-sdk-go v2's expression.NewBuilder puts a space between a function
    // name and its opening paren:
    //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
    // Before the fix, extract_function_arg used
    // strip_prefix("attribute_exists(") with no space, so these leaves
    // skipped the filter path entirely and landed in
    // evaluate_single_key_condition's silent-true fallthrough — every
    // conditional write was silently accepted.
    let expr = "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))";
    let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
    let values = cond_values(&[(":0", "tab-A")]);

    let free = cond_item(&[("store_id", "s-1")]);
    let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
    let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);

    // Each row: (existing item, whether the condition must pass, failure text).
    let scenarios = [
        // Existing item without active_viewer_tab_id: exists(store_id) is
        // true and not_exists(active_viewer_tab_id) is true → Ok.
        (
            Some(&free),
            true,
            "claim-lease compound on free item should succeed",
        ),
        // Missing item: exists(store_id) is false → whole AND false → Err.
        (
            None,
            false,
            "claim-lease compound on missing item must fail attribute_exists branch",
        ),
        // Held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
        // → (✓) AND ((✗) OR (✗)) → false → Err.
        (
            Some(&held),
            false,
            "claim-lease compound on item held by another tab must fail",
        ),
        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
        // → (✓) AND ((✗) OR (✓)) → true → Ok.
        (Some(&self_held), true, "same-tab re-claim must succeed"),
    ];

    for (existing, should_pass, why) in scenarios {
        let outcome = evaluate_condition(expr, existing, &names, &values);
        assert!(outcome.is_ok() == should_pass, "{}", why);
    }
}
4953
#[test]
fn test_evaluate_condition_in_match() {
    // evaluate_condition hands off to evaluate_filter_expression, so this
    // covers the ConditionExpression path as well. Before the fix the
    // result was silently Ok.
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let active = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&active), &names, &values);
    assert!(
        outcome.is_ok(),
        "IN should succeed when actual value is in the list"
    );
}
4967
#[test]
fn test_evaluate_condition_in_no_match() {
    // Before the fix evaluate_condition silently returned Ok(()) for IN, so
    // any conditional write went through regardless of the stored state —
    // the opposite of what the caller asked for.
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let complete = cond_item(&[("state", "complete")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&complete), &names, &values);
    assert!(
        outcome.is_err(),
        "IN should fail when actual value is not in the list"
    );
}
4982
#[test]
fn test_apply_update_set_list_index_replaces_existing() {
    // Shape emitted by orderbot's order-item update retry loop:
    //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
    // Before the fix, apply_set_assignment ran resolve_attr_name over the
    // whole "#items[0]" token. That misses the name map, so the code then
    // did item.insert("#items[0]", :item) and created a top-level key
    // literally named "#items[0]" instead of mutating the list.
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}))]);
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "OLD-A"}}},
            {"M": {"sku": {"S": "OLD-B"}}},
        ]}),
    )]);

    apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .expect("items should still be a list");

    // Pull the sku string (if any) out of every list slot up front.
    let skus: Vec<Option<&str>> = items_list
        .iter()
        .map(|entry| {
            entry
                .get("M")
                .and_then(|m| m.get("sku"))
                .and_then(|sku| sku.get("S"))
                .and_then(Value::as_str)
        })
        .collect();

    assert_eq!(items_list.len(), 2, "list length should be unchanged");
    assert_eq!(skus[0], Some("NEW-A"), "index 0 should be replaced");
    assert_eq!(skus[1], Some("OLD-B"), "index 1 should be untouched");

    // Neither spelling of the raw token may leak in as a top-level key.
    for stray_key in ["items[0]", "#items[0]"] {
        assert!(!item.contains_key(stray_key));
    }
}
5028
#[test]
fn test_apply_update_set_list_index_second_slot() {
    // Replacing the middle slot must leave both neighbours as they were.
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(
        ":item".to_string(),
        json!({"M": {"sku": {"S": "B-PRIME"}}}),
    )]);
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "A"}}},
            {"M": {"sku": {"S": "B"}}},
            {"M": {"sku": {"S": "C"}}},
        ]}),
    )]);

    apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .unwrap();

    let mut skus: Vec<&str> = Vec::new();
    for entry in items_list {
        let sku = entry
            .get("M")
            .and_then(|m| m.get("sku"))
            .and_then(|s| s.get("S"))
            .and_then(Value::as_str)
            .unwrap();
        skus.push(sku);
    }
    assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
}
5064
#[test]
fn test_apply_update_set_list_index_without_name_ref() {
    // The indexed SET must also work when the left-hand side is a literal
    // attribute name rather than an expression attribute name reference.
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "green"}))]);
    let mut item = HashMap::from([(
        "tags".to_string(),
        json!({"L": [{"S": "red"}, {"S": "blue"}]}),
    )]);

    apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();

    let tags = item
        .get("tags")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .unwrap();
    let first = tags[0].get("S").and_then(Value::as_str);
    let second = tags[1].get("S").and_then(Value::as_str);
    assert_eq!(first, Some("red"));
    assert_eq!(second, Some("green"));
}
5089
#[test]
fn test_list_append_into_empty_list() {
    // Regression: UpdateItem with `SET #0 = list_append(#0, :0)` silently
    // no-oped when the attribute already existed as an empty list.
    // Root cause: parse_update_clauses split `list_append(#0, :0)` at the
    // inner comma, so apply_set_list_append saw a truncated `rest` with no
    // closing ')' and returned early without writing anything.
    let names = cond_names(&[("#0", "files")]);
    let values = HashMap::from([(
        ":0".to_string(),
        json!({"L": [{"M": {"field": {"S": "value"}}}]}),
    )]);
    let mut item = HashMap::from([("files".to_string(), json!({"L": []}))]);

    apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
        .unwrap();

    let appended = item
        .get("files")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .expect("files should be an L-typed attribute");
    assert_eq!(appended.len(), 1, "one element should have been appended");
}
5117
#[test]
fn test_list_append_into_nonempty_list() {
    // Same list_append shape, but the stored list already holds an element.
    let names = cond_names(&[("#0", "files")]);
    let values = HashMap::from([(
        ":0".to_string(),
        json!({"L": [{"M": {"field": {"S": "new"}}}]}),
    )]);
    let mut item = HashMap::from([(
        "files".to_string(),
        json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
    )]);

    apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
        .unwrap();

    let combined = item
        .get("files")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .expect("files should be an L-typed attribute");
    assert_eq!(combined.len(), 2, "existing element plus one new element");
}
5144
#[test]
fn test_list_append_combined_with_plain_set() {
    // A mixed expression such as
    // `SET #a = list_append(#a, :v), #b = :other` must apply both
    // assignments now that the comma split is paren-aware.
    let expr = "SET #a = list_append(#a, :v), #b = :other";
    let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
    let values = HashMap::from([
        (":v".to_string(), json!({"L": [{"S": "entry"}]})),
        (":other".to_string(), json!({"N": "1"})),
    ]);
    let mut item = HashMap::from([
        ("logs".to_string(), json!({"L": []})),
        ("count".to_string(), json!({"N": "0"})),
    ]);

    apply_update_expression(&mut item, expr, &names, &values).unwrap();

    let logs = item
        .get("logs")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .expect("logs should be an L-typed attribute");
    assert_eq!(logs.len(), 1, "one log entry appended");

    let count = item
        .get("count")
        .and_then(|attr| attr.get("N"))
        .and_then(Value::as_str)
        .expect("count should be an N-typed attribute");
    assert_eq!(count, "1", "count updated to 1");
}
5181
#[test]
fn test_unrecognized_expression_returns_false() {
    // evaluate_single_key_condition has to fail closed: an expression shape
    // it does not recognize yields false (reject), never true (accept).
    let names: HashMap<String, String> = HashMap::new();
    let values: HashMap<String, Value> = HashMap::new();
    let item = cond_item(&[("x", "1")]);

    let accepted = evaluate_single_key_condition("GARBAGE NONSENSE", &item, &names, &values);
    assert!(!accepted, "unrecognized expression must return false");
}
5195
#[test]
fn test_set_list_index_out_of_range_returns_error() {
    // SET list[N] with N beyond the list length is expected to surface an
    // error (a ValidationException, per the intent of this test) instead
    // of silently doing nothing.
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"S": "a"}, {"S": "b"}]}),
    )]);

    let outcome = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
    assert!(
        outcome.is_err(),
        "out-of-range list index must return an error"
    );

    // The failed update must not have touched the stored list.
    let untouched = item
        .get("items")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .unwrap();
    assert_eq!(untouched.len(), 2);
}
5221
#[test]
fn test_set_list_index_on_non_list_returns_error() {
    // SET attr[0] = :v against a string attribute (not a list) is expected
    // to be rejected with an error (a ValidationException, per the intent
    // of this test).
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);

    let outcome = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
    assert!(
        outcome.is_err(),
        "list index on non-list attribute must return an error"
    );
}
5239
#[test]
fn test_unrecognized_update_action_returns_error() {
    // An UpdateExpression that starts with an unknown action keyword must
    // be rejected, and the error text must say why.
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":bar".to_string(), json!({"S": "baz"}))]);
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);

    let outcome = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
    assert!(
        outcome.is_err(),
        "unrecognized UpdateExpression action must return an error"
    );

    let err_msg = outcome.unwrap_err().to_string();
    let mentions_cause =
        err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error");
    assert!(
        mentions_cause,
        "error should mention Invalid UpdateExpression, got: {err_msg}"
    );
}
5260
5261    // ── size() function tests ──────────────────────────────────────────
5262
#[test]
fn test_size_string() {
    // "hello" has length 5, so size(name) equals 5 and exceeds 4.
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names = HashMap::new();

    let limit_five = HashMap::from([(":limit".to_string(), json!({"N": "5"}))]);
    assert!(evaluate_single_filter_condition(
        "size(name) = :limit",
        &item,
        &names,
        &limit_five,
    ));

    let limit_four = HashMap::from([(":limit".to_string(), json!({"N": "4"}))]);
    assert!(evaluate_single_filter_condition(
        "size(name) > :limit",
        &item,
        &names,
        &limit_four,
    ));
}
5285
#[test]
fn test_size_list() {
    // A three-element L attribute must report size 3.
    let item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
    )]);
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let matched =
        evaluate_single_filter_condition("size(items) = :limit", &item, &HashMap::new(), &values);
    assert!(matched);
}
5304
#[test]
fn test_size_map() {
    // A two-entry M attribute must report size 2.
    let item = HashMap::from([(
        "data".to_string(),
        json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
    )]);
    let values = HashMap::from([(":limit".to_string(), json!({"N": "2"}))]);

    let matched =
        evaluate_single_filter_condition("size(data) = :limit", &item, &HashMap::new(), &values);
    assert!(matched);
}
5323
#[test]
fn test_size_set() {
    // A four-member SS attribute has a size greater than 3.
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}))]);
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let matched =
        evaluate_single_filter_condition("size(tags) > :limit", &item, &HashMap::new(), &values);
    assert!(matched);
}
5339
5340    // ── attribute_type() function tests ────────────────────────────────
5341
#[test]
fn test_attribute_type_string() {
    // An S attribute matches type "S" and does not match type "N".
    let expr = "attribute_type(name, :t)";
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names = HashMap::new();

    let wants_string = HashMap::from([(":t".to_string(), json!({"S": "S"}))]);
    assert!(evaluate_single_filter_condition(
        expr,
        &item,
        &names,
        &wants_string,
    ));

    let wants_number = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);
    assert!(!evaluate_single_filter_condition(
        expr,
        &item,
        &names,
        &wants_number,
    ));
}
5365
#[test]
fn test_attribute_type_number() {
    // An N attribute matches type "N".
    let item = HashMap::from([("age".to_string(), json!({"N": "42"}))]);
    let values = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(age, :t)", &item, &HashMap::new(), &values);
    assert!(matched);
}
5381
#[test]
fn test_attribute_type_list() {
    // An L attribute matches type "L".
    let item = HashMap::from([("items".to_string(), json!({"L": [{"S": "a"}]}))]);
    let values = HashMap::from([(":t".to_string(), json!({"S": "L"}))]);

    let matched = evaluate_single_filter_condition(
        "attribute_type(items, :t)",
        &item,
        &HashMap::new(),
        &values,
    );
    assert!(matched);
}
5397
#[test]
fn test_attribute_type_map() {
    // An M attribute matches type "M".
    let item = HashMap::from([("data".to_string(), json!({"M": {"key": {"S": "val"}}}))]);
    let values = HashMap::from([(":t".to_string(), json!({"S": "M"}))]);

    let matched = evaluate_single_filter_condition(
        "attribute_type(data, :t)",
        &item,
        &HashMap::new(),
        &values,
    );
    assert!(matched);
}
5413
#[test]
fn test_attribute_type_bool() {
    // A BOOL attribute matches type "BOOL".
    let item = HashMap::from([("active".to_string(), json!({"BOOL": true}))]);
    let values = HashMap::from([(":t".to_string(), json!({"S": "BOOL"}))]);

    let matched = evaluate_single_filter_condition(
        "attribute_type(active, :t)",
        &item,
        &HashMap::new(),
        &values,
    );
    assert!(matched);
}
5429
5430    // ── begins_with rejects non-string types ───────────────────────────
5431
#[test]
fn test_begins_with_rejects_number_type() {
    // The stored N value "12345" textually starts with "123", but
    // begins_with must still report false for a number attribute.
    let item = HashMap::from([("code".to_string(), json!({"N": "12345"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "123"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values);
    assert!(
        !matched,
        "begins_with must return false for N-type attributes"
    );
}
5445
#[test]
fn test_begins_with_works_on_string_type() {
    // Positive control: an S attribute with the right prefix matches.
    let item = HashMap::from([("code".to_string(), json!({"S": "abc123"}))]);
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "abc"}))]);

    let matched = evaluate_single_filter_condition(
        "begins_with(code, :prefix)",
        &item,
        &HashMap::new(),
        &values,
    );
    assert!(matched);
}
5461
5462    // ── contains on sets ───────────────────────────────────────────────
5463
#[test]
fn test_contains_string_set() {
    // contains() on an SS attribute is a membership test: "blue" is a
    // member, "yellow" is not.
    let expr = "contains(tags, :val)";
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["red", "blue", "green"]}))]);
    let names = HashMap::new();

    let member = HashMap::from([(":val".to_string(), json!({"S": "blue"}))]);
    assert!(evaluate_single_filter_condition(
        expr, &item, &names, &member,
    ));

    let stranger = HashMap::from([(":val".to_string(), json!({"S": "yellow"}))]);
    assert!(!evaluate_single_filter_condition(
        expr, &item, &names, &stranger,
    ));
}
5487
#[test]
fn test_contains_number_set() {
    // contains() on an NS attribute finds a number that is a member.
    let item = HashMap::from([("scores".to_string(), json!({"NS": ["1", "2", "3"]}))]);
    let values = HashMap::from([(":val".to_string(), json!({"N": "2"}))]);

    let matched =
        evaluate_single_filter_condition("contains(scores, :val)", &item, &HashMap::new(), &values);
    assert!(matched);
}
5503
5504    // ── SET arithmetic type validation ─────────────────────────────────
5505
#[test]
fn test_set_arithmetic_rejects_string_operand() {
    // Adding a number to an attribute that currently holds an S value
    // must be rejected.
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "1"}))]);
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);

    let outcome = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic on S-type attribute must return a ValidationException"
    );
}
5520
#[test]
fn test_set_arithmetic_rejects_string_value() {
    // Adding an S-typed expression value to a numeric attribute must be
    // rejected.
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"S": "notanumber"}))]);
    let mut item = HashMap::from([("count".to_string(), json!({"N": "5"}))]);

    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic with S-type value must return a ValidationException"
    );
}
5536
#[test]
fn test_set_arithmetic_valid_numbers() {
    // Positive control: 10 + 3 must be stored back as N "13".
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "3"}))]);
    let mut item = HashMap::from([("count".to_string(), json!({"N": "10"}))]);

    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
    assert!(outcome.is_ok());

    let expected = json!({"N": "13"});
    assert_eq!(item["count"], expected);
}
5550
5551    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5552
#[test]
fn test_add_binary_set() {
    // ADD on a BS attribute is a set union: "YQ==" is already present, so
    // only "Yw==" is new and the result has three members.
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}))]);
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}))]);

    let outcome = apply_update_expression(&mut item, "ADD data :val", &names, &values);
    assert!(outcome.is_ok());

    let members = item["data"]["BS"].as_array().unwrap();
    assert_eq!(members.len(), 3, "should merge sets without duplicates");
    for expected in ["YQ==", "Yg==", "Yw=="] {
        assert!(members.contains(&json!(expected)));
    }
}
5569
#[test]
fn test_delete_binary_set() {
    // DELETE on a BS attribute removes the listed member and keeps the
    // other two.
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yg=="]}))]);
    let mut item = HashMap::from([(
        "data".to_string(),
        json!({"BS": ["YQ==", "Yg==", "Yw=="]}),
    )]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let remaining = item["data"]["BS"].as_array().unwrap();
    assert_eq!(remaining.len(), 2);
    let removed = json!("Yg==");
    assert!(!remaining.contains(&removed));
}
5584
#[test]
fn test_delete_binary_set_removes_attr_when_empty() {
    // Deleting the only member of a BS attribute must drop the attribute
    // itself rather than leave an empty set behind.
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["YQ=="]}))]);
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ=="]}))]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let still_present = item.contains_key("data");
    assert!(
        !still_present,
        "attribute should be removed when set becomes empty"
    );
}
5600
5601    fn body_json(resp: &AwsResponse) -> Value {
5602        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
5603    }
5604
5605    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
5606        match result {
5607            Err(e) => e,
5608            Ok(_) => panic!("expected error, got Ok"),
5609        }
5610    }
5611
5612    // ── CreateTable ──
5613
#[test]
fn create_table_basic() {
    // Minimal hash-key-only table: the response must echo the name,
    // report ACTIVE status, and carry a string ARN.
    let svc = make_service();
    let payload = json!({
        "TableName": "my-table",
        "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
        "BillingMode": "PAY_PER_REQUEST",
    });
    let req = make_request("CreateTable", payload);

    let resp = svc.create_table(&req).unwrap();
    let body = body_json(&resp);
    let desc = &body["TableDescription"];

    assert_eq!(desc["TableName"], "my-table");
    assert_eq!(desc["TableStatus"], "ACTIVE");
    assert!(desc["TableArn"].is_string());
}
5632
#[test]
fn create_table_with_sort_key_and_gsi() {
    // Composite-key table with one global secondary index: the index must
    // be echoed back in the table description.
    let svc = make_service();
    let payload = json!({
        "TableName": "gsi-table",
        "KeySchema": [
            {"AttributeName": "pk", "KeyType": "HASH"},
            {"AttributeName": "sk", "KeyType": "RANGE"},
        ],
        "AttributeDefinitions": [
            {"AttributeName": "pk", "AttributeType": "S"},
            {"AttributeName": "sk", "AttributeType": "S"},
            {"AttributeName": "gsi_key", "AttributeType": "N"},
        ],
        "GlobalSecondaryIndexes": [{
            "IndexName": "gsi1",
            "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
            "Projection": {"ProjectionType": "ALL"},
        }],
        "BillingMode": "PAY_PER_REQUEST",
    });
    let req = make_request("CreateTable", payload);

    let resp = svc.create_table(&req).unwrap();
    let body = body_json(&resp);
    let indexes = body["TableDescription"]["GlobalSecondaryIndexes"]
        .as_array()
        .unwrap();

    assert_eq!(indexes.len(), 1);
    assert_eq!(indexes[0]["IndexName"], "gsi1");
}
5665
#[test]
fn create_table_duplicate_fails() {
    // create_test_table presumably registers "test-table" (confirm against
    // the helper); creating that name again must be refused.
    let svc = make_service();
    create_test_table(&svc);

    let payload = json!({
        "TableName": "test-table",
        "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
        "BillingMode": "PAY_PER_REQUEST",
    });
    let req = make_request("CreateTable", payload);

    let err = expect_err(svc.create_table(&req));
    let rendered = err.to_string();
    assert!(rendered.contains("ResourceInUseException"));
}
5683
#[test]
fn create_table_missing_key_attr_in_definitions() {
    // The key schema names "pk" but AttributeDefinitions only declares
    // "other", so the request must fail validation.
    let svc = make_service();
    let payload = json!({
        "TableName": "bad",
        "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
        "AttributeDefinitions": [{"AttributeName": "other", "AttributeType": "S"}],
        "BillingMode": "PAY_PER_REQUEST",
    });
    let req = make_request("CreateTable", payload);

    let err = expect_err(svc.create_table(&req));
    let rendered = err.to_string();
    assert!(rendered.contains("ValidationException"));
}
5699
5700    // ── DescribeTable ──
5701
5702    #[test]
5703    fn describe_table_found() {
5704        let svc = make_service();
5705        create_test_table(&svc);
5706
5707        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5708        let resp = svc.describe_table(&req).unwrap();
5709        let b = body_json(&resp);
5710        assert_eq!(b["Table"]["TableName"], "test-table");
5711        assert_eq!(b["Table"]["TableStatus"], "ACTIVE");
5712    }
5713
5714    #[test]
5715    fn describe_table_not_found() {
5716        let svc = make_service();
5717        let req = make_request("DescribeTable", json!({"TableName": "nope"}));
5718        let err = expect_err(svc.describe_table(&req));
5719        assert!(err.to_string().contains("ResourceNotFoundException"));
5720    }
5721
5722    // ── DeleteTable ──
5723
5724    #[test]
5725    fn delete_table_removes_table() {
5726        let svc = make_service();
5727        create_test_table(&svc);
5728
5729        let req = make_request("DeleteTable", json!({"TableName": "test-table"}));
5730        let resp = svc.delete_table(&req).unwrap();
5731        let b = body_json(&resp);
5732        assert_eq!(b["TableDescription"]["TableName"], "test-table");
5733
5734        // Should be gone
5735        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5736        assert!(svc.describe_table(&req).is_err());
5737    }
5738
5739    // ── ListTables ──
5740
5741    #[test]
5742    fn list_tables_returns_names() {
5743        let svc = make_service();
5744        create_test_table(&svc);
5745
5746        let req = make_request("ListTables", json!({}));
5747        let resp = svc.list_tables(&req).unwrap();
5748        let b = body_json(&resp);
5749        let names = b["TableNames"].as_array().unwrap();
5750        assert!(names.iter().any(|n| n == "test-table"));
5751    }
5752
5753    // ── PutItem / GetItem / DeleteItem ──
5754
5755    #[test]
5756    fn put_and_get_item() {
5757        let svc = make_service();
5758        create_test_table(&svc);
5759
5760        let req = make_request(
5761            "PutItem",
5762            json!({
5763                "TableName": "test-table",
5764                "Item": {
5765                    "pk": {"S": "key1"},
5766                    "name": {"S": "Alice"},
5767                    "age": {"N": "30"},
5768                },
5769            }),
5770        );
5771        svc.put_item(&req).unwrap();
5772
5773        let req = make_request(
5774            "GetItem",
5775            json!({
5776                "TableName": "test-table",
5777                "Key": {"pk": {"S": "key1"}},
5778            }),
5779        );
5780        let resp = svc.get_item(&req).unwrap();
5781        let b = body_json(&resp);
5782        assert_eq!(b["Item"]["name"]["S"], "Alice");
5783        assert_eq!(b["Item"]["age"]["N"], "30");
5784    }
5785
5786    #[test]
5787    fn get_item_not_found() {
5788        let svc = make_service();
5789        create_test_table(&svc);
5790
5791        let req = make_request(
5792            "GetItem",
5793            json!({
5794                "TableName": "test-table",
5795                "Key": {"pk": {"S": "nonexistent"}},
5796            }),
5797        );
5798        let resp = svc.get_item(&req).unwrap();
5799        let b = body_json(&resp);
5800        assert!(b.get("Item").is_none() || b["Item"].is_null());
5801    }
5802
5803    #[test]
5804    fn delete_item_removes_item() {
5805        let svc = make_service();
5806        create_test_table(&svc);
5807
5808        let req = make_request(
5809            "PutItem",
5810            json!({
5811                "TableName": "test-table",
5812                "Item": {"pk": {"S": "del-me"}},
5813            }),
5814        );
5815        svc.put_item(&req).unwrap();
5816
5817        let req = make_request(
5818            "DeleteItem",
5819            json!({
5820                "TableName": "test-table",
5821                "Key": {"pk": {"S": "del-me"}},
5822            }),
5823        );
5824        svc.delete_item(&req).unwrap();
5825
5826        let req = make_request(
5827            "GetItem",
5828            json!({
5829                "TableName": "test-table",
5830                "Key": {"pk": {"S": "del-me"}},
5831            }),
5832        );
5833        let resp = svc.get_item(&req).unwrap();
5834        let b = body_json(&resp);
5835        assert!(b.get("Item").is_none() || b["Item"].is_null());
5836    }
5837
5838    #[test]
5839    fn put_item_returns_old_item() {
5840        let svc = make_service();
5841        create_test_table(&svc);
5842
5843        let req = make_request(
5844            "PutItem",
5845            json!({
5846                "TableName": "test-table",
5847                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "1"}},
5848            }),
5849        );
5850        svc.put_item(&req).unwrap();
5851
5852        let req = make_request(
5853            "PutItem",
5854            json!({
5855                "TableName": "test-table",
5856                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "2"}},
5857                "ReturnValues": "ALL_OLD",
5858            }),
5859        );
5860        let resp = svc.put_item(&req).unwrap();
5861        let b = body_json(&resp);
5862        assert_eq!(b["Attributes"]["v"]["N"], "1");
5863    }
5864
5865    // ── UpdateItem ──
5866
5867    #[test]
5868    fn update_item_set_attribute() {
5869        let svc = make_service();
5870        create_test_table(&svc);
5871
5872        let req = make_request(
5873            "PutItem",
5874            json!({
5875                "TableName": "test-table",
5876                "Item": {"pk": {"S": "upd"}, "count": {"N": "0"}},
5877            }),
5878        );
5879        svc.put_item(&req).unwrap();
5880
5881        let req = make_request(
5882            "UpdateItem",
5883            json!({
5884                "TableName": "test-table",
5885                "Key": {"pk": {"S": "upd"}},
5886                "UpdateExpression": "SET #c = :val",
5887                "ExpressionAttributeNames": {"#c": "count"},
5888                "ExpressionAttributeValues": {":val": {"N": "42"}},
5889                "ReturnValues": "ALL_NEW",
5890            }),
5891        );
5892        let resp = svc.update_item(&req).unwrap();
5893        let b = body_json(&resp);
5894        assert_eq!(b["Attributes"]["count"]["N"], "42");
5895    }
5896
5897    // ── Query ──
5898
5899    #[test]
5900    fn query_returns_matching_items() {
5901        let svc = make_service();
5902        // Table with hash+range
5903        let req = make_request(
5904            "CreateTable",
5905            json!({
5906                "TableName": "query-table",
5907                "KeySchema": [
5908                    {"AttributeName": "pk", "KeyType": "HASH"},
5909                    {"AttributeName": "sk", "KeyType": "RANGE"},
5910                ],
5911                "AttributeDefinitions": [
5912                    {"AttributeName": "pk", "AttributeType": "S"},
5913                    {"AttributeName": "sk", "AttributeType": "S"},
5914                ],
5915                "BillingMode": "PAY_PER_REQUEST",
5916            }),
5917        );
5918        svc.create_table(&req).unwrap();
5919
5920        for i in 0..3 {
5921            let req = make_request(
5922                "PutItem",
5923                json!({
5924                    "TableName": "query-table",
5925                    "Item": {
5926                        "pk": {"S": "user1"},
5927                        "sk": {"S": format!("item-{i}")},
5928                    },
5929                }),
5930            );
5931            svc.put_item(&req).unwrap();
5932        }
5933        // Different partition
5934        let req = make_request(
5935            "PutItem",
5936            json!({
5937                "TableName": "query-table",
5938                "Item": {"pk": {"S": "user2"}, "sk": {"S": "item-0"}},
5939            }),
5940        );
5941        svc.put_item(&req).unwrap();
5942
5943        let req = make_request(
5944            "Query",
5945            json!({
5946                "TableName": "query-table",
5947                "KeyConditionExpression": "pk = :pk",
5948                "ExpressionAttributeValues": {":pk": {"S": "user1"}},
5949            }),
5950        );
5951        let resp = svc.query(&req).unwrap();
5952        let b = body_json(&resp);
5953        assert_eq!(b["Count"], 3);
5954        assert_eq!(b["Items"].as_array().unwrap().len(), 3);
5955    }
5956
5957    // ── Scan ──
5958
5959    #[test]
5960    fn scan_returns_all_items() {
5961        let svc = make_service();
5962        create_test_table(&svc);
5963
5964        for i in 0..5 {
5965            let req = make_request(
5966                "PutItem",
5967                json!({
5968                    "TableName": "test-table",
5969                    "Item": {"pk": {"S": format!("scan-{i}")}},
5970                }),
5971            );
5972            svc.put_item(&req).unwrap();
5973        }
5974
5975        let req = make_request("Scan", json!({"TableName": "test-table"}));
5976        let resp = svc.scan(&req).unwrap();
5977        let b = body_json(&resp);
5978        assert_eq!(b["Count"], 5);
5979    }
5980
5981    // ── BatchWriteItem / BatchGetItem ──
5982
5983    #[test]
5984    fn batch_write_and_get_items() {
5985        let svc = make_service();
5986        create_test_table(&svc);
5987
5988        let req = make_request(
5989            "BatchWriteItem",
5990            json!({
5991                "RequestItems": {
5992                    "test-table": [
5993                        {"PutRequest": {"Item": {"pk": {"S": "b1"}, "val": {"S": "v1"}}}},
5994                        {"PutRequest": {"Item": {"pk": {"S": "b2"}, "val": {"S": "v2"}}}},
5995                        {"PutRequest": {"Item": {"pk": {"S": "b3"}, "val": {"S": "v3"}}}},
5996                    ]
5997                }
5998            }),
5999        );
6000        let resp = svc.batch_write_item(&req).unwrap();
6001        let b = body_json(&resp);
6002        // Unprocessed should be empty
6003        assert!(
6004            b["UnprocessedItems"].as_object().unwrap().is_empty()
6005                || b["UnprocessedItems"]["test-table"]
6006                    .as_array()
6007                    .is_none_or(|a| a.is_empty())
6008        );
6009
6010        // BatchGetItem
6011        let req = make_request(
6012            "BatchGetItem",
6013            json!({
6014                "RequestItems": {
6015                    "test-table": {
6016                        "Keys": [
6017                            {"pk": {"S": "b1"}},
6018                            {"pk": {"S": "b2"}},
6019                            {"pk": {"S": "b3"}},
6020                        ]
6021                    }
6022                }
6023            }),
6024        );
6025        let resp = svc.batch_get_item(&req).unwrap();
6026        let b = body_json(&resp);
6027        let items = b["Responses"]["test-table"].as_array().unwrap();
6028        assert_eq!(items.len(), 3);
6029    }
6030
6031    // ── TransactWriteItems / TransactGetItems ──
6032
6033    #[test]
6034    fn transact_write_and_get() {
6035        let svc = make_service();
6036        create_test_table(&svc);
6037
6038        let req = make_request(
6039            "TransactWriteItems",
6040            json!({
6041                "TransactItems": [
6042                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx1"}}}},
6043                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx2"}}}},
6044                ]
6045            }),
6046        );
6047        svc.transact_write_items(&req).unwrap();
6048
6049        let req = make_request(
6050            "TransactGetItems",
6051            json!({
6052                "TransactItems": [
6053                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}},
6054                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx2"}}}},
6055                ]
6056            }),
6057        );
6058        let resp = svc.transact_get_items(&req).unwrap();
6059        let b = body_json(&resp);
6060        let responses = b["Responses"].as_array().unwrap();
6061        assert_eq!(responses.len(), 2);
6062    }
6063
6064    // ── TagResource / UntagResource / ListTagsOfResource ──
6065
6066    #[test]
6067    fn tag_operations() {
6068        let svc = make_service();
6069        create_test_table(&svc);
6070        let arn = {
6071            let s = svc.state.read();
6072            s.default_ref()
6073                .tables
6074                .get("test-table")
6075                .unwrap()
6076                .arn
6077                .clone()
6078        };
6079
6080        let req = make_request(
6081            "TagResource",
6082            json!({
6083                "ResourceArn": arn,
6084                "Tags": [{"Key": "env", "Value": "test"}],
6085            }),
6086        );
6087        svc.tag_resource(&req).unwrap();
6088
6089        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
6090        let resp = svc.list_tags_of_resource(&req).unwrap();
6091        let b = body_json(&resp);
6092        let tags = b["Tags"].as_array().unwrap();
6093        assert_eq!(tags.len(), 1);
6094        assert_eq!(tags[0]["Key"], "env");
6095
6096        let req = make_request(
6097            "UntagResource",
6098            json!({
6099                "ResourceArn": arn,
6100                "TagKeys": ["env"],
6101            }),
6102        );
6103        svc.untag_resource(&req).unwrap();
6104
6105        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
6106        let resp = svc.list_tags_of_resource(&req).unwrap();
6107        let b = body_json(&resp);
6108        assert!(b["Tags"].as_array().unwrap().is_empty());
6109    }
6110
6111    // ── UpdateTable ──
6112
6113    #[test]
6114    fn update_table_add_gsi() {
6115        let svc = make_service();
6116        let req = make_request(
6117            "CreateTable",
6118            json!({
6119                "TableName": "upd-table",
6120                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
6121                "AttributeDefinitions": [
6122                    {"AttributeName": "pk", "AttributeType": "S"},
6123                    {"AttributeName": "gk", "AttributeType": "S"},
6124                ],
6125                "BillingMode": "PAY_PER_REQUEST",
6126            }),
6127        );
6128        svc.create_table(&req).unwrap();
6129
6130        let req = make_request(
6131            "UpdateTable",
6132            json!({
6133                "TableName": "upd-table",
6134                "GlobalSecondaryIndexUpdates": [{
6135                    "Create": {
6136                        "IndexName": "new-gsi",
6137                        "KeySchema": [{"AttributeName": "gk", "KeyType": "HASH"}],
6138                        "Projection": {"ProjectionType": "ALL"},
6139                    }
6140                }],
6141            }),
6142        );
6143        let resp = svc.update_table(&req).unwrap();
6144        let b = body_json(&resp);
6145        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
6146            .as_array()
6147            .unwrap();
6148        assert_eq!(gsi.len(), 1);
6149        assert_eq!(gsi[0]["IndexName"], "new-gsi");
6150    }
6151
6152    // ── Scan with FilterExpression ──
6153
6154    #[test]
6155    fn scan_with_filter_expression() {
6156        let svc = make_service();
6157        create_test_table(&svc);
6158
6159        for i in 0..5 {
6160            let req = make_request(
6161                "PutItem",
6162                json!({
6163                    "TableName": "test-table",
6164                    "Item": {
6165                        "pk": {"S": format!("f-{i}")},
6166                        "status": {"S": if i % 2 == 0 { "active" } else { "inactive" }},
6167                    },
6168                }),
6169            );
6170            svc.put_item(&req).unwrap();
6171        }
6172
6173        let req = make_request(
6174            "Scan",
6175            json!({
6176                "TableName": "test-table",
6177                "FilterExpression": "#s = :val",
6178                "ExpressionAttributeNames": {"#s": "status"},
6179                "ExpressionAttributeValues": {":val": {"S": "active"}},
6180            }),
6181        );
6182        let resp = svc.scan(&req).unwrap();
6183        let b = body_json(&resp);
6184        assert_eq!(b["Count"], 3);
6185    }
6186
6187    // ── PartiQL operations (batch.rs coverage) ──
6188
6189    #[test]
6190    fn execute_statement_select() {
6191        let svc = make_service();
6192        create_test_table(&svc);
6193
6194        let req = make_request(
6195            "PutItem",
6196            json!({"TableName": "test-table", "Item": {"pk": {"S": "qs1"}, "val": {"S": "hello"}}}),
6197        );
6198        svc.put_item(&req).unwrap();
6199
6200        let req = make_request(
6201            "ExecuteStatement",
6202            json!({"Statement": "SELECT * FROM \"test-table\" WHERE pk='qs1'"}),
6203        );
6204        let resp = svc.execute_statement(&req).unwrap();
6205        let b = body_json(&resp);
6206        assert!(!b["Items"].as_array().unwrap().is_empty());
6207    }
6208
6209    #[test]
6210    fn execute_statement_insert() {
6211        let svc = make_service();
6212        create_test_table(&svc);
6213
6214        let req = make_request(
6215            "ExecuteStatement",
6216            json!({"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'ins1', 'data': 'val'}"}),
6217        );
6218        svc.execute_statement(&req).unwrap();
6219
6220        let req = make_request(
6221            "GetItem",
6222            json!({"TableName": "test-table", "Key": {"pk": {"S": "ins1"}}}),
6223        );
6224        let resp = svc.get_item(&req).unwrap();
6225        let b = body_json(&resp);
6226        assert_eq!(b["Item"]["data"]["S"], "val");
6227    }
6228
6229    #[test]
6230    fn batch_execute_statement() {
6231        let svc = make_service();
6232        create_test_table(&svc);
6233
6234        let req = make_request(
6235            "PutItem",
6236            json!({"TableName": "test-table", "Item": {"pk": {"S": "be1"}}}),
6237        );
6238        svc.put_item(&req).unwrap();
6239
6240        let req = make_request(
6241            "BatchExecuteStatement",
6242            json!({
6243                "Statements": [
6244                    {"Statement": "SELECT * FROM \"test-table\" WHERE pk='be1'"},
6245                ]
6246            }),
6247        );
6248        let resp = svc.batch_execute_statement(&req).unwrap();
6249        let b = body_json(&resp);
6250        assert!(b["Responses"].as_array().is_some());
6251    }
6252
6253    #[test]
6254    fn execute_transaction() {
6255        let svc = make_service();
6256        create_test_table(&svc);
6257
6258        let req = make_request(
6259            "ExecuteTransaction",
6260            json!({
6261                "TransactStatements": [
6262                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx1'}"},
6263                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx2'}"},
6264                ]
6265            }),
6266        );
6267        svc.execute_transaction(&req).unwrap();
6268
6269        let req = make_request(
6270            "GetItem",
6271            json!({"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}),
6272        );
6273        let resp = svc.get_item(&req).unwrap();
6274        let b = body_json(&resp);
6275        assert!(b["Item"].is_object());
6276    }
6277
6278    // ── Batch write with delete ──
6279
6280    #[test]
6281    fn batch_write_with_delete_requests() {
6282        let svc = make_service();
6283        create_test_table(&svc);
6284
6285        // Put items first
6286        for key in &["bwd1", "bwd2", "bwd3"] {
6287            let req = make_request(
6288                "PutItem",
6289                json!({"TableName": "test-table", "Item": {"pk": {"S": key}}}),
6290            );
6291            svc.put_item(&req).unwrap();
6292        }
6293
6294        // Batch delete two
6295        let req = make_request(
6296            "BatchWriteItem",
6297            json!({
6298                "RequestItems": {
6299                    "test-table": [
6300                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd1"}}}},
6301                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd2"}}}},
6302                    ]
6303                }
6304            }),
6305        );
6306        svc.batch_write_item(&req).unwrap();
6307
6308        // bwd3 should still exist
6309        let req = make_request(
6310            "GetItem",
6311            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd3"}}}),
6312        );
6313        let resp = svc.get_item(&req).unwrap();
6314        let b = body_json(&resp);
6315        assert!(b["Item"].is_object());
6316
6317        // bwd1 should be gone
6318        let req = make_request(
6319            "GetItem",
6320            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd1"}}}),
6321        );
6322        let resp = svc.get_item(&req).unwrap();
6323        let b = body_json(&resp);
6324        assert!(b.get("Item").is_none() || b["Item"].is_null());
6325    }
6326
6327    // ── Query with sort key condition ──
6328
6329    #[test]
6330    fn query_with_sort_key_begins_with() {
6331        let svc = make_service();
6332        // Table with hash+range
6333        let req = make_request(
6334            "CreateTable",
6335            json!({
6336                "TableName": "sk-table",
6337                "KeySchema": [
6338                    {"AttributeName": "pk", "KeyType": "HASH"},
6339                    {"AttributeName": "sk", "KeyType": "RANGE"},
6340                ],
6341                "AttributeDefinitions": [
6342                    {"AttributeName": "pk", "AttributeType": "S"},
6343                    {"AttributeName": "sk", "AttributeType": "S"},
6344                ],
6345                "BillingMode": "PAY_PER_REQUEST",
6346            }),
6347        );
6348        svc.create_table(&req).unwrap();
6349
6350        for sk in &["order#001", "order#002", "profile#main"] {
6351            let req = make_request(
6352                "PutItem",
6353                json!({"TableName": "sk-table", "Item": {"pk": {"S": "u1"}, "sk": {"S": sk}}}),
6354            );
6355            svc.put_item(&req).unwrap();
6356        }
6357
6358        let req = make_request(
6359            "Query",
6360            json!({
6361                "TableName": "sk-table",
6362                "KeyConditionExpression": "pk = :pk AND begins_with(sk, :prefix)",
6363                "ExpressionAttributeValues": {":pk": {"S": "u1"}, ":prefix": {"S": "order#"}},
6364            }),
6365        );
6366        let resp = svc.query(&req).unwrap();
6367        let b = body_json(&resp);
6368        assert_eq!(b["Count"], 2);
6369    }
6370
6371    // ── Scan with limit ──
6372
6373    #[test]
6374    fn scan_with_limit() {
6375        let svc = make_service();
6376        create_test_table(&svc);
6377
6378        for i in 0..10 {
6379            let req = make_request(
6380                "PutItem",
6381                json!({"TableName": "test-table", "Item": {"pk": {"S": format!("lim{i}")}}}),
6382            );
6383            svc.put_item(&req).unwrap();
6384        }
6385
6386        let req = make_request("Scan", json!({"TableName": "test-table", "Limit": 3}));
6387        let resp = svc.scan(&req).unwrap();
6388        let b = body_json(&resp);
6389        assert_eq!(b["Count"], 3);
6390        assert!(b["LastEvaluatedKey"].is_object());
6391    }
6392
6393    // ── Error branches ──
6394
6395    #[test]
6396    fn batch_get_item_table_not_found() {
6397        let svc = make_service();
6398        let req = make_request(
6399            "BatchGetItem",
6400            json!({"RequestItems": {"ghost": {"Keys": [{"pk": {"S": "k"}}]}}}),
6401        );
6402        assert!(svc.batch_get_item(&req).is_err());
6403    }
6404
6405    #[test]
6406    fn batch_write_item_table_not_found() {
6407        let svc = make_service();
6408        let req = make_request(
6409            "BatchWriteItem",
6410            json!({"RequestItems": {"ghost": [{"PutRequest": {"Item": {"pk": {"S": "k"}}}}]}}),
6411        );
6412        assert!(svc.batch_write_item(&req).is_err());
6413    }
6414
6415    // ── Global tables ──
6416
6417    #[test]
6418    fn create_and_describe_global_table() {
6419        let svc = make_service();
6420        create_test_table(&svc);
6421
6422        let req = make_request(
6423            "CreateGlobalTable",
6424            json!({
6425                "GlobalTableName": "test-table",
6426                "ReplicationGroup": [{"RegionName": "us-east-1"}, {"RegionName": "eu-west-1"}],
6427            }),
6428        );
6429        svc.create_global_table(&req).unwrap();
6430
6431        let req = make_request(
6432            "DescribeGlobalTable",
6433            json!({"GlobalTableName": "test-table"}),
6434        );
6435        let resp = svc.describe_global_table(&req).unwrap();
6436        let b = body_json(&resp);
6437        assert!(b["GlobalTableDescription"].is_object());
6438    }
6439
6440    #[test]
6441    fn list_global_tables() {
6442        let svc = make_service();
6443        let req = make_request("ListGlobalTables", json!({}));
6444        let resp = svc.list_global_tables(&req).unwrap();
6445        let b = body_json(&resp);
6446        assert!(b["GlobalTables"].as_array().is_some());
6447    }
6448
6449    // ── Backup operations ──
6450
6451    #[test]
6452    fn create_and_list_backups() {
6453        let svc = make_service();
6454        create_test_table(&svc);
6455
6456        let req = make_request(
6457            "CreateBackup",
6458            json!({"TableName": "test-table", "BackupName": "bak1"}),
6459        );
6460        let resp = svc.create_backup(&req).unwrap();
6461        let b = body_json(&resp);
6462        assert!(b["BackupDetails"]["BackupArn"].as_str().is_some());
6463
6464        let req = make_request("ListBackups", json!({}));
6465        let resp = svc.list_backups(&req).unwrap();
6466        let b = body_json(&resp);
6467        assert!(!b["BackupSummaries"].as_array().unwrap().is_empty());
6468    }
6469
6470    // ── Import/Export ──
6471
6472    #[test]
6473    fn describe_import_not_found() {
6474        let svc = make_service();
6475        let req = make_request(
6476            "DescribeImport",
6477            json!({"ImportArn": "arn:aws:dynamodb:us-east-1:123:table/t/import/ghost"}),
6478        );
6479        assert!(svc.describe_import(&req).is_err());
6480    }
6481
6482    #[test]
6483    fn describe_export_not_found() {
6484        let svc = make_service();
6485        let req = make_request(
6486            "DescribeExport",
6487            json!({"ExportArn": "arn:aws:dynamodb:us-east-1:123:table/t/export/ghost"}),
6488        );
6489        assert!(svc.describe_export(&req).is_err());
6490    }
6491
6492    // ── tables.rs error branches ──
6493
6494    #[test]
6495    fn create_table_missing_name_errors() {
6496        let svc = make_service();
6497        let req = make_request(
6498            "CreateTable",
6499            json!({
6500                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6501                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6502                "BillingMode": "PAY_PER_REQUEST"
6503            }),
6504        );
6505        assert!(svc.create_table(&req).is_err());
6506    }
6507
6508    #[test]
6509    fn create_table_duplicate_errors() {
6510        let svc = make_service();
6511        let req = make_request(
6512            "CreateTable",
6513            json!({
6514                "TableName": "dup",
6515                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6516                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6517                "BillingMode": "PAY_PER_REQUEST"
6518            }),
6519        );
6520        svc.create_table(&req).unwrap();
6521        assert!(svc.create_table(&req).is_err());
6522    }
6523
6524    #[test]
6525    fn delete_table_missing_name_errors() {
6526        let svc = make_service();
6527        let req = make_request("DeleteTable", json!({}));
6528        assert!(svc.delete_table(&req).is_err());
6529    }
6530
6531    #[test]
6532    fn delete_table_not_found_errors() {
6533        let svc = make_service();
6534        let req = make_request("DeleteTable", json!({"TableName": "ghost"}));
6535        assert!(svc.delete_table(&req).is_err());
6536    }
6537
6538    #[test]
6539    fn describe_table_missing_name_errors() {
6540        let svc = make_service();
6541        let req = make_request("DescribeTable", json!({}));
6542        assert!(svc.describe_table(&req).is_err());
6543    }
6544
6545    #[test]
6546    fn describe_table_not_found_errors() {
6547        let svc = make_service();
6548        let req = make_request("DescribeTable", json!({"TableName": "ghost"}));
6549        assert!(svc.describe_table(&req).is_err());
6550    }
6551
6552    #[test]
6553    fn update_table_missing_name_errors() {
6554        let svc = make_service();
6555        let req = make_request("UpdateTable", json!({}));
6556        assert!(svc.update_table(&req).is_err());
6557    }
6558
6559    #[test]
6560    fn update_table_not_found_errors() {
6561        let svc = make_service();
6562        let req = make_request("UpdateTable", json!({"TableName": "ghost"}));
6563        assert!(svc.update_table(&req).is_err());
6564    }
6565
6566    #[test]
6567    fn list_tables_pagination() {
6568        let svc = make_service();
6569        for i in 0..5 {
6570            let req = make_request(
6571                "CreateTable",
6572                json!({
6573                    "TableName": format!("pt{i}"),
6574                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6575                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6576                    "BillingMode": "PAY_PER_REQUEST"
6577                }),
6578            );
6579            svc.create_table(&req).unwrap();
6580        }
6581        let req = make_request("ListTables", json!({"Limit": 2}));
6582        let resp = svc.list_tables(&req).unwrap();
6583        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6584        assert_eq!(body["TableNames"].as_array().unwrap().len(), 2);
6585        assert!(body["LastEvaluatedTableName"].is_string());
6586    }
6587
6588    #[test]
6589    fn list_tables_start_exclusive() {
6590        let svc = make_service();
6591        for i in 0..3 {
6592            let req = make_request(
6593                "CreateTable",
6594                json!({
6595                    "TableName": format!("pt{i}"),
6596                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6597                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6598                    "BillingMode": "PAY_PER_REQUEST"
6599                }),
6600            );
6601            svc.create_table(&req).unwrap();
6602        }
6603        let req = make_request("ListTables", json!({"ExclusiveStartTableName": "pt0"}));
6604        let resp = svc.list_tables(&req).unwrap();
6605        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6606        let names = body["TableNames"].as_array().unwrap();
6607        assert!(!names.iter().any(|n| n == "pt0"));
6608    }
6609
6610    #[test]
6611    fn update_time_to_live_unknown_table_errors() {
6612        let svc = make_service();
6613        let req = make_request(
6614            "UpdateTimeToLive",
6615            json!({
6616                "TableName": "ghost",
6617                "TimeToLiveSpecification": {"Enabled": true, "AttributeName": "ttl"}
6618            }),
6619        );
6620        assert!(svc.update_time_to_live(&req).is_err());
6621    }
6622
6623    #[test]
6624    fn describe_time_to_live_unknown_table_errors() {
6625        let svc = make_service();
6626        let req = make_request("DescribeTimeToLive", json!({"TableName": "ghost"}));
6627        assert!(svc.describe_time_to_live(&req).is_err());
6628    }
6629
6630    // ── resource policy ──
6631
6632    #[test]
6633    fn put_resource_policy_missing_policy_errors() {
6634        let svc = make_service();
6635        let req = make_request(
6636            "CreateTable",
6637            json!({
6638                "TableName": "rp",
6639                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6640                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6641                "BillingMode": "PAY_PER_REQUEST"
6642            }),
6643        );
6644        svc.create_table(&req).unwrap();
6645        let req = make_request(
6646            "PutResourcePolicy",
6647            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/rp"}),
6648        );
6649        assert!(svc.put_resource_policy(&req).is_err());
6650    }
6651
6652    #[test]
6653    fn get_resource_policy_unknown_table_errors() {
6654        let svc = make_service();
6655        let req = make_request(
6656            "GetResourcePolicy",
6657            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6658        );
6659        assert!(svc.get_resource_policy(&req).is_err());
6660    }
6661
6662    // ── tags ──
6663
6664    #[test]
6665    fn tag_resource_unknown_table_errors() {
6666        let svc = make_service();
6667        let req = make_request(
6668            "TagResource",
6669            json!({
6670                "ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost",
6671                "Tags": [{"Key": "k", "Value": "v"}]
6672            }),
6673        );
6674        assert!(svc.tag_resource(&req).is_err());
6675    }
6676
6677    #[test]
6678    fn list_tags_unknown_table_errors() {
6679        let svc = make_service();
6680        let req = make_request(
6681            "ListTagsOfResource",
6682            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6683        );
6684        assert!(svc.list_tags_of_resource(&req).is_err());
6685    }
6686
6687    // ── backups ──
6688
6689    #[test]
6690    fn create_backup_unknown_table_errors() {
6691        let svc = make_service();
6692        let req = make_request(
6693            "CreateBackup",
6694            json!({"TableName": "ghost", "BackupName": "b1"}),
6695        );
6696        assert!(svc.create_backup(&req).is_err());
6697    }
6698
6699    #[test]
6700    fn delete_backup_not_found_errors() {
6701        let svc = make_service();
6702        let req = make_request(
6703            "DeleteBackup",
6704            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6705        );
6706        assert!(svc.delete_backup(&req).is_err());
6707    }
6708
6709    #[test]
6710    fn describe_backup_not_found_errors() {
6711        let svc = make_service();
6712        let req = make_request(
6713            "DescribeBackup",
6714            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6715        );
6716        assert!(svc.describe_backup(&req).is_err());
6717    }
6718
6719    #[test]
6720    fn restore_table_from_backup_not_found_errors() {
6721        let svc = make_service();
6722        let req = make_request(
6723            "RestoreTableFromBackup",
6724            json!({
6725                "TargetTableName": "restored",
6726                "BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"
6727            }),
6728        );
6729        assert!(svc.restore_table_from_backup(&req).is_err());
6730    }
6731
6732    #[test]
6733    fn update_continuous_backups_unknown_table_errors() {
6734        let svc = make_service();
6735        let req = make_request(
6736            "UpdateContinuousBackups",
6737            json!({
6738                "TableName": "ghost",
6739                "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": true}
6740            }),
6741        );
6742        assert!(svc.update_continuous_backups(&req).is_err());
6743    }
6744
6745    // ── items.rs: put_item error branches ──
6746
6747    #[test]
6748    fn put_item_unknown_table_errors() {
6749        let svc = make_service();
6750        let req = make_request(
6751            "PutItem",
6752            json!({
6753                "TableName": "ghost",
6754                "Item": {"k": {"S": "v"}}
6755            }),
6756        );
6757        assert!(svc.put_item(&req).is_err());
6758    }
6759
6760    #[test]
6761    fn put_item_missing_key_attribute_errors() {
6762        let svc = make_service();
6763        svc.create_table(&make_request(
6764            "CreateTable",
6765            json!({
6766                "TableName": "pmk",
6767                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6768                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6769                "BillingMode": "PAY_PER_REQUEST"
6770            }),
6771        ))
6772        .unwrap();
6773        let req = make_request(
6774            "PutItem",
6775            json!({
6776                "TableName": "pmk",
6777                "Item": {"other": {"S": "v"}}
6778            }),
6779        );
6780        assert!(svc.put_item(&req).is_err());
6781    }
6782
6783    #[test]
6784    fn get_item_unknown_table_errors() {
6785        let svc = make_service();
6786        let req = make_request(
6787            "GetItem",
6788            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6789        );
6790        assert!(svc.get_item(&req).is_err());
6791    }
6792
6793    #[test]
6794    fn delete_item_unknown_table_errors() {
6795        let svc = make_service();
6796        let req = make_request(
6797            "DeleteItem",
6798            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6799        );
6800        assert!(svc.delete_item(&req).is_err());
6801    }
6802
6803    #[test]
6804    fn update_item_unknown_table_errors() {
6805        let svc = make_service();
6806        let req = make_request(
6807            "UpdateItem",
6808            json!({
6809                "TableName": "ghost",
6810                "Key": {"k": {"S": "1"}},
6811                "UpdateExpression": "SET x = :v",
6812                "ExpressionAttributeValues": {":v": {"S": "val"}}
6813            }),
6814        );
6815        assert!(svc.update_item(&req).is_err());
6816    }
6817
6818    #[test]
6819    fn query_unknown_table_errors() {
6820        let svc = make_service();
6821        let req = make_request(
6822            "Query",
6823            json!({
6824                "TableName": "ghost",
6825                "KeyConditionExpression": "k = :v",
6826                "ExpressionAttributeValues": {":v": {"S": "x"}}
6827            }),
6828        );
6829        assert!(svc.query(&req).is_err());
6830    }
6831
6832    #[test]
6833    fn scan_unknown_table_errors() {
6834        let svc = make_service();
6835        let req = make_request("Scan", json!({"TableName": "ghost"}));
6836        assert!(svc.scan(&req).is_err());
6837    }
6838
6839    #[test]
6840    fn scan_with_limit_returns_ok() {
6841        let svc = make_service();
6842        svc.create_table(&make_request(
6843            "CreateTable",
6844            json!({
6845                "TableName": "slt",
6846                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6847                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6848                "BillingMode": "PAY_PER_REQUEST"
6849            }),
6850        ))
6851        .unwrap();
6852        for i in 0..5 {
6853            svc.put_item(&make_request(
6854                "PutItem",
6855                json!({
6856                    "TableName": "slt",
6857                    "Item": {"k": {"S": format!("key-{i}")}}
6858                }),
6859            ))
6860            .unwrap();
6861        }
6862        let req = make_request("Scan", json!({"TableName": "slt", "Limit": 2}));
6863        let resp = svc.scan(&req).unwrap();
6864        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6865        assert_eq!(body["Count"], 2);
6866    }
6867
6868    #[test]
6869    fn batch_get_item_unknown_table_errors() {
6870        let svc = make_service();
6871        let req = make_request(
6872            "BatchGetItem",
6873            json!({
6874                "RequestItems": {
6875                    "ghost": {"Keys": [{"k": {"S": "1"}}]}
6876                }
6877            }),
6878        );
6879        assert!(svc.batch_get_item(&req).is_err());
6880    }
6881
6882    #[test]
6883    fn batch_write_item_unknown_table_errors() {
6884        let svc = make_service();
6885        let req = make_request(
6886            "BatchWriteItem",
6887            json!({
6888                "RequestItems": {
6889                    "ghost": [{"PutRequest": {"Item": {"k": {"S": "1"}}}}]
6890                }
6891            }),
6892        );
6893        assert!(svc.batch_write_item(&req).is_err());
6894    }
6895
6896    #[test]
6897    fn transact_write_items_unknown_table_errors() {
6898        let svc = make_service();
6899        let req = make_request(
6900            "TransactWriteItems",
6901            json!({
6902                "TransactItems": [{
6903                    "Put": {"TableName": "ghost", "Item": {"k": {"S": "1"}}}
6904                }]
6905            }),
6906        );
6907        assert!(svc.transact_write_items(&req).is_err());
6908    }
6909
6910    #[test]
6911    fn transact_get_items_unknown_table_errors() {
6912        let svc = make_service();
6913        let req = make_request(
6914            "TransactGetItems",
6915            json!({
6916                "TransactItems": [{
6917                    "Get": {"TableName": "ghost", "Key": {"k": {"S": "1"}}}
6918                }]
6919            }),
6920        );
6921        assert!(svc.transact_get_items(&req).is_err());
6922    }
6923
6924    #[test]
6925    fn describe_global_table_not_found_b() {
6926        let svc = make_service();
6927        let req = make_request("DescribeGlobalTable", json!({"GlobalTableName": "ghost"}));
6928        assert!(svc.describe_global_table(&req).is_err());
6929    }
6930
6931    #[test]
6932    fn list_global_tables_empty_ok() {
6933        let svc = make_service();
6934        let req = make_request("ListGlobalTables", json!({}));
6935        let resp = svc.list_global_tables(&req).unwrap();
6936        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6937        assert!(body["GlobalTables"].is_array());
6938    }
6939
6940    #[test]
6941    fn split_on_top_level_keyword_between_swallows_inner_and() {
6942        let parts = split_on_top_level_keyword("x = :a AND y BETWEEN :lo AND :hi", "AND");
6943        assert_eq!(
6944            parts.len(),
6945            2,
6946            "BETWEEN's inner AND must not split; got parts = {parts:?}"
6947        );
6948    }
6949
6950    #[test]
6951    fn split_on_top_level_keyword_between_nested_parens() {
6952        let parts = split_on_top_level_keyword("(x = :a) AND (y BETWEEN :lo AND :hi)", "AND");
6953        assert_eq!(parts.len(), 2);
6954    }
6955
6956    #[test]
6957    fn split_on_top_level_keyword_whitespace_variants() {
6958        for expr in [
6959            "x = :a AND y = :b",
6960            "x=:a AND y=:b",
6961            "  x = :a   AND   y = :b  ",
6962            "x\t=\t:a\tAND\ty\t=\t:b",
6963            "x = :a\nAND\ny = :b",
6964        ] {
6965            let parts = split_on_top_level_keyword(expr, "AND");
6966            assert_eq!(parts.len(), 2, "whitespace variant failed: {expr:?}");
6967        }
6968    }
6969
6970    #[test]
6971    fn split_on_top_level_keyword_case_insensitive() {
6972        let parts = split_on_top_level_keyword("x = :a and y = :b", "AND");
6973        assert_eq!(parts.len(), 2);
6974        let parts = split_on_top_level_keyword("x = :a OR y = :b", "OR");
6975        assert_eq!(parts.len(), 2);
6976    }
6977
6978    #[test]
6979    fn split_on_top_level_keyword_does_not_match_inside_identifiers() {
6980        // `land` contains "AND" but isn't word-bounded — must not split.
6981        let parts = split_on_top_level_keyword("land = :a", "AND");
6982        assert_eq!(parts.len(), 1);
6983    }
6984}