Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::state::SharedS3State;
23
24use crate::state::{
25    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoDbSnapshot, DynamoTable,
26    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
27    ProvisionedThroughput, SharedDynamoDbState, DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
30/// Minimal subset of a ``DynamoTable`` that Kinesis streaming delivery needs.
31///
32/// A table can carry megabytes of items; cloning the whole table just to
33/// release the write lock and deliver one change record is extremely wasteful.
34/// Extracting only the fields the delivery path actually reads (destinations,
35/// arn, name) keeps the clone small.
36pub(super) struct KinesisDeliveryTarget {
37    pub destinations: Vec<KinesisDestination>,
38    pub arn: String,
39    pub name: String,
40}
41
42pub struct DynamoDbService {
43    state: SharedDynamoDbState,
44    pub(crate) s3_state: Option<SharedS3State>,
45    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
46    delivery: Option<Arc<DeliveryBus>>,
47    snapshot_store: Option<Arc<dyn SnapshotStore>>,
48    /// Serializes concurrent snapshot writes so the newest observed
49    /// state always wins on disk. Without it, two tasks could race
50    /// between state.read().clone() and store.save() and leave older
51    /// bytes as the final on-disk state.
52    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
53}
54
55impl DynamoDbService {
56    pub fn new(state: SharedDynamoDbState) -> Self {
57        Self {
58            state,
59            s3_state: None,
60            s3_store: None,
61            delivery: None,
62            snapshot_store: None,
63            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
64        }
65    }
66
67    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
68        self.s3_state = Some(s3_state);
69        self
70    }
71
72    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
73        self.s3_store = Some(store);
74        self
75    }
76
77    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
78        self.delivery = Some(delivery);
79        self
80    }
81
82    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
83        self.snapshot_store = Some(store);
84        self
85    }
86
87    /// Persist the current in-memory state as a snapshot. Called after
88    /// every state-mutating action. A noop when no snapshot store is
89    /// configured (i.e. `StorageMode::Memory`).
90    ///
91    /// The snapshot lock serializes the full clone + serialize + write
92    /// so concurrent mutators cannot leave older bytes on disk, and
93    /// serialization + the blocking file write are offloaded to the
94    /// blocking pool to keep Tokio workers responsive.
95    async fn save_snapshot(&self) {
96        let Some(store) = self.snapshot_store.clone() else {
97            return;
98        };
99        let _guard = self.snapshot_lock.lock().await;
100        let snapshot = DynamoDbSnapshot {
101            schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
102            accounts: Some(self.state.read().clone()),
103            state: None,
104        };
105        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
106            let bytes = serde_json::to_vec(&snapshot)
107                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
108            store.save(&bytes)
109        })
110        .await;
111        match join {
112            Ok(Ok(())) => {}
113            Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
114            Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
115        }
116    }
117
118    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
119        if table
120            .kinesis_destinations
121            .iter()
122            .any(|d| d.destination_status == "ACTIVE")
123        {
124            Some(KinesisDeliveryTarget {
125                destinations: table.kinesis_destinations.clone(),
126                arn: table.arn.clone(),
127                name: table.name.clone(),
128            })
129        } else {
130            None
131        }
132    }
133
134    /// Deliver a change record to all active Kinesis streaming destinations for a table.
135    pub(super) fn deliver_to_kinesis_destinations(
136        &self,
137        target: &KinesisDeliveryTarget,
138        event_name: &str,
139        keys: &HashMap<String, AttributeValue>,
140        old_image: Option<&HashMap<String, AttributeValue>>,
141        new_image: Option<&HashMap<String, AttributeValue>>,
142    ) {
143        let delivery = match &self.delivery {
144            Some(d) => d,
145            None => return,
146        };
147
148        let active_destinations: Vec<_> = target
149            .destinations
150            .iter()
151            .filter(|d| d.destination_status == "ACTIVE")
152            .collect();
153
154        if active_destinations.is_empty() {
155            return;
156        }
157
158        let mut record = json!({
159            "eventID": uuid::Uuid::new_v4().to_string(),
160            "eventName": event_name,
161            "eventVersion": "1.1",
162            "eventSource": "aws:dynamodb",
163            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
164            "dynamodb": {
165                "Keys": keys,
166                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
167                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
168                "StreamViewType": "NEW_AND_OLD_IMAGES",
169            },
170            "eventSourceARN": &target.arn,
171            "tableName": &target.name,
172        });
173
174        if let Some(old) = old_image {
175            record["dynamodb"]["OldImage"] = json!(old);
176        }
177        if let Some(new) = new_image {
178            record["dynamodb"]["NewImage"] = json!(new);
179        }
180
181        let record_str = serde_json::to_string(&record).unwrap_or_default();
182        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
183        let partition_key = serde_json::to_string(keys).unwrap_or_default();
184
185        for dest in active_destinations {
186            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
187        }
188    }
189
190    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
191        serde_json::from_slice(&req.body).map_err(|e| {
192            AwsServiceError::aws_error(
193                StatusCode::BAD_REQUEST,
194                "SerializationException",
195                format!("Invalid JSON: {e}"),
196            )
197        })
198    }
199
200    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
201        Ok(AwsResponse::ok_json(body))
202    }
203}
204
205#[async_trait]
206impl AwsService for DynamoDbService {
207    fn service_name(&self) -> &str {
208        "dynamodb"
209    }
210
211    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
212        let mutates = is_mutating_action(req.action.as_str());
213        let result = match req.action.as_str() {
214            "CreateTable" => self.create_table(&req),
215            "DeleteTable" => self.delete_table(&req),
216            "DescribeTable" => self.describe_table(&req),
217            "ListTables" => self.list_tables(&req),
218            "UpdateTable" => self.update_table(&req),
219            "PutItem" => self.put_item(&req),
220            "GetItem" => self.get_item(&req),
221            "DeleteItem" => self.delete_item(&req),
222            "UpdateItem" => self.update_item(&req),
223            "Query" => self.query(&req),
224            "Scan" => self.scan(&req),
225            "BatchGetItem" => self.batch_get_item(&req),
226            "BatchWriteItem" => self.batch_write_item(&req),
227            "TagResource" => self.tag_resource(&req),
228            "UntagResource" => self.untag_resource(&req),
229            "ListTagsOfResource" => self.list_tags_of_resource(&req),
230            "TransactGetItems" => self.transact_get_items(&req),
231            "TransactWriteItems" => self.transact_write_items(&req),
232            "ExecuteStatement" => self.execute_statement(&req),
233            "BatchExecuteStatement" => self.batch_execute_statement(&req),
234            "ExecuteTransaction" => self.execute_transaction(&req),
235            "UpdateTimeToLive" => self.update_time_to_live(&req),
236            "DescribeTimeToLive" => self.describe_time_to_live(&req),
237            "PutResourcePolicy" => self.put_resource_policy(&req),
238            "GetResourcePolicy" => self.get_resource_policy(&req),
239            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
240            // Stubs
241            "DescribeEndpoints" => self.describe_endpoints(&req),
242            "DescribeLimits" => self.describe_limits(&req),
243            // Backups
244            "CreateBackup" => self.create_backup(&req),
245            "DeleteBackup" => self.delete_backup(&req),
246            "DescribeBackup" => self.describe_backup(&req),
247            "ListBackups" => self.list_backups(&req),
248            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
249            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
250            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
251            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
252            // Global tables
253            "CreateGlobalTable" => self.create_global_table(&req),
254            "DescribeGlobalTable" => self.describe_global_table(&req),
255            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
256            "ListGlobalTables" => self.list_global_tables(&req),
257            "UpdateGlobalTable" => self.update_global_table(&req),
258            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
259            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
260            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
261            // Kinesis streaming
262            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
263            "DisableKinesisStreamingDestination" => {
264                self.disable_kinesis_streaming_destination(&req)
265            }
266            "DescribeKinesisStreamingDestination" => {
267                self.describe_kinesis_streaming_destination(&req)
268            }
269            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
270            // Contributor insights
271            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
272            "UpdateContributorInsights" => self.update_contributor_insights(&req),
273            "ListContributorInsights" => self.list_contributor_insights(&req),
274            // Import/Export
275            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
276            "DescribeExport" => self.describe_export(&req),
277            "ListExports" => self.list_exports(&req),
278            "ImportTable" => self.import_table(&req),
279            "DescribeImport" => self.describe_import(&req),
280            "ListImports" => self.list_imports(&req),
281            _ => Err(AwsServiceError::action_not_implemented(
282                "dynamodb",
283                &req.action,
284            )),
285        };
286        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
287            self.save_snapshot().await;
288        }
289        result
290    }
291
292    fn supported_actions(&self) -> &[&str] {
293        &[
294            "CreateTable",
295            "DeleteTable",
296            "DescribeTable",
297            "ListTables",
298            "UpdateTable",
299            "PutItem",
300            "GetItem",
301            "DeleteItem",
302            "UpdateItem",
303            "Query",
304            "Scan",
305            "BatchGetItem",
306            "BatchWriteItem",
307            "TagResource",
308            "UntagResource",
309            "ListTagsOfResource",
310            "TransactGetItems",
311            "TransactWriteItems",
312            "ExecuteStatement",
313            "BatchExecuteStatement",
314            "ExecuteTransaction",
315            "UpdateTimeToLive",
316            "DescribeTimeToLive",
317            "PutResourcePolicy",
318            "GetResourcePolicy",
319            "DeleteResourcePolicy",
320            "DescribeEndpoints",
321            "DescribeLimits",
322            "CreateBackup",
323            "DeleteBackup",
324            "DescribeBackup",
325            "ListBackups",
326            "RestoreTableFromBackup",
327            "RestoreTableToPointInTime",
328            "UpdateContinuousBackups",
329            "DescribeContinuousBackups",
330            "CreateGlobalTable",
331            "DescribeGlobalTable",
332            "DescribeGlobalTableSettings",
333            "ListGlobalTables",
334            "UpdateGlobalTable",
335            "UpdateGlobalTableSettings",
336            "DescribeTableReplicaAutoScaling",
337            "UpdateTableReplicaAutoScaling",
338            "EnableKinesisStreamingDestination",
339            "DisableKinesisStreamingDestination",
340            "DescribeKinesisStreamingDestination",
341            "UpdateKinesisStreamingDestination",
342            "DescribeContributorInsights",
343            "UpdateContributorInsights",
344            "ListContributorInsights",
345            "ExportTableToPointInTime",
346            "DescribeExport",
347            "ListExports",
348            "ImportTable",
349            "DescribeImport",
350            "ListImports",
351        ]
352    }
353}
354/// Actions that mutate DynamoDB state and therefore require a snapshot
355/// write after success. Kept in sync with the dispatch table above.
356fn is_mutating_action(action: &str) -> bool {
357    matches!(
358        action,
359        "CreateTable"
360            | "DeleteTable"
361            | "UpdateTable"
362            | "PutItem"
363            | "DeleteItem"
364            | "UpdateItem"
365            | "BatchWriteItem"
366            | "TagResource"
367            | "UntagResource"
368            | "TransactWriteItems"
369            | "ExecuteStatement"
370            | "BatchExecuteStatement"
371            | "ExecuteTransaction"
372            | "UpdateTimeToLive"
373            | "PutResourcePolicy"
374            | "DeleteResourcePolicy"
375            | "CreateBackup"
376            | "DeleteBackup"
377            | "RestoreTableFromBackup"
378            | "RestoreTableToPointInTime"
379            | "UpdateContinuousBackups"
380            | "CreateGlobalTable"
381            | "UpdateGlobalTable"
382            | "UpdateGlobalTableSettings"
383            | "UpdateTableReplicaAutoScaling"
384            | "EnableKinesisStreamingDestination"
385            | "DisableKinesisStreamingDestination"
386            | "UpdateKinesisStreamingDestination"
387            | "UpdateContributorInsights"
388            | "ExportTableToPointInTime"
389            | "ImportTable"
390    )
391}
392
393// ── Helper functions ────────────────────────────────────────────────────
394
395fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
396    body[field].as_str().ok_or_else(|| {
397        AwsServiceError::aws_error(
398            StatusCode::BAD_REQUEST,
399            "ValidationException",
400            format!("{field} is required"),
401        )
402    })
403}
404
405fn require_object(
406    body: &Value,
407    field: &str,
408) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
409    let obj = body[field].as_object().ok_or_else(|| {
410        AwsServiceError::aws_error(
411            StatusCode::BAD_REQUEST,
412            "ValidationException",
413            format!("{field} is required"),
414        )
415    })?;
416    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
417}
418
419fn get_table<'a>(
420    tables: &'a HashMap<String, DynamoTable>,
421    name: &str,
422) -> Result<&'a DynamoTable, AwsServiceError> {
423    tables.get(name).ok_or_else(|| {
424        AwsServiceError::aws_error(
425            StatusCode::BAD_REQUEST,
426            "ResourceNotFoundException",
427            format!("Requested resource not found: Table: {name} not found"),
428        )
429    })
430}
431
432fn get_table_mut<'a>(
433    tables: &'a mut HashMap<String, DynamoTable>,
434    name: &str,
435) -> Result<&'a mut DynamoTable, AwsServiceError> {
436    tables.get_mut(name).ok_or_else(|| {
437        AwsServiceError::aws_error(
438            StatusCode::BAD_REQUEST,
439            "ResourceNotFoundException",
440            format!("Requested resource not found: Table: {name} not found"),
441        )
442    })
443}
444
445fn find_table_by_arn<'a>(
446    tables: &'a HashMap<String, DynamoTable>,
447    arn: &str,
448) -> Result<&'a DynamoTable, AwsServiceError> {
449    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
450        AwsServiceError::aws_error(
451            StatusCode::BAD_REQUEST,
452            "ResourceNotFoundException",
453            format!("Requested resource not found: {arn}"),
454        )
455    })
456}
457
458fn find_table_by_arn_mut<'a>(
459    tables: &'a mut HashMap<String, DynamoTable>,
460    arn: &str,
461) -> Result<&'a mut DynamoTable, AwsServiceError> {
462    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
463        AwsServiceError::aws_error(
464            StatusCode::BAD_REQUEST,
465            "ResourceNotFoundException",
466            format!("Requested resource not found: {arn}"),
467        )
468    })
469}
470
471fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
472    let arr = val.as_array().ok_or_else(|| {
473        AwsServiceError::aws_error(
474            StatusCode::BAD_REQUEST,
475            "ValidationException",
476            "KeySchema is required",
477        )
478    })?;
479    Ok(arr
480        .iter()
481        .map(|elem| KeySchemaElement {
482            attribute_name: elem["AttributeName"]
483                .as_str()
484                .unwrap_or_default()
485                .to_string(),
486            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
487        })
488        .collect())
489}
490
491fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
492    let arr = val.as_array().ok_or_else(|| {
493        AwsServiceError::aws_error(
494            StatusCode::BAD_REQUEST,
495            "ValidationException",
496            "AttributeDefinitions is required",
497        )
498    })?;
499    Ok(arr
500        .iter()
501        .map(|elem| AttributeDefinition {
502            attribute_name: elem["AttributeName"]
503                .as_str()
504                .unwrap_or_default()
505                .to_string(),
506            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
507        })
508        .collect())
509}
510
511fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
512    Ok(ProvisionedThroughput {
513        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
514        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
515    })
516}
517
518fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
519    let Some(arr) = val.as_array() else {
520        return Vec::new();
521    };
522    arr.iter()
523        .filter_map(|g| {
524            Some(GlobalSecondaryIndex {
525                index_name: g["IndexName"].as_str()?.to_string(),
526                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
527                projection: parse_projection(&g["Projection"]),
528                provisioned_throughput: Some(parse_gsi_throughput(
529                    &g["ProvisionedThroughput"],
530                    billing_mode,
531                )),
532                on_demand_throughput: parse_on_demand_throughput(&g["OnDemandThroughput"]),
533            })
534        })
535        .collect()
536}
537
538/// Parse an `OnDemandThroughput` block. Absent fields default to `-1`,
539/// which is AWS's sentinel for "no cap" — and the value real AWS echoes
540/// back on DescribeTable when the caller omitted either axis.
541pub(super) fn parse_on_demand_throughput(val: &Value) -> Option<crate::state::OnDemandThroughput> {
542    if !val.is_object() {
543        return None;
544    }
545    Some(crate::state::OnDemandThroughput {
546        max_read_request_units: val["MaxReadRequestUnits"].as_i64().unwrap_or(-1),
547        max_write_request_units: val["MaxWriteRequestUnits"].as_i64().unwrap_or(-1),
548    })
549}
550
551/// Resolve the provisioned-throughput slot for a GSI on a CreateTable or
552/// UpdateTable Create action. Real DynamoDB returns `{0, 0}` for GSIs on
553/// PAY_PER_REQUEST tables regardless of whether the caller sent a
554/// `ProvisionedThroughput` block, and the Terraform provider's `flatten`
555/// code keys `name`/`read_capacity`/`write_capacity` off the presence of
556/// that field — returning `None` would desynchronise state.
557fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
558    if billing_mode == "PAY_PER_REQUEST" {
559        return ProvisionedThroughput {
560            read_capacity_units: 0,
561            write_capacity_units: 0,
562        };
563    }
564    ProvisionedThroughput {
565        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
566        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
567    }
568}
569
570fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
571    let Some(arr) = val.as_array() else {
572        return Vec::new();
573    };
574    arr.iter()
575        .filter_map(|l| {
576            Some(LocalSecondaryIndex {
577                index_name: l["IndexName"].as_str()?.to_string(),
578                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
579                projection: parse_projection(&l["Projection"]),
580            })
581        })
582        .collect()
583}
584
585pub(super) fn parse_projection(val: &Value) -> Projection {
586    Projection {
587        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
588        non_key_attributes: val["NonKeyAttributes"]
589            .as_array()
590            .map(|arr| {
591                arr.iter()
592                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
593                    .collect()
594            })
595            .unwrap_or_default(),
596    }
597}
598
599fn parse_tags(val: &Value) -> HashMap<String, String> {
600    let mut tags = HashMap::new();
601    if let Some(arr) = val.as_array() {
602        for tag in arr {
603            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
604                tags.insert(k.to_string(), v.to_string());
605            }
606        }
607    }
608    tags
609}
610
611fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
612    let mut names = HashMap::new();
613    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
614        for (k, v) in obj {
615            if let Some(s) = v.as_str() {
616                names.insert(k.clone(), s.to_string());
617            }
618        }
619    }
620    names
621}
622
623fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
624    let mut values = HashMap::new();
625    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
626        for (k, v) in obj {
627            values.insert(k.clone(), v.clone());
628        }
629    }
630    values
631}
632
633fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
634    if name.starts_with('#') {
635        expr_attr_names
636            .get(name)
637            .cloned()
638            .unwrap_or_else(|| name.to_string())
639    } else {
640        name.to_string()
641    }
642}
643
644/// Resolve a (possibly dotted, possibly `#name`-containing) document path to
645/// the leaf `AttributeValue` inside `item`. Single-segment paths (`foo`,
646/// `#foo`) resolve to a top-level attribute. Dotted paths (`profile.email`,
647/// `#p.#e`, `items[0].sku`) walk into `M`/`L` containers. Returns `None` if
648/// any segment is missing or the intermediate value isn't a map/list.
649fn resolve_path(
650    path: &str,
651    item: &HashMap<String, AttributeValue>,
652    expr_attr_names: &HashMap<String, String>,
653) -> Option<Value> {
654    let resolved = resolve_projection_path(path, expr_attr_names);
655    resolve_nested_path(item, &resolved)
656}
657
658fn extract_key(
659    table: &DynamoTable,
660    item: &HashMap<String, AttributeValue>,
661) -> HashMap<String, AttributeValue> {
662    let mut key = HashMap::new();
663    let hash_key = table.hash_key_name();
664    if let Some(v) = item.get(hash_key) {
665        key.insert(hash_key.to_string(), v.clone());
666    }
667    if let Some(range_key) = table.range_key_name() {
668        if let Some(v) = item.get(range_key) {
669            key.insert(range_key.to_string(), v.clone());
670        }
671    }
672    key
673}
674
675/// Parse a JSON object into a key map (used for ExclusiveStartKey).
676fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
677    let obj = value.as_object()?;
678    if obj.is_empty() {
679        return None;
680    }
681    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
682}
683
684/// Check whether an item's key attributes match the given key map.
685fn item_matches_key(
686    item: &HashMap<String, AttributeValue>,
687    key: &HashMap<String, AttributeValue>,
688    hash_key_name: &str,
689    range_key_name: Option<&str>,
690) -> bool {
691    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
692        (Some(a), Some(b)) => a == b,
693        _ => false,
694    };
695    if !hash_match {
696        return false;
697    }
698    match range_key_name {
699        Some(rk) => match (item.get(rk), key.get(rk)) {
700            (Some(a), Some(b)) => a == b,
701            (None, None) => true,
702            _ => false,
703        },
704        None => true,
705    }
706}
707
708/// Extract the primary key from an item given explicit key attribute names.
709fn extract_key_for_schema(
710    item: &HashMap<String, AttributeValue>,
711    hash_key_name: &str,
712    range_key_name: Option<&str>,
713) -> HashMap<String, AttributeValue> {
714    let mut key = HashMap::new();
715    if let Some(v) = item.get(hash_key_name) {
716        key.insert(hash_key_name.to_string(), v.clone());
717    }
718    if let Some(rk) = range_key_name {
719        if let Some(v) = item.get(rk) {
720            key.insert(rk.to_string(), v.clone());
721        }
722    }
723    key
724}
725
726fn validate_key_in_item(
727    table: &DynamoTable,
728    item: &HashMap<String, AttributeValue>,
729) -> Result<(), AwsServiceError> {
730    let hash_key = table.hash_key_name();
731    if !item.contains_key(hash_key) {
732        return Err(AwsServiceError::aws_error(
733            StatusCode::BAD_REQUEST,
734            "ValidationException",
735            format!("Missing the key {hash_key} in the item"),
736        ));
737    }
738    if let Some(range_key) = table.range_key_name() {
739        if !item.contains_key(range_key) {
740            return Err(AwsServiceError::aws_error(
741                StatusCode::BAD_REQUEST,
742                "ValidationException",
743                format!("Missing the key {range_key} in the item"),
744            ));
745        }
746    }
747    Ok(())
748}
749
750fn validate_key_attributes_in_key(
751    table: &DynamoTable,
752    key: &HashMap<String, AttributeValue>,
753) -> Result<(), AwsServiceError> {
754    let hash_key = table.hash_key_name();
755    if !key.contains_key(hash_key) {
756        return Err(AwsServiceError::aws_error(
757            StatusCode::BAD_REQUEST,
758            "ValidationException",
759            format!("Missing the key {hash_key} in the item"),
760        ));
761    }
762    Ok(())
763}
764
765fn project_item(
766    item: &HashMap<String, AttributeValue>,
767    body: &Value,
768) -> HashMap<String, AttributeValue> {
769    let projection = body["ProjectionExpression"].as_str();
770    match projection {
771        Some(proj) if !proj.is_empty() => {
772            let expr_attr_names = parse_expression_attribute_names(body);
773            let attrs: Vec<String> = proj
774                .split(',')
775                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
776                .collect();
777            let mut result = HashMap::new();
778            for attr in &attrs {
779                if let Some(v) = resolve_nested_path(item, attr) {
780                    insert_nested_value(&mut result, attr, v);
781                }
782            }
783            result
784        }
785        _ => item.clone(),
786    }
787}
788
789/// Resolve expression attribute names within each segment of a projection path.
790/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
791fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
792    // Split on dots, resolve each part, rejoin
793    let mut result = String::new();
794    for (i, segment) in path.split('.').enumerate() {
795        if i > 0 {
796            result.push('.');
797        }
798        // A segment might be like "#n" or "people[0]" or "#attr[0]"
799        if let Some(bracket_pos) = segment.find('[') {
800            let key_part = &segment[..bracket_pos];
801            let index_part = &segment[bracket_pos..];
802            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
803            result.push_str(index_part);
804        } else {
805            result.push_str(&resolve_attr_name(segment, expr_attr_names));
806        }
807    }
808    result
809}
810
811/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
812fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
813    let segments = parse_path_segments(path);
814    if segments.is_empty() {
815        return None;
816    }
817
818    let first = &segments[0];
819    let top_key = match first {
820        PathSegment::Key(k) => k.as_str(),
821        _ => return None,
822    };
823
824    let mut current = item.get(top_key)?.clone();
825
826    for segment in &segments[1..] {
827        match segment {
828            PathSegment::Key(k) => {
829                // Navigate into a Map: {"M": {"key": ...}}
830                current = current.get("M")?.get(k)?.clone();
831            }
832            PathSegment::Index(idx) => {
833                // Navigate into a List: {"L": [...]}
834                current = current.get("L")?.get(*idx)?.clone();
835            }
836        }
837    }
838
839    Some(current)
840}
841
842#[derive(Debug)]
843enum PathSegment {
844    Key(String),
845    Index(usize),
846}
847
848/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
849fn parse_path_segments(path: &str) -> Vec<PathSegment> {
850    let mut segments = Vec::new();
851    let mut current = String::new();
852
853    let chars: Vec<char> = path.chars().collect();
854    let mut i = 0;
855    while i < chars.len() {
856        match chars[i] {
857            '.' => {
858                if !current.is_empty() {
859                    segments.push(PathSegment::Key(current.clone()));
860                    current.clear();
861                }
862            }
863            '[' => {
864                if !current.is_empty() {
865                    segments.push(PathSegment::Key(current.clone()));
866                    current.clear();
867                }
868                i += 1;
869                let mut num = String::new();
870                while i < chars.len() && chars[i] != ']' {
871                    num.push(chars[i]);
872                    i += 1;
873                }
874                if let Ok(idx) = num.parse::<usize>() {
875                    segments.push(PathSegment::Index(idx));
876                }
877                // skip ']'
878            }
879            c => {
880                current.push(c);
881            }
882        }
883        i += 1;
884    }
885    if !current.is_empty() {
886        segments.push(PathSegment::Key(current));
887    }
888    segments
889}
890
891/// Insert a value at a nested path in the result HashMap.
892/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
893fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
894    // Simple case: no nesting
895    if !path.contains('.') && !path.contains('[') {
896        result.insert(path.to_string(), value);
897        return;
898    }
899
900    let segments = parse_path_segments(path);
901    if segments.is_empty() {
902        return;
903    }
904
905    let top_key = match &segments[0] {
906        PathSegment::Key(k) => k.clone(),
907        _ => return,
908    };
909
910    if segments.len() == 1 {
911        result.insert(top_key, value);
912        return;
913    }
914
915    // For nested paths, wrap the value back into the nested structure
916    let wrapped = wrap_value_in_path(&segments[1..], value);
917    // Merge into existing value if present
918    let existing = result.remove(&top_key);
919    let merged = match existing {
920        Some(existing) => merge_attribute_values(existing, wrapped),
921        None => wrapped,
922    };
923    result.insert(top_key, merged);
924}
925
926/// Wrap a value in the nested path structure.
927fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
928    if segments.is_empty() {
929        return value;
930    }
931    let inner = wrap_value_in_path(&segments[1..], value);
932    match &segments[0] {
933        PathSegment::Key(k) => {
934            json!({"M": {k.clone(): inner}})
935        }
936        PathSegment::Index(idx) => {
937            let mut arr = vec![Value::Null; idx + 1];
938            arr[*idx] = inner;
939            json!({"L": arr})
940        }
941    }
942}
943
944/// Merge two attribute values (for overlapping projections).
945fn merge_attribute_values(a: Value, b: Value) -> Value {
946    if let (Some(a_map), Some(b_map)) = (
947        a.get("M").and_then(|v| v.as_object()),
948        b.get("M").and_then(|v| v.as_object()),
949    ) {
950        let mut merged = a_map.clone();
951        for (k, v) in b_map {
952            if let Some(existing) = merged.get(k) {
953                merged.insert(
954                    k.clone(),
955                    merge_attribute_values(existing.clone(), v.clone()),
956                );
957            } else {
958                merged.insert(k.clone(), v.clone());
959            }
960        }
961        json!({"M": merged})
962    } else {
963        b
964    }
965}
966
967fn evaluate_condition(
968    condition: &str,
969    existing: Option<&HashMap<String, AttributeValue>>,
970    expr_attr_names: &HashMap<String, String>,
971    expr_attr_values: &HashMap<String, Value>,
972) -> Result<(), AwsServiceError> {
973    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
974    // so we delegate to evaluate_filter_expression. An empty map models "item
975    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
976    // → true, comparisons against missing attributes → None vs Some(val).
977    let empty = HashMap::new();
978    let item = existing.unwrap_or(&empty);
979    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
980        Ok(())
981    } else {
982        Err(AwsServiceError::aws_error(
983            StatusCode::BAD_REQUEST,
984            "ConditionalCheckFailedException",
985            "The conditional request failed",
986        ))
987    }
988}
989
990fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
991    // aws-sdk-go v2's expression builder emits function calls with a space
992    // between the name and the opening paren (`attribute_exists (#0)`),
993    // while hand-written expressions usually don't — accept both.
994    let with_paren = format!("{func_name}(");
995    let with_space = format!("{func_name} (");
996    let rest = expr
997        .strip_prefix(&with_paren)
998        .or_else(|| expr.strip_prefix(&with_space))?;
999    let inner = rest.strip_suffix(')')?;
1000    Some(inner.trim())
1001}
1002
1003fn evaluate_key_condition(
1004    expr: &str,
1005    item: &HashMap<String, AttributeValue>,
1006    expr_attr_names: &HashMap<String, String>,
1007    expr_attr_values: &HashMap<String, Value>,
1008) -> bool {
1009    let trimmed = expr.trim();
1010
1011    let parts = split_on_and(trimmed);
1012    if parts.len() > 1 {
1013        return parts.iter().all(|part| {
1014            evaluate_key_condition(part.trim(), item, expr_attr_names, expr_attr_values)
1015        });
1016    }
1017
1018    let stripped = strip_outer_parens(trimmed);
1019    if stripped != trimmed {
1020        return evaluate_key_condition(stripped, item, expr_attr_names, expr_attr_values);
1021    }
1022
1023    evaluate_single_key_condition(trimmed, item, expr_attr_names, expr_attr_values)
1024}
1025
1026/// Split a DynamoDB condition expression on a top-level keyword (``AND`` /
1027/// ``OR``), case-insensitive, with ASCII-whitespace word boundaries so
1028/// ``:s\tAND\t:o`` and ``:s\nAND\n:o`` split the same as ``:s AND :o``.
1029///
1030/// Parenthesised groups are skipped so only unparenthesised occurrences of the
1031/// keyword act as separators. When splitting on ``AND``, each top-level
1032/// ``BETWEEN`` keyword consumes the next top-level ``AND`` as its own inner
1033/// separator (``x BETWEEN :lo AND :hi``) rather than letting it split the
1034/// expression.
1035fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
1036    let bytes = expr.as_bytes();
1037    let len = bytes.len();
1038    let kw = keyword.as_bytes();
1039    let is_and = keyword.eq_ignore_ascii_case("AND");
1040
1041    let mut parts: Vec<&str> = Vec::new();
1042    let mut start = 0usize;
1043    let mut depth: i32 = 0;
1044    let mut between_skip: u32 = 0;
1045    let mut i = 0usize;
1046
1047    while i < len {
1048        let ch = bytes[i];
1049        if ch == b'(' {
1050            depth += 1;
1051            i += 1;
1052            continue;
1053        }
1054        if ch == b')' {
1055            if depth > 0 {
1056                depth -= 1;
1057            }
1058            i += 1;
1059            continue;
1060        }
1061        if depth == 0 {
1062            if is_and {
1063                if let Some(end) = match_keyword(bytes, i, b"BETWEEN") {
1064                    between_skip = between_skip.saturating_add(1);
1065                    i = end;
1066                    continue;
1067                }
1068            }
1069            if let Some(end) = match_keyword(bytes, i, kw) {
1070                if is_and && between_skip > 0 {
1071                    between_skip -= 1;
1072                    i = end;
1073                    continue;
1074                }
1075                parts.push(&expr[start..i]);
1076                start = end;
1077                i = end;
1078                continue;
1079            }
1080        }
1081        i += 1;
1082    }
1083    parts.push(&expr[start..]);
1084    parts
1085}
1086
1087/// Case-insensitive keyword match. For alphanumeric keywords (``AND``,
1088/// ``OR``, ``BETWEEN``) the match also requires ASCII-whitespace word
1089/// boundaries so substrings of identifiers are not mistaken for keywords.
1090/// Punctuation keywords (``,``) match literally.
1091fn match_keyword(bytes: &[u8], i: usize, keyword: &[u8]) -> Option<usize> {
1092    let end = i + keyword.len();
1093    if end > bytes.len() {
1094        return None;
1095    }
1096    for k in 0..keyword.len() {
1097        if !bytes[i + k].eq_ignore_ascii_case(&keyword[k]) {
1098            return None;
1099        }
1100    }
1101    let needs_word_boundary = keyword.iter().all(|b| b.is_ascii_alphanumeric());
1102    if needs_word_boundary {
1103        let left_ok = i == 0 || bytes[i - 1].is_ascii_whitespace();
1104        if !left_ok {
1105            return None;
1106        }
1107        let right_ok = end == bytes.len() || bytes[end].is_ascii_whitespace();
1108        if !right_ok {
1109            return None;
1110        }
1111    }
1112    Some(end)
1113}
1114
1115fn split_on_and(expr: &str) -> Vec<&str> {
1116    split_on_top_level_keyword(expr, "AND")
1117}
1118
1119fn split_on_or(expr: &str) -> Vec<&str> {
1120    split_on_top_level_keyword(expr, "OR")
1121}
1122
1123fn evaluate_single_key_condition(
1124    part: &str,
1125    item: &HashMap<String, AttributeValue>,
1126    expr_attr_names: &HashMap<String, String>,
1127    expr_attr_values: &HashMap<String, Value>,
1128) -> bool {
1129    let part = part.trim();
1130
1131    if let Some(rest) = part
1132        .strip_prefix("begins_with(")
1133        .or_else(|| part.strip_prefix("begins_with ("))
1134    {
1135        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
1136    }
1137
1138    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
1139        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
1140    }
1141
1142    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
1143}
1144
1145/// `begins_with(attr, :val)` — KeyCondition variant: supports only
1146/// S-typed attributes (mirrors AWS's behavior of returning false for
1147/// type mismatches). The filter-expression evaluator has its own
1148/// `eval_begins_with` because it operates on filter-grammar inputs.
1149fn key_cond_begins_with(
1150    rest: &str,
1151    item: &HashMap<String, AttributeValue>,
1152    expr_attr_names: &HashMap<String, String>,
1153    expr_attr_values: &HashMap<String, Value>,
1154) -> bool {
1155    let Some(inner) = rest.strip_suffix(')') else {
1156        return false;
1157    };
1158    let mut split = inner.splitn(2, ',');
1159    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1160        return false;
1161    };
1162    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1163    let expected = expr_attr_values.get(val_ref.trim());
1164    let actual = item.get(&attr_name);
1165    match (actual, expected) {
1166        (Some(a), Some(e)) => {
1167            let a_str = a.get("S").and_then(|v| v.as_str());
1168            let e_str = e.get("S").and_then(|v| v.as_str());
1169            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1170        }
1171        _ => false,
1172    }
1173}
1174
1175/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
1176/// shared `compare_attribute_values` ordering.
1177fn key_cond_between(
1178    part: &str,
1179    between_pos: usize,
1180    item: &HashMap<String, AttributeValue>,
1181    expr_attr_names: &HashMap<String, String>,
1182    expr_attr_values: &HashMap<String, Value>,
1183) -> bool {
1184    let attr_part = part[..between_pos].trim();
1185    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
1186    let range_part = &part[between_pos + 7..];
1187    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
1188        return false;
1189    };
1190    let lo_ref = range_part[..and_pos].trim();
1191    let hi_ref = range_part[and_pos + 5..].trim();
1192    let lo = expr_attr_values.get(lo_ref);
1193    let hi = expr_attr_values.get(hi_ref);
1194    let actual = item.get(&attr_name);
1195    match (actual, lo, hi) {
1196        (Some(a), Some(l), Some(h)) => {
1197            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1198                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1199        }
1200        _ => false,
1201    }
1202}
1203
1204/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1205/// Multi-character operators come first in the search list so that `<=`
1206/// is not mistakenly matched as `<`.
1207fn key_cond_simple_comparison(
1208    part: &str,
1209    item: &HashMap<String, AttributeValue>,
1210    expr_attr_names: &HashMap<String, String>,
1211    expr_attr_values: &HashMap<String, Value>,
1212) -> bool {
1213    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1214        let Some(pos) = part.find(op) else {
1215            continue;
1216        };
1217        let left = part[..pos].trim();
1218        let right = part[pos + op.len()..].trim();
1219        let actual_owned = resolve_path(left, item, expr_attr_names);
1220        let actual = actual_owned.as_ref();
1221        let expected = expr_attr_values.get(right);
1222
1223        return match *op {
1224            "=" => actual == expected,
1225            "<>" => actual != expected,
1226            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1227            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1228            "<=" => {
1229                let cmp = compare_attribute_values(actual, expected);
1230                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1231            }
1232            ">=" => {
1233                let cmp = compare_attribute_values(actual, expected);
1234                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1235            }
1236            _ => false,
1237        };
1238    }
1239    false
1240}
1241
1242/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1243/// - S → character count
1244/// - B → decoded byte count
1245/// - SS/NS/BS → element count
1246/// - L → element count
1247/// - M → element count
1248///
1249/// `size()` is not valid on N, BOOL, or NULL per AWS; returns None for those so
1250/// the enclosing comparison evaluates to false (matching AWS's behavior of
1251/// silently filtering type-mismatched rows in FilterExpression context).
1252fn attribute_size(val: &Value) -> Option<usize> {
1253    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1254        return Some(s.len());
1255    }
1256    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1257        // B is base64-encoded — return decoded byte count
1258        let decoded_len = base64::engine::general_purpose::STANDARD
1259            .decode(b)
1260            .map(|v| v.len())
1261            .unwrap_or(b.len());
1262        return Some(decoded_len);
1263    }
1264    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1265        return Some(arr.len());
1266    }
1267    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1268        return Some(arr.len());
1269    }
1270    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1271        return Some(arr.len());
1272    }
1273    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1274        return Some(arr.len());
1275    }
1276    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1277        return Some(obj.len());
1278    }
1279    None
1280}
1281
1282/// Evaluate a `size(path) op :val` comparison expression.
1283fn evaluate_size_comparison(
1284    part: &str,
1285    item: &HashMap<String, AttributeValue>,
1286    expr_attr_names: &HashMap<String, String>,
1287    expr_attr_values: &HashMap<String, Value>,
1288) -> Option<bool> {
1289    // Find the closing paren of size(...)
1290    let open = part.find('(')?;
1291    let close = part[open..].find(')')? + open;
1292    let path = part[open + 1..close].trim();
1293    let remainder = part[close + 1..].trim();
1294
1295    // Parse operator and value ref
1296    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1297        ("<=", rest.trim())
1298    } else if let Some(rest) = remainder.strip_prefix(">=") {
1299        (">=", rest.trim())
1300    } else if let Some(rest) = remainder.strip_prefix("<>") {
1301        ("<>", rest.trim())
1302    } else if let Some(rest) = remainder.strip_prefix('<') {
1303        ("<", rest.trim())
1304    } else if let Some(rest) = remainder.strip_prefix('>') {
1305        (">", rest.trim())
1306    } else if let Some(rest) = remainder.strip_prefix('=') {
1307        ("=", rest.trim())
1308    } else {
1309        return None;
1310    };
1311
1312    let actual_owned = resolve_path(path, item, expr_attr_names)?;
1313    let size = attribute_size(&actual_owned)? as f64;
1314
1315    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1316
1317    Some(match op {
1318        "=" => (size - expected).abs() < f64::EPSILON,
1319        "<>" => (size - expected).abs() >= f64::EPSILON,
1320        "<" => size < expected,
1321        ">" => size > expected,
1322        "<=" => size <= expected,
1323        ">=" => size >= expected,
1324        _ => false,
1325    })
1326}
1327
1328fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1329    match (a, b) {
1330        (None, None) => std::cmp::Ordering::Equal,
1331        (None, Some(_)) => std::cmp::Ordering::Less,
1332        (Some(_), None) => std::cmp::Ordering::Greater,
1333        (Some(a), Some(b)) => {
1334            let a_type = attribute_type_and_value(a);
1335            let b_type = attribute_type_and_value(b);
1336            match (a_type, b_type) {
1337                (Some(("S", a_val)), Some(("S", b_val))) => {
1338                    let a_str = a_val.as_str().unwrap_or("");
1339                    let b_str = b_val.as_str().unwrap_or("");
1340                    a_str.cmp(b_str)
1341                }
1342                (Some(("N", a_val)), Some(("N", b_val))) => {
1343                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1344                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1345                    a_num
1346                        .partial_cmp(&b_num)
1347                        .unwrap_or(std::cmp::Ordering::Equal)
1348                }
1349                (Some(("B", a_val)), Some(("B", b_val))) => {
1350                    let a_str = a_val.as_str().unwrap_or("");
1351                    let b_str = b_val.as_str().unwrap_or("");
1352                    a_str.cmp(b_str)
1353                }
1354                _ => std::cmp::Ordering::Equal,
1355            }
1356        }
1357    }
1358}
1359
1360fn evaluate_filter_expression(
1361    expr: &str,
1362    item: &HashMap<String, AttributeValue>,
1363    expr_attr_names: &HashMap<String, String>,
1364    expr_attr_values: &HashMap<String, Value>,
1365) -> bool {
1366    let trimmed = expr.trim();
1367
1368    // Split on OR first (lower precedence), respecting parentheses
1369    let or_parts = split_on_or(trimmed);
1370    if or_parts.len() > 1 {
1371        return or_parts.iter().any(|part| {
1372            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1373        });
1374    }
1375
1376    // Then split on AND (higher precedence), respecting parentheses
1377    let and_parts = split_on_and(trimmed);
1378    if and_parts.len() > 1 {
1379        return and_parts.iter().all(|part| {
1380            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1381        });
1382    }
1383
1384    // Strip outer parentheses if present
1385    let stripped = strip_outer_parens(trimmed);
1386    if stripped != trimmed {
1387        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1388    }
1389
1390    // Handle NOT prefix (case-insensitive)
1391    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1392        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1393    }
1394
1395    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1396}
1397
1398/// Strip matching outer parentheses from an expression.
1399fn strip_outer_parens(expr: &str) -> &str {
1400    let trimmed = expr.trim();
1401    if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
1402        return trimmed;
1403    }
1404    // Verify the outer parens actually match each other
1405    let inner = &trimmed[1..trimmed.len() - 1];
1406    let mut depth = 0;
1407    for ch in inner.bytes() {
1408        match ch {
1409            b'(' => depth += 1,
1410            b')' => {
1411                if depth == 0 {
1412                    return trimmed; // closing paren matches something inside, not the outer one
1413                }
1414                depth -= 1;
1415            }
1416            _ => {}
1417        }
1418    }
1419    if depth == 0 {
1420        inner
1421    } else {
1422        trimmed
1423    }
1424}
1425
1426fn evaluate_single_filter_condition(
1427    part: &str,
1428    item: &HashMap<String, AttributeValue>,
1429    expr_attr_names: &HashMap<String, String>,
1430    expr_attr_values: &HashMap<String, Value>,
1431) -> bool {
1432    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1433        return resolve_path(inner, item, expr_attr_names).is_some();
1434    }
1435
1436    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1437        return resolve_path(inner, item, expr_attr_names).is_none();
1438    }
1439
1440    if let Some(rest) = part
1441        .strip_prefix("begins_with(")
1442        .or_else(|| part.strip_prefix("begins_with ("))
1443    {
1444        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1445    }
1446
1447    if let Some(rest) = part
1448        .strip_prefix("contains(")
1449        .or_else(|| part.strip_prefix("contains ("))
1450    {
1451        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1452    }
1453
1454    if part.starts_with("size(") || part.starts_with("size (") {
1455        if let Some(result) =
1456            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1457        {
1458            return result;
1459        }
1460    }
1461
1462    if let Some(rest) = part
1463        .strip_prefix("attribute_type(")
1464        .or_else(|| part.strip_prefix("attribute_type ("))
1465    {
1466        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1467    }
1468
1469    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1470        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1471        let actual = item.get(&attr_name);
1472        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1473    }
1474
1475    evaluate_single_key_condition(part, item, expr_attr_names, expr_attr_values)
1476}
1477
1478/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1479/// any parse failure or type mismatch (this is the same shape DynamoDB
1480/// returns: a malformed predicate is silently false rather than an error).
1481fn eval_begins_with(
1482    rest: &str,
1483    item: &HashMap<String, AttributeValue>,
1484    expr_attr_names: &HashMap<String, String>,
1485    expr_attr_values: &HashMap<String, Value>,
1486) -> bool {
1487    let Some(inner) = rest.strip_suffix(')') else {
1488        return false;
1489    };
1490    let mut split = inner.splitn(2, ',');
1491    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1492        return false;
1493    };
1494    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1495    let expected = expr_attr_values.get(val_ref.trim());
1496    match (actual.as_ref(), expected) {
1497        (Some(a), Some(e)) => {
1498            let a_str = a.get("S").and_then(|v| v.as_str());
1499            let e_str = e.get("S").and_then(|v| v.as_str());
1500            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1501        }
1502        _ => false,
1503    }
1504}
1505
1506/// `contains(path, :val)` — substring check on S, set membership on
1507/// SS/NS/BS, and element membership on L. Other type pairings return
1508/// false.
1509fn eval_contains(
1510    rest: &str,
1511    item: &HashMap<String, AttributeValue>,
1512    expr_attr_names: &HashMap<String, String>,
1513    expr_attr_values: &HashMap<String, Value>,
1514) -> bool {
1515    let Some(inner) = rest.strip_suffix(')') else {
1516        return false;
1517    };
1518    let mut split = inner.splitn(2, ',');
1519    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1520        return false;
1521    };
1522    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1523    let expected = expr_attr_values.get(val_ref.trim());
1524    let (Some(a), Some(e)) = (actual.as_ref(), expected) else {
1525        return false;
1526    };
1527
1528    if let (Some(a_s), Some(e_s)) = (
1529        a.get("S").and_then(|v| v.as_str()),
1530        e.get("S").and_then(|v| v.as_str()),
1531    ) {
1532        return a_s.contains(e_s);
1533    }
1534    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1535        if let Some(val) = e.get("S") {
1536            return set.contains(val);
1537        }
1538    }
1539    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1540        if let Some(val) = e.get("N") {
1541            return set.contains(val);
1542        }
1543    }
1544    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1545        if let Some(val) = e.get("B") {
1546            return set.contains(val);
1547        }
1548    }
1549    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1550        return list.contains(e);
1551    }
1552    false
1553}
1554
1555/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1556/// is stored under the wire type identified by `:type` (one of the
1557/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1558fn eval_attribute_type(
1559    rest: &str,
1560    item: &HashMap<String, AttributeValue>,
1561    expr_attr_names: &HashMap<String, String>,
1562    expr_attr_values: &HashMap<String, Value>,
1563) -> bool {
1564    let Some(inner) = rest.strip_suffix(')') else {
1565        return false;
1566    };
1567    let mut split = inner.splitn(2, ',');
1568    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1569        return false;
1570    };
1571    let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
1572    let expected_type = expr_attr_values
1573        .get(val_ref.trim())
1574        .and_then(|v| v.get("S"))
1575        .and_then(|v| v.as_str());
1576    let (Some(val), Some(t)) = (actual.as_ref(), expected_type) else {
1577        return false;
1578    };
1579    match t {
1580        "S" => val.get("S").is_some(),
1581        "N" => val.get("N").is_some(),
1582        "B" => val.get("B").is_some(),
1583        "BOOL" => val.get("BOOL").is_some(),
1584        "NULL" => val.get("NULL").is_some(),
1585        "SS" => val.get("SS").is_some(),
1586        "NS" => val.get("NS").is_some(),
1587        "BS" => val.get("BS").is_some(),
1588        "L" => val.get("L").is_some(),
1589        "M" => val.get("M").is_some(),
1590        _ => false,
1591    }
1592}
1593
1594/// Parse an `attr IN (:v1, :v2, ...)` expression. Mirrors the DynamoDB
1595/// ConditionExpression / FilterExpression grammar where IN takes a single
1596/// operand on the left and 1–100 comma-separated value refs inside parens
1597/// on the right. Case-insensitive; tolerates missing spaces after commas
1598/// (aws-sdk-go's `expression` builder emits ", " but hand-built expressions
1599/// often use `strings.Join(..., ",")`). Returns None for non-IN inputs so
1600/// callers can fall through to their other grammar branches.
1601fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
1602    let upper = expr.to_ascii_uppercase();
1603    let in_pos = upper.find(" IN ")?;
1604    let attr_ref = expr[..in_pos].trim();
1605    if attr_ref.is_empty() {
1606        return None;
1607    }
1608    let rest = expr[in_pos + 4..].trim_start();
1609    let inner = rest.strip_prefix('(')?.strip_suffix(')')?;
1610    let values: Vec<&str> = inner
1611        .split(',')
1612        .map(|s| s.trim())
1613        .filter(|s| !s.is_empty())
1614        .collect();
1615    if values.is_empty() {
1616        return None;
1617    }
1618    Some((attr_ref, values))
1619}
1620
1621/// Return true iff `actual` equals any of the `value_refs` resolved through
1622/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1623/// evaluates `IN` against undefined attributes as false).
1624fn evaluate_in_match(
1625    actual: Option<&AttributeValue>,
1626    value_refs: &[&str],
1627    expr_attr_values: &HashMap<String, Value>,
1628) -> bool {
1629    value_refs.iter().any(|v_ref| {
1630        let expected = expr_attr_values.get(*v_ref);
1631        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1632    })
1633}
1634
1635/// One of the four DynamoDB ``UpdateExpression`` action keywords.
1636#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1637enum UpdateAction {
1638    Set,
1639    Remove,
1640    Add,
1641    Delete,
1642}
1643
1644impl UpdateAction {
1645    /// All four keywords as written on the wire — these double as the search
1646    /// terms for ``parse_update_clauses``.
1647    const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
1648        ("SET", UpdateAction::Set),
1649        ("REMOVE", UpdateAction::Remove),
1650        ("ADD", UpdateAction::Add),
1651        ("DELETE", UpdateAction::Delete),
1652    ];
1653
1654    fn keyword(self) -> &'static str {
1655        match self {
1656            UpdateAction::Set => "SET",
1657            UpdateAction::Remove => "REMOVE",
1658            UpdateAction::Add => "ADD",
1659            UpdateAction::Delete => "DELETE",
1660        }
1661    }
1662}
1663
1664fn apply_update_expression(
1665    item: &mut HashMap<String, AttributeValue>,
1666    expr: &str,
1667    expr_attr_names: &HashMap<String, String>,
1668    expr_attr_values: &HashMap<String, Value>,
1669) -> Result<(), AwsServiceError> {
1670    let clauses = parse_update_clauses(expr);
1671    if clauses.is_empty() && !expr.trim().is_empty() {
1672        return Err(AwsServiceError::aws_error(
1673            StatusCode::BAD_REQUEST,
1674            "ValidationException",
1675            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1676        ));
1677    }
1678    for (action, assignments) in &clauses {
1679        match action {
1680            UpdateAction::Set => {
1681                for assignment in assignments {
1682                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1683                }
1684            }
1685            UpdateAction::Remove => {
1686                for attr_ref in assignments {
1687                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1688                    item.remove(&attr);
1689                }
1690            }
1691            UpdateAction::Add => {
1692                for assignment in assignments {
1693                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1694                }
1695            }
1696            UpdateAction::Delete => {
1697                for assignment in assignments {
1698                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1699                }
1700            }
1701        }
1702    }
1703    Ok(())
1704}
1705
1706fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1707    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1708    let upper = expr.to_ascii_uppercase();
1709    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1710
1711    for &(kw, action) in UpdateAction::KEYWORDS {
1712        let mut search_from = 0;
1713        while let Some(pos) = upper[search_from..].find(kw) {
1714            let abs_pos = search_from + pos;
1715            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1716            let after_pos = abs_pos + kw.len();
1717            let after_ok =
1718                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1719            if before_ok && after_ok {
1720                positions.push((abs_pos, action));
1721            }
1722            search_from = abs_pos + kw.len();
1723        }
1724    }
1725
1726    positions.sort_by_key(|(pos, _)| *pos);
1727
1728    for (i, &(pos, action)) in positions.iter().enumerate() {
1729        let start = pos + action.keyword().len();
1730        let end = if i + 1 < positions.len() {
1731            positions[i + 1].0
1732        } else {
1733            expr.len()
1734        };
1735        let content = expr[start..end].trim();
1736        // Use a paren-aware split so that function-call arguments such as
1737        // `list_append(#a, :b)` are kept as a single assignment rather than
1738        // being torn apart at the inner comma.
1739        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
1740            .into_iter()
1741            .map(|s| s.trim().to_string())
1742            .collect();
1743        clauses.push((action, assignments));
1744    }
1745
1746    clauses
1747}
1748
1749fn apply_set_assignment(
1750    item: &mut HashMap<String, AttributeValue>,
1751    assignment: &str,
1752    expr_attr_names: &HashMap<String, String>,
1753    expr_attr_values: &HashMap<String, Value>,
1754) -> Result<(), AwsServiceError> {
1755    let Some((left, right)) = assignment.split_once('=') else {
1756        return Ok(());
1757    };
1758
1759    let left_trimmed = left.trim();
1760    let right = right.trim();
1761
1762    // One RHS evaluator used for every LHS shape so `SET a.b = a.b + :d`,
1763    // `SET a.b = list_append(a.b, :list)`, and `SET a.b = if_not_exists(a.b, :v)`
1764    // all work against nested paths, not just top-level attributes. The evaluator
1765    // returns Ok(None) when the RHS is a no-op (if_not_exists where the target
1766    // already has a value, or an unresolvable plain reference).
1767    let new_value = evaluate_set_rhs(right, item, expr_attr_names, expr_attr_values)?;
1768
1769    if is_dotted_path(left_trimmed) {
1770        // A None value is a no-op (if_not_exists skip, or unresolvable plain
1771        // ref) — matches top-level SET's silent-skip behavior for the same
1772        // shapes. Structural errors (missing parent map, non-map intermediate)
1773        // surface from assign_nested_path itself.
1774        let Some(v) = new_value else {
1775            return Ok(());
1776        };
1777        return assign_nested_path(item, left_trimmed, expr_attr_names, v);
1778    }
1779
1780    // Split off a trailing `[N]` list-index suffix so we can resolve the
1781    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1782    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1783    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1784        Some((name, idx)) => (name, Some(idx)),
1785        None => (left_trimmed, None),
1786    };
1787    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1788
1789    let Some(v) = new_value else {
1790        return Ok(());
1791    };
1792    match list_index {
1793        Some(idx) => assign_list_index(item, &attr, idx, v),
1794        None => {
1795            item.insert(attr, v);
1796            Ok(())
1797        }
1798    }
1799}
1800
1801/// Evaluate the RHS of a `SET` assignment without writing it anywhere.
1802/// Returns `Ok(Some(value))` with the computed value, `Ok(None)` for
1803/// no-op cases (if_not_exists where the target already has a value, or
1804/// an unresolvable plain reference in dotted-path context), or
1805/// `Err(ValidationException)` for type-mismatched arithmetic.
1806fn evaluate_set_rhs(
1807    right: &str,
1808    item: &HashMap<String, AttributeValue>,
1809    expr_attr_names: &HashMap<String, String>,
1810    expr_attr_values: &HashMap<String, Value>,
1811) -> Result<Option<Value>, AwsServiceError> {
1812    if let Some(rest) = right
1813        .strip_prefix("if_not_exists(")
1814        .or_else(|| right.strip_prefix("if_not_exists ("))
1815    {
1816        return Ok(evaluate_if_not_exists_rhs(
1817            rest,
1818            item,
1819            expr_attr_names,
1820            expr_attr_values,
1821        ));
1822    }
1823
1824    if let Some(rest) = right
1825        .strip_prefix("list_append(")
1826        .or_else(|| right.strip_prefix("list_append ("))
1827    {
1828        return Ok(evaluate_list_append_rhs(
1829            rest,
1830            item,
1831            expr_attr_names,
1832            expr_attr_values,
1833        ));
1834    }
1835
1836    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1837        return evaluate_arithmetic_rhs(
1838            arith_left,
1839            arith_right,
1840            is_add,
1841            item,
1842            expr_attr_names,
1843            expr_attr_values,
1844        );
1845    }
1846
1847    Ok(resolve_ref_or_path(
1848        right,
1849        item,
1850        expr_attr_names,
1851        expr_attr_values,
1852    ))
1853}
1854
1855/// `if_not_exists(path, :val)` — evaluates to nothing when `path` already
1856/// resolves to a value, and to the default ref otherwise. `path` may be a
1857/// top-level attribute, a placeholder, or a dotted path inside an M-typed
1858/// attribute.
1859fn evaluate_if_not_exists_rhs(
1860    rest: &str,
1861    item: &HashMap<String, AttributeValue>,
1862    expr_attr_names: &HashMap<String, String>,
1863    expr_attr_values: &HashMap<String, Value>,
1864) -> Option<Value> {
1865    let inner = rest.strip_suffix(')')?;
1866    let mut split = inner.splitn(2, ',');
1867    let (check, default) = (split.next()?, split.next()?);
1868    if resolve_ref_or_path(check.trim(), item, expr_attr_names, expr_attr_values).is_some() {
1869        return None;
1870    }
1871    resolve_ref_or_path(default.trim(), item, expr_attr_names, expr_attr_values)
1872}
1873
1874/// `list_append(a, b)` — concatenate the L arrays of two list operands.
1875/// Either operand may be missing or non-list, in which case it contributes
1876/// nothing. Both operands may be value refs (`:list`) or document paths
1877/// (top-level or dotted).
1878fn evaluate_list_append_rhs(
1879    rest: &str,
1880    item: &HashMap<String, AttributeValue>,
1881    expr_attr_names: &HashMap<String, String>,
1882    expr_attr_values: &HashMap<String, Value>,
1883) -> Option<Value> {
1884    let inner = rest.strip_suffix(')')?;
1885    let mut split = inner.splitn(2, ',');
1886    let (a_ref, b_ref) = (split.next()?, split.next()?);
1887    let a_val = resolve_ref_or_path(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1888    let b_val = resolve_ref_or_path(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1889
1890    let mut merged = Vec::new();
1891    for v in [&a_val, &b_val].iter().copied().flatten() {
1892        if let Value::Object(obj) = v {
1893            if let Some(Value::Array(arr)) = obj.get("L") {
1894                merged.extend(arr.clone());
1895            }
1896        }
1897    }
1898    Some(json!({ "L": merged }))
1899}
1900
1901/// `<arith_left> +/- <arith_right>` — both operands must resolve to N values
1902/// (or the LHS may be missing, in which case it's treated as 0). Anything
1903/// else is rejected with the same `ValidationException` AWS returns.
1904fn evaluate_arithmetic_rhs(
1905    arith_left: &str,
1906    arith_right: &str,
1907    is_add: bool,
1908    item: &HashMap<String, AttributeValue>,
1909    expr_attr_names: &HashMap<String, String>,
1910    expr_attr_values: &HashMap<String, Value>,
1911) -> Result<Option<Value>, AwsServiceError> {
1912    let left_val = resolve_ref_or_path(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1913    let right_val =
1914        resolve_ref_or_path(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1915
1916    let left_num = match extract_number(&left_val) {
1917        Some(n) => n,
1918        None if left_val.is_some() => {
1919            return Err(AwsServiceError::aws_error(
1920                StatusCode::BAD_REQUEST,
1921                "ValidationException",
1922                "An operand in the update expression has an incorrect data type",
1923            ));
1924        }
1925        None => 0.0,
1926    };
1927    let right_num = extract_number(&right_val).ok_or_else(|| {
1928        AwsServiceError::aws_error(
1929            StatusCode::BAD_REQUEST,
1930            "ValidationException",
1931            "An operand in the update expression has an incorrect data type",
1932        )
1933    })?;
1934
1935    let result = if is_add {
1936        left_num + right_num
1937    } else {
1938        left_num - right_num
1939    };
1940
1941    let num_str = if result == result.trunc() {
1942        format!("{}", result as i64)
1943    } else {
1944        format!("{result}")
1945    };
1946
1947    Ok(Some(json!({ "N": num_str })))
1948}
1949
1950/// Parse a trailing `[N]` list-index suffix off the LHS of a SET assignment.
1951/// Returns the bare attribute reference and the index, or None when the LHS
1952/// is a plain attribute (or a path shape we don't yet support).
1953fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
1954    let path = path.trim();
1955    if !path.ends_with(']') {
1956        return None;
1957    }
1958    let open = path.rfind('[')?;
1959    // Require no further `.` / `[` / `]` inside the bracketed portion and no
1960    // further path segments after — we only handle the single-index case
1961    // `name[N]`, not nested shapes like `a.b[0].c`.
1962    let idx_str = &path[open + 1..path.len() - 1];
1963    let idx: usize = idx_str.parse().ok()?;
1964    let name = &path[..open];
1965    if name.is_empty() || name.contains('[') || name.contains(']') || name.contains('.') {
1966        return None;
1967    }
1968    Some((name, idx))
1969}
1970
1971/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
1972/// within the current list, replaces that slot; if it's at the end, appends.
1973/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
1974/// out-of-range indices and non-list attributes.
1975fn assign_list_index(
1976    item: &mut HashMap<String, AttributeValue>,
1977    attr: &str,
1978    idx: usize,
1979    value: Value,
1980) -> Result<(), AwsServiceError> {
1981    let Some(existing) = item.get_mut(attr) else {
1982        return Err(invalid_document_path());
1983    };
1984    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
1985        return Err(invalid_document_path());
1986    };
1987    if idx < list.len() {
1988        list[idx] = value;
1989    } else if idx == list.len() {
1990        list.push(value);
1991    } else {
1992        return Err(invalid_document_path());
1993    }
1994    Ok(())
1995}
1996
1997fn invalid_document_path() -> AwsServiceError {
1998    AwsServiceError::aws_error(
1999        StatusCode::BAD_REQUEST,
2000        "ValidationException",
2001        "The document path provided in the update expression is invalid for update",
2002    )
2003}
2004
2005/// Resolve a SET-RHS operand that may be either a value placeholder
2006/// (``:foo``) or a document path (top-level attribute, ``#name``, or a
2007/// dotted path like ``profile.email`` / ``#web.#count``).
2008fn resolve_ref_or_path(
2009    reference: &str,
2010    item: &HashMap<String, AttributeValue>,
2011    expr_attr_names: &HashMap<String, String>,
2012    expr_attr_values: &HashMap<String, Value>,
2013) -> Option<Value> {
2014    let reference = reference.trim();
2015    if reference.starts_with(':') {
2016        return expr_attr_values.get(reference).cloned();
2017    }
2018    let resolved = resolve_projection_path(reference, expr_attr_names);
2019    resolve_nested_path(item, &resolved)
2020}
2021
2022/// True if `path` targets a nested key inside an M-typed attribute. Bracketed
2023/// list indices (`a[0]`, `a.b[0]`) are not supported by the nested-SET writer.
2024fn is_dotted_path(path: &str) -> bool {
2025    path.contains('.') && !path.contains('[')
2026}
2027
2028/// Write `value` at a dotted path inside an M-typed attribute.
2029///
2030/// Resolves each `#name` segment through `expr_attr_names`. The top-level
2031/// attribute and every intermediate segment must already exist as a Map —
2032/// DynamoDB rejects writes through missing parents with ValidationException.
2033fn assign_nested_path(
2034    item: &mut HashMap<String, AttributeValue>,
2035    path: &str,
2036    expr_attr_names: &HashMap<String, String>,
2037    value: Value,
2038) -> Result<(), AwsServiceError> {
2039    let mut segments: Vec<String> = path
2040        .split('.')
2041        .map(|seg| resolve_attr_name(seg.trim(), expr_attr_names))
2042        .collect();
2043    if segments.len() < 2 {
2044        return Err(invalid_document_path());
2045    }
2046
2047    let leaf = segments.pop().expect("len >= 2");
2048    let top = segments.remove(0);
2049
2050    let top_attr = item.get_mut(&top).ok_or_else(invalid_document_path)?;
2051    let mut current = top_attr
2052        .get_mut("M")
2053        .and_then(|m| m.as_object_mut())
2054        .ok_or_else(invalid_document_path)?;
2055
2056    for seg in &segments {
2057        current = current
2058            .get_mut(seg)
2059            .and_then(|v| v.get_mut("M"))
2060            .and_then(|m| m.as_object_mut())
2061            .ok_or_else(invalid_document_path)?;
2062    }
2063
2064    current.insert(leaf, value);
2065    Ok(())
2066}
2067
2068fn extract_number(val: &Option<Value>) -> Option<f64> {
2069    val.as_ref()
2070        .and_then(|v| v.get("N"))
2071        .and_then(|n| n.as_str())
2072        .and_then(|s| s.parse().ok())
2073}
2074
2075fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
2076    let mut depth = 0;
2077    for (i, c) in expr.char_indices() {
2078        match c {
2079            '(' => depth += 1,
2080            ')' => depth -= 1,
2081            '+' if depth == 0 && i > 0 => {
2082                return Some((&expr[..i], &expr[i + 1..], true));
2083            }
2084            '-' if depth == 0 && i > 0 => {
2085                return Some((&expr[..i], &expr[i + 1..], false));
2086            }
2087            _ => {}
2088        }
2089    }
2090    None
2091}
2092
2093fn apply_add_assignment(
2094    item: &mut HashMap<String, AttributeValue>,
2095    assignment: &str,
2096    expr_attr_names: &HashMap<String, String>,
2097    expr_attr_values: &HashMap<String, Value>,
2098) -> Result<(), AwsServiceError> {
2099    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2100    if parts.len() != 2 {
2101        return Ok(());
2102    }
2103
2104    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2105    let val_ref = parts[1].trim();
2106    let add_val = expr_attr_values.get(val_ref);
2107
2108    if let Some(add_val) = add_val {
2109        if let Some(existing) = item.get(&attr) {
2110            if let (Some(existing_num), Some(add_num)) = (
2111                extract_number(&Some(existing.clone())),
2112                extract_number(&Some(add_val.clone())),
2113            ) {
2114                let result = existing_num + add_num;
2115                let num_str = if result == result.trunc() {
2116                    format!("{}", result as i64)
2117                } else {
2118                    format!("{result}")
2119                };
2120                item.insert(attr, json!({"N": num_str}));
2121            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
2122                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
2123                    let mut merged: Vec<Value> = existing_set.clone();
2124                    for v in add_set {
2125                        if !merged.contains(v) {
2126                            merged.push(v.clone());
2127                        }
2128                    }
2129                    item.insert(attr, json!({"SS": merged}));
2130                }
2131            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
2132                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
2133                    let mut merged: Vec<Value> = existing_set.clone();
2134                    for v in add_set {
2135                        if !merged.contains(v) {
2136                            merged.push(v.clone());
2137                        }
2138                    }
2139                    item.insert(attr, json!({"NS": merged}));
2140                }
2141            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
2142                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
2143                    let mut merged: Vec<Value> = existing_set.clone();
2144                    for v in add_set {
2145                        if !merged.contains(v) {
2146                            merged.push(v.clone());
2147                        }
2148                    }
2149                    item.insert(attr, json!({"BS": merged}));
2150                }
2151            }
2152        } else {
2153            item.insert(attr, add_val.clone());
2154        }
2155    }
2156
2157    Ok(())
2158}
2159
2160fn apply_delete_assignment(
2161    item: &mut HashMap<String, AttributeValue>,
2162    assignment: &str,
2163    expr_attr_names: &HashMap<String, String>,
2164    expr_attr_values: &HashMap<String, Value>,
2165) -> Result<(), AwsServiceError> {
2166    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
2167    if parts.len() != 2 {
2168        return Ok(());
2169    }
2170
2171    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
2172    let val_ref = parts[1].trim();
2173    let del_val = expr_attr_values.get(val_ref);
2174
2175    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
2176        if let (Some(existing_set), Some(del_set)) = (
2177            existing.get("SS").and_then(|v| v.as_array()),
2178            del_val.get("SS").and_then(|v| v.as_array()),
2179        ) {
2180            let filtered: Vec<Value> = existing_set
2181                .iter()
2182                .filter(|v| !del_set.contains(v))
2183                .cloned()
2184                .collect();
2185            if filtered.is_empty() {
2186                item.remove(&attr);
2187            } else {
2188                item.insert(attr, json!({"SS": filtered}));
2189            }
2190        } else if let (Some(existing_set), Some(del_set)) = (
2191            existing.get("NS").and_then(|v| v.as_array()),
2192            del_val.get("NS").and_then(|v| v.as_array()),
2193        ) {
2194            let filtered: Vec<Value> = existing_set
2195                .iter()
2196                .filter(|v| !del_set.contains(v))
2197                .cloned()
2198                .collect();
2199            if filtered.is_empty() {
2200                item.remove(&attr);
2201            } else {
2202                item.insert(attr, json!({"NS": filtered}));
2203            }
2204        } else if let (Some(existing_set), Some(del_set)) = (
2205            existing.get("BS").and_then(|v| v.as_array()),
2206            del_val.get("BS").and_then(|v| v.as_array()),
2207        ) {
2208            let filtered: Vec<Value> = existing_set
2209                .iter()
2210                .filter(|v| !del_set.contains(v))
2211                .cloned()
2212                .collect();
2213            if filtered.is_empty() {
2214                item.remove(&attr);
2215            } else {
2216                item.insert(attr, json!({"BS": filtered}));
2217            }
2218        }
2219    }
2220
2221    Ok(())
2222}
2223
2224pub(super) struct TableDescriptionInput<'a> {
2225    pub arn: &'a str,
2226    pub table_id: &'a str,
2227    pub key_schema: &'a [KeySchemaElement],
2228    pub attribute_definitions: &'a [AttributeDefinition],
2229    pub provisioned_throughput: &'a ProvisionedThroughput,
2230    pub gsi: &'a [GlobalSecondaryIndex],
2231    pub lsi: &'a [LocalSecondaryIndex],
2232    pub billing_mode: &'a str,
2233    pub created_at: chrono::DateTime<chrono::Utc>,
2234    pub item_count: i64,
2235    pub size_bytes: i64,
2236    pub status: &'a str,
2237    pub deletion_protection_enabled: bool,
2238    pub on_demand_throughput: Option<&'a crate::state::OnDemandThroughput>,
2239}
2240
2241fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
2242    let TableDescriptionInput {
2243        arn,
2244        table_id,
2245        key_schema,
2246        attribute_definitions,
2247        provisioned_throughput,
2248        gsi,
2249        lsi,
2250        billing_mode,
2251        created_at,
2252        item_count,
2253        size_bytes,
2254        status,
2255        deletion_protection_enabled,
2256        on_demand_throughput,
2257    } = *input;
2258    let table_name = arn.rsplit('/').next().unwrap_or("");
2259    let creation_timestamp =
2260        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
2261
2262    let ks: Vec<Value> = key_schema
2263        .iter()
2264        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2265        .collect();
2266
2267    let ad: Vec<Value> = attribute_definitions
2268        .iter()
2269        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2270        .collect();
2271
2272    let mut desc = json!({
2273        "TableName": table_name,
2274        "TableArn": arn,
2275        "TableId": table_id,
2276        "TableStatus": status,
2277        "KeySchema": ks,
2278        "AttributeDefinitions": ad,
2279        "CreationDateTime": creation_timestamp,
2280        "ItemCount": item_count,
2281        "TableSizeBytes": size_bytes,
2282        "BillingModeSummary": { "BillingMode": billing_mode },
2283        "DeletionProtectionEnabled": deletion_protection_enabled,
2284    });
2285
2286    if billing_mode != "PAY_PER_REQUEST" {
2287        desc["ProvisionedThroughput"] = json!({
2288            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2289            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2290            "NumberOfDecreasesToday": 0,
2291        });
2292    } else {
2293        desc["ProvisionedThroughput"] = json!({
2294            "ReadCapacityUnits": 0,
2295            "WriteCapacityUnits": 0,
2296            "NumberOfDecreasesToday": 0,
2297        });
2298    }
2299
2300    if let Some(odt) = on_demand_throughput {
2301        desc["OnDemandThroughput"] = json!({
2302            "MaxReadRequestUnits": odt.max_read_request_units,
2303            "MaxWriteRequestUnits": odt.max_write_request_units,
2304        });
2305    }
2306
2307    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2308    // Real AWS returns an ACTIVE warm throughput object for active tables,
2309    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2310    // perpetual "still creating" loop.
2311    if status == "ACTIVE" {
2312        desc["WarmThroughput"] = json!({
2313            "ReadUnitsPerSecond": 0,
2314            "WriteUnitsPerSecond": 0,
2315            "Status": "ACTIVE",
2316        });
2317    }
2318
2319    if !gsi.is_empty() {
2320        let gsi_json: Vec<Value> = gsi
2321            .iter()
2322            .map(|g| {
2323                let gks: Vec<Value> = g
2324                    .key_schema
2325                    .iter()
2326                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2327                    .collect();
2328                let mut idx = json!({
2329                    "IndexName": g.index_name,
2330                    "KeySchema": gks,
2331                    "Projection": { "ProjectionType": g.projection.projection_type },
2332                    "IndexStatus": "ACTIVE",
2333                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2334                    "ItemCount": 0,
2335                    "IndexSizeBytes": 0,
2336                });
2337                if !g.projection.non_key_attributes.is_empty() {
2338                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2339                }
2340                if let Some(ref pt) = g.provisioned_throughput {
2341                    idx["ProvisionedThroughput"] = json!({
2342                        "ReadCapacityUnits": pt.read_capacity_units,
2343                        "WriteCapacityUnits": pt.write_capacity_units,
2344                        "NumberOfDecreasesToday": 0,
2345                    });
2346                }
2347                if let Some(ref odt) = g.on_demand_throughput {
2348                    idx["OnDemandThroughput"] = json!({
2349                        "MaxReadRequestUnits": odt.max_read_request_units,
2350                        "MaxWriteRequestUnits": odt.max_write_request_units,
2351                    });
2352                }
2353                idx
2354            })
2355            .collect();
2356        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2357    }
2358
2359    if !lsi.is_empty() {
2360        let lsi_json: Vec<Value> = lsi
2361            .iter()
2362            .map(|l| {
2363                let lks: Vec<Value> = l
2364                    .key_schema
2365                    .iter()
2366                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2367                    .collect();
2368                let mut idx = json!({
2369                    "IndexName": l.index_name,
2370                    "KeySchema": lks,
2371                    "Projection": { "ProjectionType": l.projection.projection_type },
2372                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2373                    "ItemCount": 0,
2374                    "IndexSizeBytes": 0,
2375                });
2376                if !l.projection.non_key_attributes.is_empty() {
2377                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2378                }
2379                idx
2380            })
2381            .collect();
2382        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2383    }
2384
2385    desc
2386}
2387
2388fn build_table_description(table: &DynamoTable) -> Value {
2389    let mut desc = build_table_description_json(&TableDescriptionInput {
2390        arn: &table.arn,
2391        table_id: &table.table_id,
2392        key_schema: &table.key_schema,
2393        attribute_definitions: &table.attribute_definitions,
2394        provisioned_throughput: &table.provisioned_throughput,
2395        gsi: &table.gsi,
2396        lsi: &table.lsi,
2397        billing_mode: &table.billing_mode,
2398        created_at: table.created_at,
2399        item_count: table.item_count,
2400        size_bytes: table.size_bytes,
2401        status: &table.status,
2402        deletion_protection_enabled: table.deletion_protection_enabled,
2403        on_demand_throughput: table.on_demand_throughput.as_ref(),
2404    });
2405
2406    // `LatestStreamArn` / `LatestStreamLabel` persist after a stream has
2407    // been created, even if streams are currently disabled — real AWS
2408    // keeps them for ~24h post-disable so DescribeTable callers can still
2409    // observe the last active stream. fakecloud keeps them for the
2410    // table's lifetime, which is sufficient for any single test run.
2411    if let Some(ref stream_arn) = table.stream_arn {
2412        desc["LatestStreamArn"] = json!(stream_arn);
2413        desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2414    }
2415    // The `StreamSpecification` block is only present while streams are
2416    // actively enabled. When absent, the Terraform provider Read falls
2417    // through to the prior `stream_view_type` from its own state rather
2418    // than clearing it, which matches the diff behaviour the upstream
2419    // acceptance tests assert on.
2420    if table.stream_enabled {
2421        if let Some(ref view_type) = table.stream_view_type {
2422            desc["StreamSpecification"] = json!({
2423                "StreamEnabled": true,
2424                "StreamViewType": view_type,
2425            });
2426        }
2427    }
2428
2429    // SSEDescription is only returned when the customer explicitly enabled
2430    // a KMS-backed SSE. Real AWS tables using the default AWS-owned key omit
2431    // this field entirely, and the Terraform provider's Read asserts
2432    // `server_side_encryption.#` == 0 in that case.
2433    if let Some(ref sse_type) = table.sse_type {
2434        let mut sse_desc = json!({
2435            "Status": "ENABLED",
2436            "SSEType": sse_type,
2437        });
2438        if let Some(ref key_arn) = table.sse_kms_key_arn {
2439            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2440        }
2441        desc["SSEDescription"] = sse_desc;
2442    }
2443
2444    desc
2445}
2446
2447fn execute_partiql_statement(
2448    state: &SharedDynamoDbState,
2449    statement: &str,
2450    parameters: &[Value],
2451) -> Result<AwsResponse, AwsServiceError> {
2452    let trimmed = statement.trim();
2453    let upper = trimmed.to_ascii_uppercase();
2454
2455    if upper.starts_with("SELECT") {
2456        execute_partiql_select(state, trimmed, parameters)
2457    } else if upper.starts_with("INSERT") {
2458        execute_partiql_insert(state, trimmed, parameters)
2459    } else if upper.starts_with("UPDATE") {
2460        execute_partiql_update(state, trimmed, parameters)
2461    } else if upper.starts_with("DELETE") {
2462        execute_partiql_delete(state, trimmed, parameters)
2463    } else {
2464        Err(AwsServiceError::aws_error(
2465            StatusCode::BAD_REQUEST,
2466            "ValidationException",
2467            format!("Unsupported PartiQL statement: {trimmed}"),
2468        ))
2469    }
2470}
2471
2472/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2473fn execute_partiql_select(
2474    state: &SharedDynamoDbState,
2475    statement: &str,
2476    parameters: &[Value],
2477) -> Result<AwsResponse, AwsServiceError> {
2478    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2479    let upper = statement.to_ascii_uppercase();
2480    let from_pos = upper.find("FROM").ok_or_else(|| {
2481        AwsServiceError::aws_error(
2482            StatusCode::BAD_REQUEST,
2483            "ValidationException",
2484            "Invalid SELECT statement: missing FROM",
2485        )
2486    })?;
2487
2488    let after_from = statement[from_pos + 4..].trim();
2489    let (table_name, rest) = parse_partiql_table_name(after_from);
2490
2491    let __mas = state.read();
2492    let state = __mas.default_ref();
2493    let table = get_table(&state.tables, &table_name)?;
2494
2495    let rest_upper = rest.trim().to_ascii_uppercase();
2496    if rest_upper.starts_with("WHERE") {
2497        let where_clause = rest.trim()[5..].trim();
2498        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2499        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2500        DynamoDbService::ok_json(json!({ "Items": items }))
2501    } else {
2502        // No WHERE, return all items
2503        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2504        DynamoDbService::ok_json(json!({ "Items": items }))
2505    }
2506}
2507
2508fn execute_partiql_insert(
2509    state: &SharedDynamoDbState,
2510    statement: &str,
2511    parameters: &[Value],
2512) -> Result<AwsResponse, AwsServiceError> {
2513    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2514    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2515    let upper = statement.to_ascii_uppercase();
2516    let into_pos = upper.find("INTO").ok_or_else(|| {
2517        AwsServiceError::aws_error(
2518            StatusCode::BAD_REQUEST,
2519            "ValidationException",
2520            "Invalid INSERT statement: missing INTO",
2521        )
2522    })?;
2523
2524    let after_into = statement[into_pos + 4..].trim();
2525    let (table_name, rest) = parse_partiql_table_name(after_into);
2526
2527    let rest_upper = rest.trim().to_ascii_uppercase();
2528    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2529        AwsServiceError::aws_error(
2530            StatusCode::BAD_REQUEST,
2531            "ValidationException",
2532            "Invalid INSERT statement: missing VALUE",
2533        )
2534    })?;
2535
2536    let value_str = rest.trim()[value_pos + 5..].trim();
2537    let item = parse_partiql_value_object(value_str, parameters)?;
2538
2539    let mut __mas = state.write();
2540    let state = __mas.default_mut();
2541    let table = get_table_mut(&mut state.tables, &table_name)?;
2542    let key = extract_key(table, &item);
2543    if table.find_item_index(&key).is_some() {
2544        // DynamoDB PartiQL INSERT fails if item exists
2545        return Err(AwsServiceError::aws_error(
2546            StatusCode::BAD_REQUEST,
2547            "DuplicateItemException",
2548            "Duplicate primary key exists in table",
2549        ));
2550    } else {
2551        table.items.push(item);
2552    }
2553    table.recalculate_stats();
2554
2555    DynamoDbService::ok_json(json!({}))
2556}
2557
2558fn execute_partiql_update(
2559    state: &SharedDynamoDbState,
2560    statement: &str,
2561    parameters: &[Value],
2562) -> Result<AwsResponse, AwsServiceError> {
2563    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2564    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2565    let after_update = statement[6..].trim(); // skip "UPDATE"
2566    let (table_name, rest) = parse_partiql_table_name(after_update);
2567
2568    let rest_upper = rest.trim().to_ascii_uppercase();
2569    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2570        AwsServiceError::aws_error(
2571            StatusCode::BAD_REQUEST,
2572            "ValidationException",
2573            "Invalid UPDATE statement: missing SET",
2574        )
2575    })?;
2576
2577    let after_set = rest.trim()[set_pos + 3..].trim();
2578
2579    // Split on WHERE
2580    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2581    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2582        (&after_set[..wp], after_set[wp + 5..].trim())
2583    } else {
2584        (after_set, "")
2585    };
2586
2587    let mut __mas = state.write();
2588    let state = __mas.default_mut();
2589    let table = get_table_mut(&mut state.tables, &table_name)?;
2590
2591    let matched_indices = if !where_clause.is_empty() {
2592        find_partiql_where_indices(table, where_clause, parameters)?
2593    } else {
2594        (0..table.items.len()).collect()
2595    };
2596
2597    // Parse SET assignments: attr=value, attr2=value2
2598    let param_offset = count_params_in_str(where_clause);
2599    let assignments: Vec<&str> = set_clause.split(',').collect();
2600    for idx in &matched_indices {
2601        let mut local_offset = param_offset;
2602        for assignment in &assignments {
2603            let assignment = assignment.trim();
2604            if let Some((attr, val_str)) = assignment.split_once('=') {
2605                let attr = attr.trim().trim_matches('"');
2606                let val_str = val_str.trim();
2607                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2608                if let Some(v) = value {
2609                    table.items[*idx].insert(attr.to_string(), v);
2610                }
2611            }
2612        }
2613    }
2614    table.recalculate_stats();
2615
2616    DynamoDbService::ok_json(json!({}))
2617}
2618
2619fn execute_partiql_delete(
2620    state: &SharedDynamoDbState,
2621    statement: &str,
2622    parameters: &[Value],
2623) -> Result<AwsResponse, AwsServiceError> {
2624    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2625    let upper = statement.to_ascii_uppercase();
2626    let from_pos = upper.find("FROM").ok_or_else(|| {
2627        AwsServiceError::aws_error(
2628            StatusCode::BAD_REQUEST,
2629            "ValidationException",
2630            "Invalid DELETE statement: missing FROM",
2631        )
2632    })?;
2633
2634    let after_from = statement[from_pos + 4..].trim();
2635    let (table_name, rest) = parse_partiql_table_name(after_from);
2636
2637    let rest_upper = rest.trim().to_ascii_uppercase();
2638    if !rest_upper.starts_with("WHERE") {
2639        return Err(AwsServiceError::aws_error(
2640            StatusCode::BAD_REQUEST,
2641            "ValidationException",
2642            "DELETE requires a WHERE clause",
2643        ));
2644    }
2645    let where_clause = rest.trim()[5..].trim();
2646
2647    let mut __mas = state.write();
2648    let state = __mas.default_mut();
2649    let table = get_table_mut(&mut state.tables, &table_name)?;
2650
2651    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2652    // Remove from highest index first to avoid invalidating lower indices
2653    indices.sort_unstable();
2654    indices.reverse();
2655    for idx in indices {
2656        table.items.remove(idx);
2657    }
2658    table.recalculate_stats();
2659
2660    DynamoDbService::ok_json(json!({}))
2661}
2662
2663/// Parse a table name that may be quoted with double quotes.
2664/// Returns (table_name, rest_of_string).
2665fn parse_partiql_table_name(s: &str) -> (String, &str) {
2666    let s = s.trim();
2667    if let Some(stripped) = s.strip_prefix('"') {
2668        // Quoted name
2669        if let Some(end) = stripped.find('"') {
2670            let name = &stripped[..end];
2671            let rest = &stripped[end + 1..];
2672            (name.to_string(), rest)
2673        } else {
2674            let end = s.find(' ').unwrap_or(s.len());
2675            (s[..end].trim_matches('"').to_string(), &s[end..])
2676        }
2677    } else {
2678        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
2679        (s[..end].to_string(), &s[end..])
2680    }
2681}
2682
2683/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2684/// Returns matching items.
2685fn evaluate_partiql_where<'a>(
2686    table: &'a DynamoTable,
2687    where_clause: &str,
2688    parameters: &[Value],
2689) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2690    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2691    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2692}
2693
2694fn find_partiql_where_indices(
2695    table: &DynamoTable,
2696    where_clause: &str,
2697    parameters: &[Value],
2698) -> Result<Vec<usize>, AwsServiceError> {
2699    let conditions = split_partiql_and_clauses(where_clause);
2700    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2701
2702    let mut indices = Vec::new();
2703    for (i, item) in table.items.iter().enumerate() {
2704        let all_match = parsed_conditions
2705            .iter()
2706            .all(|(attr, expected)| item.get(attr) == Some(expected));
2707        if all_match {
2708            indices.push(i);
2709        }
2710    }
2711
2712    Ok(indices)
2713}
2714
2715/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
2716fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
2717    let upper = where_clause.to_uppercase();
2718    if !upper.contains(" AND ") {
2719        return vec![where_clause.trim()];
2720    }
2721    let mut parts = Vec::new();
2722    let mut last = 0;
2723    for (i, _) in upper.match_indices(" AND ") {
2724        parts.push(where_clause[last..i].trim());
2725        last = i + 5;
2726    }
2727    parts.push(where_clause[last..].trim());
2728    parts
2729}
2730
2731/// Parse each `col = literal` (or `col = ?`) condition into an
2732/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2733/// don't parse as equality, or whose RHS literal can't be resolved, are
2734/// silently dropped — that mirrors the prior inline behavior.
2735fn parse_partiql_equality_conditions(
2736    conditions: &[&str],
2737    parameters: &[Value],
2738) -> Vec<(String, Value)> {
2739    let mut param_idx = 0usize;
2740    let mut parsed = Vec::new();
2741    for cond in conditions {
2742        let cond = cond.trim();
2743        if let Some((left, right)) = cond.split_once('=') {
2744            let attr = left.trim().trim_matches('"').to_string();
2745            let val_str = right.trim();
2746            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2747                parsed.push((attr, value));
2748            }
2749        }
2750    }
2751    parsed
2752}
2753
2754/// Parse a PartiQL literal value. Supports:
2755/// - 'string' -> {"S": "string"}
2756/// - 123 -> {"N": "123"}
2757/// - ? -> parameter from list
2758fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2759    let s = s.trim();
2760    if s == "?" {
2761        let idx = *param_idx;
2762        *param_idx += 1;
2763        parameters.get(idx).cloned()
2764    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2765        let inner = &s[1..s.len() - 1];
2766        Some(json!({"S": inner}))
2767    } else if let Ok(n) = s.parse::<f64>() {
2768        let num_str = if n == n.trunc() {
2769            format!("{}", n as i64)
2770        } else {
2771            format!("{n}")
2772        };
2773        Some(json!({"N": num_str}))
2774    } else {
2775        None
2776    }
2777}
2778
2779/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2780fn parse_partiql_value_object(
2781    s: &str,
2782    parameters: &[Value],
2783) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2784    let s = s.trim();
2785    let inner = s
2786        .strip_prefix('{')
2787        .and_then(|s| s.strip_suffix('}'))
2788        .ok_or_else(|| {
2789            AwsServiceError::aws_error(
2790                StatusCode::BAD_REQUEST,
2791                "ValidationException",
2792                "Invalid VALUE: expected object literal",
2793            )
2794        })?;
2795
2796    let mut item = HashMap::new();
2797    let mut param_idx = 0usize;
2798
2799    // Simple comma-separated key:value parsing
2800    for pair in split_partiql_pairs(inner) {
2801        let pair = pair.trim();
2802        if pair.is_empty() {
2803            continue;
2804        }
2805        if let Some((key_part, val_part)) = pair.split_once(':') {
2806            let key = key_part
2807                .trim()
2808                .trim_matches('\'')
2809                .trim_matches('"')
2810                .to_string();
2811            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2812                item.insert(key, val);
2813            }
2814        }
2815    }
2816
2817    Ok(item)
2818}
2819
2820/// Split PartiQL object pairs on commas, respecting nested braces and quotes.
2821fn split_partiql_pairs(s: &str) -> Vec<&str> {
2822    let mut parts = Vec::new();
2823    let mut start = 0;
2824    let mut depth = 0;
2825    let mut in_quote = false;
2826
2827    for (i, c) in s.char_indices() {
2828        match c {
2829            '\'' if !in_quote => in_quote = true,
2830            '\'' if in_quote => in_quote = false,
2831            '{' if !in_quote => depth += 1,
2832            '}' if !in_quote => depth -= 1,
2833            ',' if !in_quote && depth == 0 => {
2834                parts.push(&s[start..i]);
2835                start = i + 1;
2836            }
2837            _ => {}
2838        }
2839    }
2840    parts.push(&s[start..]);
2841    parts
2842}
2843
2844/// Count ? parameters in a string.
2845fn count_params_in_str(s: &str) -> usize {
2846    s.chars().filter(|c| *c == '?').count()
2847}
2848
2849#[cfg(test)]
2850mod tests {
2851    use super::*;
2852    use serde_json::json;
2853
2854    #[test]
2855    fn test_parse_update_clauses_set() {
2856        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2857        assert_eq!(clauses.len(), 1);
2858        assert_eq!(clauses[0].0, UpdateAction::Set);
2859        assert_eq!(clauses[0].1.len(), 2);
2860    }
2861
2862    #[test]
2863    fn test_parse_update_clauses_set_and_remove() {
2864        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2865        assert_eq!(clauses.len(), 2);
2866        assert_eq!(clauses[0].0, UpdateAction::Set);
2867        assert_eq!(clauses[1].0, UpdateAction::Remove);
2868    }
2869
2870    #[test]
2871    fn test_parse_update_clauses_list_append_single_assignment() {
2872        // Before fix: naive comma split tore list_append(#0, :0) at the
2873        // inner comma, producing two bogus assignments instead of one.
2874        let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
2875        assert_eq!(clauses.len(), 1);
2876        assert_eq!(clauses[0].0, UpdateAction::Set);
2877        assert_eq!(
2878            clauses[0].1.len(),
2879            1,
2880            "list_append(a, b) must be kept as a single assignment, not split at the inner comma"
2881        );
2882    }
2883
2884    #[test]
2885    fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
2886        // list_append assignment followed by a plain SET — the comma between
2887        // the two assignments must still split them, while the comma inside
2888        // the list_append call must not.
2889        let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
2890        assert_eq!(clauses.len(), 1);
2891        assert_eq!(clauses[0].0, UpdateAction::Set);
2892        assert_eq!(
2893            clauses[0].1.len(),
2894            2,
2895            "two SET assignments: one list_append and one plain"
2896        );
2897    }
2898
2899    #[test]
2900    fn test_evaluate_key_condition_simple() {
2901        let mut item = HashMap::new();
2902        item.insert("pk".to_string(), json!({"S": "user1"}));
2903        item.insert("sk".to_string(), json!({"S": "order1"}));
2904
2905        let mut expr_values = HashMap::new();
2906        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2907
2908        assert!(evaluate_key_condition(
2909            "pk = :pk",
2910            &item,
2911            &HashMap::new(),
2912            &expr_values,
2913        ));
2914    }
2915
2916    #[test]
2917    fn test_compare_attribute_values_numbers() {
2918        let a = json!({"N": "10"});
2919        let b = json!({"N": "20"});
2920        assert_eq!(
2921            compare_attribute_values(Some(&a), Some(&b)),
2922            std::cmp::Ordering::Less
2923        );
2924    }
2925
2926    #[test]
2927    fn test_compare_attribute_values_strings() {
2928        let a = json!({"S": "apple"});
2929        let b = json!({"S": "banana"});
2930        assert_eq!(
2931            compare_attribute_values(Some(&a), Some(&b)),
2932            std::cmp::Ordering::Less
2933        );
2934    }
2935
2936    #[test]
2937    fn test_split_on_and() {
2938        let parts = split_on_and("pk = :pk AND sk > :sk");
2939        assert_eq!(parts.len(), 2);
2940        assert_eq!(parts[0].trim(), "pk = :pk");
2941        assert_eq!(parts[1].trim(), "sk > :sk");
2942    }
2943
2944    #[test]
2945    fn test_split_on_and_respects_parentheses() {
2946        // Before fix: split_on_and would split inside the parens
2947        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
2948        // Should NOT split on the AND inside parentheses
2949        assert_eq!(parts.len(), 1);
2950        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
2951    }
2952
2953    #[test]
2954    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
2955        // (a AND b) OR c — should match when c is true but a is false
2956        let mut item = HashMap::new();
2957        item.insert("x".to_string(), json!({"S": "no"}));
2958        item.insert("y".to_string(), json!({"S": "no"}));
2959        item.insert("z".to_string(), json!({"S": "yes"}));
2960
2961        let mut expr_values = HashMap::new();
2962        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
2963
2964        // x=yes AND y=yes => false, but z=yes => true => overall true
2965        let result = evaluate_filter_expression(
2966            "(x = :yes AND y = :yes) OR z = :yes",
2967            &item,
2968            &HashMap::new(),
2969            &expr_values,
2970        );
2971        assert!(result, "should match because z = :yes is true");
2972
2973        // x=yes AND y=yes => false, z=yes => false => overall false
2974        let mut item2 = HashMap::new();
2975        item2.insert("x".to_string(), json!({"S": "no"}));
2976        item2.insert("y".to_string(), json!({"S": "no"}));
2977        item2.insert("z".to_string(), json!({"S": "no"}));
2978
2979        let result2 = evaluate_filter_expression(
2980            "(x = :yes AND y = :yes) OR z = :yes",
2981            &item2,
2982            &HashMap::new(),
2983            &expr_values,
2984        );
2985        assert!(!result2, "should not match because nothing is true");
2986    }
2987
2988    #[test]
2989    fn test_project_item_nested_path() {
2990        // Item with a list attribute containing maps
2991        let mut item = HashMap::new();
2992        item.insert("pk".to_string(), json!({"S": "key1"}));
2993        item.insert(
2994            "data".to_string(),
2995            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
2996        );
2997
2998        let body = json!({
2999            "ProjectionExpression": "data[0].name"
3000        });
3001
3002        let projected = project_item(&item, &body);
3003        // Should contain data[0].name = "Alice", not the entire data[0] element
3004        let name = projected
3005            .get("data")
3006            .and_then(|v| v.get("L"))
3007            .and_then(|v| v.get(0))
3008            .and_then(|v| v.get("M"))
3009            .and_then(|v| v.get("name"))
3010            .and_then(|v| v.get("S"))
3011            .and_then(|v| v.as_str());
3012        assert_eq!(name, Some("Alice"));
3013
3014        // Should NOT contain the "age" field
3015        let age = projected
3016            .get("data")
3017            .and_then(|v| v.get("L"))
3018            .and_then(|v| v.get(0))
3019            .and_then(|v| v.get("M"))
3020            .and_then(|v| v.get("age"));
3021        assert!(age.is_none(), "age should not be present in projection");
3022    }
3023
3024    #[test]
3025    fn test_resolve_nested_path_map() {
3026        let mut item = HashMap::new();
3027        item.insert(
3028            "info".to_string(),
3029            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
3030        );
3031
3032        let result = resolve_nested_path(&item, "info.address.city");
3033        assert_eq!(result, Some(json!({"S": "NYC"})));
3034    }
3035
3036    #[test]
3037    fn test_resolve_nested_path_list_then_map() {
3038        let mut item = HashMap::new();
3039        item.insert(
3040            "items".to_string(),
3041            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
3042        );
3043
3044        let result = resolve_nested_path(&item, "items[0].sku");
3045        assert_eq!(result, Some(json!({"S": "ABC"})));
3046    }
3047
3048    // -- Integration-style tests using DynamoDbService --
3049
3050    use crate::state::SharedDynamoDbState;
3051    use parking_lot::RwLock;
3052    use std::sync::Arc;
3053
3054    fn make_service() -> DynamoDbService {
3055        let state: SharedDynamoDbState = Arc::new(RwLock::new(
3056            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
3057        ));
3058        DynamoDbService::new(state)
3059    }
3060
3061    fn make_request(action: &str, body: Value) -> AwsRequest {
3062        AwsRequest {
3063            service: "dynamodb".to_string(),
3064            action: action.to_string(),
3065            region: "us-east-1".to_string(),
3066            account_id: "123456789012".to_string(),
3067            request_id: "test-id".to_string(),
3068            headers: http::HeaderMap::new(),
3069            query_params: HashMap::new(),
3070            body: serde_json::to_vec(&body).unwrap().into(),
3071            path_segments: vec![],
3072            raw_path: "/".to_string(),
3073            raw_query: String::new(),
3074            method: http::Method::POST,
3075            is_query_protocol: false,
3076            access_key_id: None,
3077            principal: None,
3078        }
3079    }
3080
3081    fn create_test_table(svc: &DynamoDbService) {
3082        let req = make_request(
3083            "CreateTable",
3084            json!({
3085                "TableName": "test-table",
3086                "KeySchema": [
3087                    { "AttributeName": "pk", "KeyType": "HASH" }
3088                ],
3089                "AttributeDefinitions": [
3090                    { "AttributeName": "pk", "AttributeType": "S" }
3091                ],
3092                "BillingMode": "PAY_PER_REQUEST"
3093            }),
3094        );
3095        svc.create_table(&req).unwrap();
3096    }
3097
3098    #[test]
3099    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
3100        let svc = make_service();
3101        let req = make_request(
3102            "CreateTable",
3103            json!({
3104                "TableName": "warm-throughput-table",
3105                "KeySchema": [
3106                    { "AttributeName": "pk", "KeyType": "HASH" }
3107                ],
3108                "AttributeDefinitions": [
3109                    { "AttributeName": "pk", "AttributeType": "S" }
3110                ],
3111                "BillingMode": "PAY_PER_REQUEST"
3112            }),
3113        );
3114        let create_resp = svc.create_table(&req).unwrap();
3115        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
3116        let create_table = &create_body["TableDescription"];
3117
3118        assert_eq!(create_table["TableStatus"], "ACTIVE");
3119        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
3120        let table_id = create_table["TableId"].as_str().unwrap().to_string();
3121        assert!(!table_id.is_empty());
3122
3123        let describe_req = make_request(
3124            "DescribeTable",
3125            json!({ "TableName": "warm-throughput-table" }),
3126        );
3127        let describe_resp = svc.describe_table(&describe_req).unwrap();
3128        let describe_body: Value =
3129            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
3130        let described_table = &describe_body["Table"];
3131
3132        assert_eq!(described_table["TableStatus"], "ACTIVE");
3133        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
3134        assert_eq!(described_table["TableId"], table_id);
3135
3136        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
3137        let describe_body_again: Value =
3138            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
3139        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
3140    }
3141
3142    #[test]
3143    fn delete_item_return_values_all_old() {
3144        let svc = make_service();
3145        create_test_table(&svc);
3146
3147        // Put an item
3148        let req = make_request(
3149            "PutItem",
3150            json!({
3151                "TableName": "test-table",
3152                "Item": {
3153                    "pk": { "S": "key1" },
3154                    "name": { "S": "Alice" },
3155                    "age": { "N": "30" }
3156                }
3157            }),
3158        );
3159        svc.put_item(&req).unwrap();
3160
3161        // Delete with ReturnValues=ALL_OLD
3162        let req = make_request(
3163            "DeleteItem",
3164            json!({
3165                "TableName": "test-table",
3166                "Key": { "pk": { "S": "key1" } },
3167                "ReturnValues": "ALL_OLD"
3168            }),
3169        );
3170        let resp = svc.delete_item(&req).unwrap();
3171        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3172
3173        // Verify the old item is returned
3174        let attrs = &body["Attributes"];
3175        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
3176        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
3177        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
3178
3179        // Verify the item is actually deleted
3180        let req = make_request(
3181            "GetItem",
3182            json!({
3183                "TableName": "test-table",
3184                "Key": { "pk": { "S": "key1" } }
3185            }),
3186        );
3187        let resp = svc.get_item(&req).unwrap();
3188        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3189        assert!(body.get("Item").is_none(), "item should be deleted");
3190    }
3191
3192    #[test]
3193    fn transact_get_items_returns_existing_and_missing() {
3194        let svc = make_service();
3195        create_test_table(&svc);
3196
3197        // Put one item
3198        let req = make_request(
3199            "PutItem",
3200            json!({
3201                "TableName": "test-table",
3202                "Item": {
3203                    "pk": { "S": "exists" },
3204                    "val": { "S": "hello" }
3205                }
3206            }),
3207        );
3208        svc.put_item(&req).unwrap();
3209
3210        let req = make_request(
3211            "TransactGetItems",
3212            json!({
3213                "TransactItems": [
3214                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
3215                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
3216                ]
3217            }),
3218        );
3219        let resp = svc.transact_get_items(&req).unwrap();
3220        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3221        let responses = body["Responses"].as_array().unwrap();
3222        assert_eq!(responses.len(), 2);
3223        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
3224        assert!(responses[1].get("Item").is_none());
3225    }
3226
3227    #[test]
3228    fn transact_write_items_put_and_delete() {
3229        let svc = make_service();
3230        create_test_table(&svc);
3231
3232        // Put initial item
3233        let req = make_request(
3234            "PutItem",
3235            json!({
3236                "TableName": "test-table",
3237                "Item": {
3238                    "pk": { "S": "to-delete" },
3239                    "val": { "S": "bye" }
3240                }
3241            }),
3242        );
3243        svc.put_item(&req).unwrap();
3244
3245        // TransactWrite: put new + delete existing
3246        let req = make_request(
3247            "TransactWriteItems",
3248            json!({
3249                "TransactItems": [
3250                    {
3251                        "Put": {
3252                            "TableName": "test-table",
3253                            "Item": {
3254                                "pk": { "S": "new-item" },
3255                                "val": { "S": "hi" }
3256                            }
3257                        }
3258                    },
3259                    {
3260                        "Delete": {
3261                            "TableName": "test-table",
3262                            "Key": { "pk": { "S": "to-delete" } }
3263                        }
3264                    }
3265                ]
3266            }),
3267        );
3268        let resp = svc.transact_write_items(&req).unwrap();
3269        assert_eq!(resp.status, StatusCode::OK);
3270
3271        // Verify new item exists
3272        let req = make_request(
3273            "GetItem",
3274            json!({
3275                "TableName": "test-table",
3276                "Key": { "pk": { "S": "new-item" } }
3277            }),
3278        );
3279        let resp = svc.get_item(&req).unwrap();
3280        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3281        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
3282
3283        // Verify deleted item is gone
3284        let req = make_request(
3285            "GetItem",
3286            json!({
3287                "TableName": "test-table",
3288                "Key": { "pk": { "S": "to-delete" } }
3289            }),
3290        );
3291        let resp = svc.get_item(&req).unwrap();
3292        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3293        assert!(body.get("Item").is_none());
3294    }
3295
3296    #[test]
3297    fn transact_write_items_condition_check_failure() {
3298        let svc = make_service();
3299        create_test_table(&svc);
3300
3301        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
3302        let req = make_request(
3303            "TransactWriteItems",
3304            json!({
3305                "TransactItems": [
3306                    {
3307                        "ConditionCheck": {
3308                            "TableName": "test-table",
3309                            "Key": { "pk": { "S": "nonexistent" } },
3310                            "ConditionExpression": "attribute_exists(pk)"
3311                        }
3312                    }
3313                ]
3314            }),
3315        );
3316        let resp = svc.transact_write_items(&req).unwrap();
3317        // Should be a 400 error response
3318        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3319        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3320        assert_eq!(
3321            body["__type"].as_str().unwrap(),
3322            "TransactionCanceledException"
3323        );
3324        assert!(body["CancellationReasons"].as_array().is_some());
3325    }
3326
3327    #[test]
3328    fn update_and_describe_time_to_live() {
3329        let svc = make_service();
3330        create_test_table(&svc);
3331
3332        // Enable TTL
3333        let req = make_request(
3334            "UpdateTimeToLive",
3335            json!({
3336                "TableName": "test-table",
3337                "TimeToLiveSpecification": {
3338                    "AttributeName": "ttl",
3339                    "Enabled": true
3340                }
3341            }),
3342        );
3343        let resp = svc.update_time_to_live(&req).unwrap();
3344        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3345        assert_eq!(
3346            body["TimeToLiveSpecification"]["AttributeName"]
3347                .as_str()
3348                .unwrap(),
3349            "ttl"
3350        );
3351        assert!(body["TimeToLiveSpecification"]["Enabled"]
3352            .as_bool()
3353            .unwrap());
3354
3355        // Describe TTL
3356        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3357        let resp = svc.describe_time_to_live(&req).unwrap();
3358        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3359        assert_eq!(
3360            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3361                .as_str()
3362                .unwrap(),
3363            "ENABLED"
3364        );
3365        assert_eq!(
3366            body["TimeToLiveDescription"]["AttributeName"]
3367                .as_str()
3368                .unwrap(),
3369            "ttl"
3370        );
3371
3372        // Disable TTL
3373        let req = make_request(
3374            "UpdateTimeToLive",
3375            json!({
3376                "TableName": "test-table",
3377                "TimeToLiveSpecification": {
3378                    "AttributeName": "ttl",
3379                    "Enabled": false
3380                }
3381            }),
3382        );
3383        svc.update_time_to_live(&req).unwrap();
3384
3385        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3386        let resp = svc.describe_time_to_live(&req).unwrap();
3387        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3388        assert_eq!(
3389            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3390                .as_str()
3391                .unwrap(),
3392            "DISABLED"
3393        );
3394    }
3395
3396    #[test]
3397    fn resource_policy_lifecycle() {
3398        let svc = make_service();
3399        create_test_table(&svc);
3400
3401        let table_arn = {
3402            let __mas = svc.state.read();
3403            let state = __mas.default_ref();
3404            state.tables.get("test-table").unwrap().arn.clone()
3405        };
3406
3407        // Put policy
3408        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3409        let req = make_request(
3410            "PutResourcePolicy",
3411            json!({
3412                "ResourceArn": table_arn,
3413                "Policy": policy_doc
3414            }),
3415        );
3416        let resp = svc.put_resource_policy(&req).unwrap();
3417        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3418        assert!(body["RevisionId"].as_str().is_some());
3419
3420        // Get policy
3421        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3422        let resp = svc.get_resource_policy(&req).unwrap();
3423        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3424        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3425
3426        // Delete policy
3427        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3428        svc.delete_resource_policy(&req).unwrap();
3429
3430        // Get should return null now
3431        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3432        let resp = svc.get_resource_policy(&req).unwrap();
3433        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3434        assert!(body["Policy"].is_null());
3435    }
3436
3437    #[test]
3438    fn describe_endpoints() {
3439        let svc = make_service();
3440        let req = make_request("DescribeEndpoints", json!({}));
3441        let resp = svc.describe_endpoints(&req).unwrap();
3442        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3443        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3444    }
3445
3446    #[test]
3447    fn describe_limits() {
3448        let svc = make_service();
3449        let req = make_request("DescribeLimits", json!({}));
3450        let resp = svc.describe_limits(&req).unwrap();
3451        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3452        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3453    }
3454
3455    #[test]
3456    fn backup_lifecycle() {
3457        let svc = make_service();
3458        create_test_table(&svc);
3459
3460        // Create backup
3461        let req = make_request(
3462            "CreateBackup",
3463            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3464        );
3465        let resp = svc.create_backup(&req).unwrap();
3466        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3467        let backup_arn = body["BackupDetails"]["BackupArn"]
3468            .as_str()
3469            .unwrap()
3470            .to_string();
3471        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3472
3473        // Describe backup
3474        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3475        let resp = svc.describe_backup(&req).unwrap();
3476        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3477        assert_eq!(
3478            body["BackupDescription"]["BackupDetails"]["BackupName"],
3479            "my-backup"
3480        );
3481
3482        // List backups
3483        let req = make_request("ListBackups", json!({}));
3484        let resp = svc.list_backups(&req).unwrap();
3485        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3486        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3487
3488        // Restore from backup
3489        let req = make_request(
3490            "RestoreTableFromBackup",
3491            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3492        );
3493        svc.restore_table_from_backup(&req).unwrap();
3494
3495        // Verify restored table exists
3496        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3497        let resp = svc.describe_table(&req).unwrap();
3498        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3499        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3500
3501        // Delete backup
3502        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3503        svc.delete_backup(&req).unwrap();
3504
3505        // List should be empty
3506        let req = make_request("ListBackups", json!({}));
3507        let resp = svc.list_backups(&req).unwrap();
3508        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3509        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3510    }
3511
3512    #[test]
3513    fn continuous_backups() {
3514        let svc = make_service();
3515        create_test_table(&svc);
3516
3517        // Initially disabled
3518        let req = make_request(
3519            "DescribeContinuousBackups",
3520            json!({ "TableName": "test-table" }),
3521        );
3522        let resp = svc.describe_continuous_backups(&req).unwrap();
3523        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3524        assert_eq!(
3525            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3526                ["PointInTimeRecoveryStatus"],
3527            "DISABLED"
3528        );
3529
3530        // Enable
3531        let req = make_request(
3532            "UpdateContinuousBackups",
3533            json!({
3534                "TableName": "test-table",
3535                "PointInTimeRecoverySpecification": {
3536                    "PointInTimeRecoveryEnabled": true
3537                }
3538            }),
3539        );
3540        svc.update_continuous_backups(&req).unwrap();
3541
3542        // Verify
3543        let req = make_request(
3544            "DescribeContinuousBackups",
3545            json!({ "TableName": "test-table" }),
3546        );
3547        let resp = svc.describe_continuous_backups(&req).unwrap();
3548        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3549        assert_eq!(
3550            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3551                ["PointInTimeRecoveryStatus"],
3552            "ENABLED"
3553        );
3554    }
3555
3556    #[test]
3557    fn restore_table_to_point_in_time() {
3558        let svc = make_service();
3559        create_test_table(&svc);
3560
3561        let req = make_request(
3562            "RestoreTableToPointInTime",
3563            json!({
3564                "SourceTableName": "test-table",
3565                "TargetTableName": "pitr-restored"
3566            }),
3567        );
3568        svc.restore_table_to_point_in_time(&req).unwrap();
3569
3570        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3571        let resp = svc.describe_table(&req).unwrap();
3572        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3573        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3574    }
3575
3576    #[test]
3577    fn global_table_lifecycle() {
3578        let svc = make_service();
3579
3580        // Create global table
3581        let req = make_request(
3582            "CreateGlobalTable",
3583            json!({
3584                "GlobalTableName": "my-global",
3585                "ReplicationGroup": [
3586                    { "RegionName": "us-east-1" },
3587                    { "RegionName": "eu-west-1" }
3588                ]
3589            }),
3590        );
3591        let resp = svc.create_global_table(&req).unwrap();
3592        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3593        assert_eq!(
3594            body["GlobalTableDescription"]["GlobalTableStatus"],
3595            "ACTIVE"
3596        );
3597
3598        // Describe
3599        let req = make_request(
3600            "DescribeGlobalTable",
3601            json!({ "GlobalTableName": "my-global" }),
3602        );
3603        let resp = svc.describe_global_table(&req).unwrap();
3604        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3605        assert_eq!(
3606            body["GlobalTableDescription"]["ReplicationGroup"]
3607                .as_array()
3608                .unwrap()
3609                .len(),
3610            2
3611        );
3612
3613        // List
3614        let req = make_request("ListGlobalTables", json!({}));
3615        let resp = svc.list_global_tables(&req).unwrap();
3616        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3617        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3618
3619        // Update - add a region
3620        let req = make_request(
3621            "UpdateGlobalTable",
3622            json!({
3623                "GlobalTableName": "my-global",
3624                "ReplicaUpdates": [
3625                    { "Create": { "RegionName": "ap-southeast-1" } }
3626                ]
3627            }),
3628        );
3629        let resp = svc.update_global_table(&req).unwrap();
3630        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3631        assert_eq!(
3632            body["GlobalTableDescription"]["ReplicationGroup"]
3633                .as_array()
3634                .unwrap()
3635                .len(),
3636            3
3637        );
3638
3639        // Describe settings
3640        let req = make_request(
3641            "DescribeGlobalTableSettings",
3642            json!({ "GlobalTableName": "my-global" }),
3643        );
3644        let resp = svc.describe_global_table_settings(&req).unwrap();
3645        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3646        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3647
3648        // Update settings (no-op, just verify no error)
3649        let req = make_request(
3650            "UpdateGlobalTableSettings",
3651            json!({ "GlobalTableName": "my-global" }),
3652        );
3653        svc.update_global_table_settings(&req).unwrap();
3654    }
3655
3656    #[test]
3657    fn table_replica_auto_scaling() {
3658        let svc = make_service();
3659        create_test_table(&svc);
3660
3661        let req = make_request(
3662            "DescribeTableReplicaAutoScaling",
3663            json!({ "TableName": "test-table" }),
3664        );
3665        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3666        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3667        assert_eq!(
3668            body["TableAutoScalingDescription"]["TableName"],
3669            "test-table"
3670        );
3671
3672        let req = make_request(
3673            "UpdateTableReplicaAutoScaling",
3674            json!({ "TableName": "test-table" }),
3675        );
3676        svc.update_table_replica_auto_scaling(&req).unwrap();
3677    }
3678
3679    #[test]
3680    fn kinesis_streaming_lifecycle() {
3681        let svc = make_service();
3682        create_test_table(&svc);
3683
3684        // Enable
3685        let req = make_request(
3686            "EnableKinesisStreamingDestination",
3687            json!({
3688                "TableName": "test-table",
3689                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3690            }),
3691        );
3692        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3693        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3694        assert_eq!(body["DestinationStatus"], "ACTIVE");
3695
3696        // Describe
3697        let req = make_request(
3698            "DescribeKinesisStreamingDestination",
3699            json!({ "TableName": "test-table" }),
3700        );
3701        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3702        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3703        assert_eq!(
3704            body["KinesisDataStreamDestinations"]
3705                .as_array()
3706                .unwrap()
3707                .len(),
3708            1
3709        );
3710
3711        // Update
3712        let req = make_request(
3713            "UpdateKinesisStreamingDestination",
3714            json!({
3715                "TableName": "test-table",
3716                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3717                "UpdateKinesisStreamingConfiguration": {
3718                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
3719                }
3720            }),
3721        );
3722        svc.update_kinesis_streaming_destination(&req).unwrap();
3723
3724        // Disable
3725        let req = make_request(
3726            "DisableKinesisStreamingDestination",
3727            json!({
3728                "TableName": "test-table",
3729                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3730            }),
3731        );
3732        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3733        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3734        assert_eq!(body["DestinationStatus"], "DISABLED");
3735    }
3736
3737    #[test]
3738    fn contributor_insights_lifecycle() {
3739        let svc = make_service();
3740        create_test_table(&svc);
3741
3742        // Initially disabled
3743        let req = make_request(
3744            "DescribeContributorInsights",
3745            json!({ "TableName": "test-table" }),
3746        );
3747        let resp = svc.describe_contributor_insights(&req).unwrap();
3748        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3749        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3750
3751        // Enable
3752        let req = make_request(
3753            "UpdateContributorInsights",
3754            json!({
3755                "TableName": "test-table",
3756                "ContributorInsightsAction": "ENABLE"
3757            }),
3758        );
3759        let resp = svc.update_contributor_insights(&req).unwrap();
3760        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3761        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3762
3763        // List
3764        let req = make_request("ListContributorInsights", json!({}));
3765        let resp = svc.list_contributor_insights(&req).unwrap();
3766        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3767        assert_eq!(
3768            body["ContributorInsightsSummaries"]
3769                .as_array()
3770                .unwrap()
3771                .len(),
3772            1
3773        );
3774    }
3775
3776    #[test]
3777    fn export_lifecycle() {
3778        let svc = make_service();
3779        create_test_table(&svc);
3780
3781        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3782
3783        // Export
3784        let req = make_request(
3785            "ExportTableToPointInTime",
3786            json!({
3787                "TableArn": table_arn,
3788                "S3Bucket": "my-bucket"
3789            }),
3790        );
3791        let resp = svc.export_table_to_point_in_time(&req).unwrap();
3792        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3793        let export_arn = body["ExportDescription"]["ExportArn"]
3794            .as_str()
3795            .unwrap()
3796            .to_string();
3797        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3798
3799        // Describe
3800        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3801        let resp = svc.describe_export(&req).unwrap();
3802        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3803        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3804
3805        // List
3806        let req = make_request("ListExports", json!({}));
3807        let resp = svc.list_exports(&req).unwrap();
3808        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3809        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3810    }
3811
3812    #[test]
3813    fn import_lifecycle() {
3814        let svc = make_service();
3815
3816        let req = make_request(
3817            "ImportTable",
3818            json!({
3819                "InputFormat": "DYNAMODB_JSON",
3820                "S3BucketSource": { "S3Bucket": "import-bucket" },
3821                "TableCreationParameters": {
3822                    "TableName": "imported-table",
3823                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3824                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3825                }
3826            }),
3827        );
3828        let resp = svc.import_table(&req).unwrap();
3829        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3830        let import_arn = body["ImportTableDescription"]["ImportArn"]
3831            .as_str()
3832            .unwrap()
3833            .to_string();
3834        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3835
3836        // Describe import
3837        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3838        let resp = svc.describe_import(&req).unwrap();
3839        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3840        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3841
3842        // List imports
3843        let req = make_request("ListImports", json!({}));
3844        let resp = svc.list_imports(&req).unwrap();
3845        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3846        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3847
3848        // Verify the table was created
3849        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3850        let resp = svc.describe_table(&req).unwrap();
3851        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3852        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3853    }
3854
3855    #[test]
3856    fn backup_restore_preserves_items() {
3857        let svc = make_service();
3858        create_test_table(&svc);
3859
3860        // Put 3 items
3861        for i in 1..=3 {
3862            let req = make_request(
3863                "PutItem",
3864                json!({
3865                    "TableName": "test-table",
3866                    "Item": {
3867                        "pk": { "S": format!("key{i}") },
3868                        "data": { "S": format!("value{i}") }
3869                    }
3870                }),
3871            );
3872            svc.put_item(&req).unwrap();
3873        }
3874
3875        // Create backup
3876        let req = make_request(
3877            "CreateBackup",
3878            json!({
3879                "TableName": "test-table",
3880                "BackupName": "my-backup"
3881            }),
3882        );
3883        let resp = svc.create_backup(&req).unwrap();
3884        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3885        let backup_arn = body["BackupDetails"]["BackupArn"]
3886            .as_str()
3887            .unwrap()
3888            .to_string();
3889
3890        // Delete all items from the original table
3891        for i in 1..=3 {
3892            let req = make_request(
3893                "DeleteItem",
3894                json!({
3895                    "TableName": "test-table",
3896                    "Key": { "pk": { "S": format!("key{i}") } }
3897                }),
3898            );
3899            svc.delete_item(&req).unwrap();
3900        }
3901
3902        // Verify original table is empty
3903        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3904        let resp = svc.scan(&req).unwrap();
3905        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3906        assert_eq!(body["Count"], 0);
3907
3908        // Restore from backup
3909        let req = make_request(
3910            "RestoreTableFromBackup",
3911            json!({
3912                "BackupArn": backup_arn,
3913                "TargetTableName": "restored-table"
3914            }),
3915        );
3916        svc.restore_table_from_backup(&req).unwrap();
3917
3918        // Scan restored table — should have 3 items
3919        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
3920        let resp = svc.scan(&req).unwrap();
3921        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3922        assert_eq!(body["Count"], 3);
3923        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
3924    }
3925
3926    #[test]
3927    fn global_table_replicates_writes() {
3928        let svc = make_service();
3929        create_test_table(&svc);
3930
3931        // Create global table with replicas
3932        let req = make_request(
3933            "CreateGlobalTable",
3934            json!({
3935                "GlobalTableName": "test-table",
3936                "ReplicationGroup": [
3937                    { "RegionName": "us-east-1" },
3938                    { "RegionName": "eu-west-1" }
3939                ]
3940            }),
3941        );
3942        let resp = svc.create_global_table(&req).unwrap();
3943        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3944        assert_eq!(
3945            body["GlobalTableDescription"]["GlobalTableStatus"],
3946            "ACTIVE"
3947        );
3948
3949        // Put an item
3950        let req = make_request(
3951            "PutItem",
3952            json!({
3953                "TableName": "test-table",
3954                "Item": {
3955                    "pk": { "S": "replicated-key" },
3956                    "data": { "S": "replicated-value" }
3957                }
3958            }),
3959        );
3960        svc.put_item(&req).unwrap();
3961
3962        // Verify the item is readable (since all replicas share the same table)
3963        let req = make_request(
3964            "GetItem",
3965            json!({
3966                "TableName": "test-table",
3967                "Key": { "pk": { "S": "replicated-key" } }
3968            }),
3969        );
3970        let resp = svc.get_item(&req).unwrap();
3971        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3972        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
3973        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
3974    }
3975
3976    #[test]
3977    fn contributor_insights_tracks_access() {
3978        let svc = make_service();
3979        create_test_table(&svc);
3980
3981        // Enable contributor insights
3982        let req = make_request(
3983            "UpdateContributorInsights",
3984            json!({
3985                "TableName": "test-table",
3986                "ContributorInsightsAction": "ENABLE"
3987            }),
3988        );
3989        svc.update_contributor_insights(&req).unwrap();
3990
3991        // Put items with different partition keys
3992        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
3993            let req = make_request(
3994                "PutItem",
3995                json!({
3996                    "TableName": "test-table",
3997                    "Item": {
3998                        "pk": { "S": key },
3999                        "data": { "S": "value" }
4000                    }
4001                }),
4002            );
4003            svc.put_item(&req).unwrap();
4004        }
4005
4006        // Get items (to also track read access)
4007        for _ in 0..3 {
4008            let req = make_request(
4009                "GetItem",
4010                json!({
4011                    "TableName": "test-table",
4012                    "Key": { "pk": { "S": "alpha" } }
4013                }),
4014            );
4015            svc.get_item(&req).unwrap();
4016        }
4017
4018        // Describe contributor insights — should show top contributors
4019        let req = make_request(
4020            "DescribeContributorInsights",
4021            json!({ "TableName": "test-table" }),
4022        );
4023        let resp = svc.describe_contributor_insights(&req).unwrap();
4024        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4025        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
4026
4027        let contributors = body["TopContributors"].as_array().unwrap();
4028        assert!(
4029            !contributors.is_empty(),
4030            "TopContributors should not be empty"
4031        );
4032
4033        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
4034        // alpha should be the top contributor
4035        let top = &contributors[0];
4036        assert!(top["Count"].as_u64().unwrap() > 0);
4037
4038        // Verify the rule list is populated
4039        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
4040        assert!(!rules.is_empty());
4041    }
4042
4043    #[test]
4044    fn contributor_insights_not_tracked_when_disabled() {
4045        let svc = make_service();
4046        create_test_table(&svc);
4047
4048        // Put items without enabling insights
4049        let req = make_request(
4050            "PutItem",
4051            json!({
4052                "TableName": "test-table",
4053                "Item": {
4054                    "pk": { "S": "key1" },
4055                    "data": { "S": "value" }
4056                }
4057            }),
4058        );
4059        svc.put_item(&req).unwrap();
4060
4061        // Describe — should show empty contributors
4062        let req = make_request(
4063            "DescribeContributorInsights",
4064            json!({ "TableName": "test-table" }),
4065        );
4066        let resp = svc.describe_contributor_insights(&req).unwrap();
4067        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4068        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
4069
4070        let contributors = body["TopContributors"].as_array().unwrap();
4071        assert!(contributors.is_empty());
4072    }
4073
4074    #[test]
4075    fn contributor_insights_disabled_table_no_counters_after_scan() {
4076        let svc = make_service();
4077        create_test_table(&svc);
4078
4079        // Put items
4080        for key in &["alpha", "beta"] {
4081            let req = make_request(
4082                "PutItem",
4083                json!({
4084                    "TableName": "test-table",
4085                    "Item": { "pk": { "S": key } }
4086                }),
4087            );
4088            svc.put_item(&req).unwrap();
4089        }
4090
4091        // Enable insights, then scan, then disable, then check counters are cleared
4092        let req = make_request(
4093            "UpdateContributorInsights",
4094            json!({
4095                "TableName": "test-table",
4096                "ContributorInsightsAction": "ENABLE"
4097            }),
4098        );
4099        svc.update_contributor_insights(&req).unwrap();
4100
4101        // Scan to trigger counter collection
4102        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4103        svc.scan(&req).unwrap();
4104
4105        // Verify counters were collected
4106        let req = make_request(
4107            "DescribeContributorInsights",
4108            json!({ "TableName": "test-table" }),
4109        );
4110        let resp = svc.describe_contributor_insights(&req).unwrap();
4111        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4112        let contributors = body["TopContributors"].as_array().unwrap();
4113        assert!(
4114            !contributors.is_empty(),
4115            "counters should be non-empty while enabled"
4116        );
4117
4118        // Disable insights (this clears counters)
4119        let req = make_request(
4120            "UpdateContributorInsights",
4121            json!({
4122                "TableName": "test-table",
4123                "ContributorInsightsAction": "DISABLE"
4124            }),
4125        );
4126        svc.update_contributor_insights(&req).unwrap();
4127
4128        // Scan again -- should NOT accumulate counters since insights is disabled
4129        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4130        svc.scan(&req).unwrap();
4131
4132        // Verify counters are still empty
4133        let req = make_request(
4134            "DescribeContributorInsights",
4135            json!({ "TableName": "test-table" }),
4136        );
4137        let resp = svc.describe_contributor_insights(&req).unwrap();
4138        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4139        let contributors = body["TopContributors"].as_array().unwrap();
4140        assert!(
4141            contributors.is_empty(),
4142            "counters should be empty after disabling insights"
4143        );
4144    }
4145
4146    #[test]
4147    fn scan_pagination_with_limit() {
4148        let svc = make_service();
4149        create_test_table(&svc);
4150
4151        // Insert 5 items
4152        for i in 0..5 {
4153            let req = make_request(
4154                "PutItem",
4155                json!({
4156                    "TableName": "test-table",
4157                    "Item": {
4158                        "pk": { "S": format!("item{i}") },
4159                        "data": { "S": format!("value{i}") }
4160                    }
4161                }),
4162            );
4163            svc.put_item(&req).unwrap();
4164        }
4165
4166        // Scan with limit=2
4167        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
4168        let resp = svc.scan(&req).unwrap();
4169        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4170        assert_eq!(body["Count"], 2);
4171        assert!(
4172            body["LastEvaluatedKey"].is_object(),
4173            "should have LastEvaluatedKey when limit < total items"
4174        );
4175        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4176
4177        // Page through all items
4178        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4179        let mut lek = body["LastEvaluatedKey"].clone();
4180
4181        while lek.is_object() {
4182            let req = make_request(
4183                "Scan",
4184                json!({
4185                    "TableName": "test-table",
4186                    "Limit": 2,
4187                    "ExclusiveStartKey": lek
4188                }),
4189            );
4190            let resp = svc.scan(&req).unwrap();
4191            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4192            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4193            lek = body["LastEvaluatedKey"].clone();
4194        }
4195
4196        assert_eq!(
4197            all_items.len(),
4198            5,
4199            "should retrieve all 5 items via pagination"
4200        );
4201    }
4202
4203    #[test]
4204    fn scan_no_pagination_when_all_fit() {
4205        let svc = make_service();
4206        create_test_table(&svc);
4207
4208        for i in 0..3 {
4209            let req = make_request(
4210                "PutItem",
4211                json!({
4212                    "TableName": "test-table",
4213                    "Item": {
4214                        "pk": { "S": format!("item{i}") }
4215                    }
4216                }),
4217            );
4218            svc.put_item(&req).unwrap();
4219        }
4220
4221        // Scan with limit > item count
4222        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
4223        let resp = svc.scan(&req).unwrap();
4224        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4225        assert_eq!(body["Count"], 3);
4226        assert!(
4227            body["LastEvaluatedKey"].is_null(),
4228            "should not have LastEvaluatedKey when all items fit"
4229        );
4230
4231        // Scan without limit
4232        let req = make_request("Scan", json!({ "TableName": "test-table" }));
4233        let resp = svc.scan(&req).unwrap();
4234        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4235        assert_eq!(body["Count"], 3);
4236        assert!(body["LastEvaluatedKey"].is_null());
4237    }
4238
4239    fn create_composite_table(svc: &DynamoDbService) {
4240        let req = make_request(
4241            "CreateTable",
4242            json!({
4243                "TableName": "composite-table",
4244                "KeySchema": [
4245                    { "AttributeName": "pk", "KeyType": "HASH" },
4246                    { "AttributeName": "sk", "KeyType": "RANGE" }
4247                ],
4248                "AttributeDefinitions": [
4249                    { "AttributeName": "pk", "AttributeType": "S" },
4250                    { "AttributeName": "sk", "AttributeType": "S" }
4251                ],
4252                "BillingMode": "PAY_PER_REQUEST"
4253            }),
4254        );
4255        svc.create_table(&req).unwrap();
4256    }
4257
4258    #[test]
4259    fn query_pagination_with_composite_key() {
4260        let svc = make_service();
4261        create_composite_table(&svc);
4262
4263        // Insert 5 items under the same partition key
4264        for i in 0..5 {
4265            let req = make_request(
4266                "PutItem",
4267                json!({
4268                    "TableName": "composite-table",
4269                    "Item": {
4270                        "pk": { "S": "user1" },
4271                        "sk": { "S": format!("item{i:03}") },
4272                        "data": { "S": format!("value{i}") }
4273                    }
4274                }),
4275            );
4276            svc.put_item(&req).unwrap();
4277        }
4278
4279        // Query with limit=2
4280        let req = make_request(
4281            "Query",
4282            json!({
4283                "TableName": "composite-table",
4284                "KeyConditionExpression": "pk = :pk",
4285                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4286                "Limit": 2
4287            }),
4288        );
4289        let resp = svc.query(&req).unwrap();
4290        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4291        assert_eq!(body["Count"], 2);
4292        assert!(body["LastEvaluatedKey"].is_object());
4293        assert!(body["LastEvaluatedKey"]["pk"].is_object());
4294        assert!(body["LastEvaluatedKey"]["sk"].is_object());
4295
4296        // Page through all items
4297        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
4298        let mut lek = body["LastEvaluatedKey"].clone();
4299
4300        while lek.is_object() {
4301            let req = make_request(
4302                "Query",
4303                json!({
4304                    "TableName": "composite-table",
4305                    "KeyConditionExpression": "pk = :pk",
4306                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4307                    "Limit": 2,
4308                    "ExclusiveStartKey": lek
4309                }),
4310            );
4311            let resp = svc.query(&req).unwrap();
4312            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4313            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
4314            lek = body["LastEvaluatedKey"].clone();
4315        }
4316
4317        assert_eq!(
4318            all_items.len(),
4319            5,
4320            "should retrieve all 5 items via pagination"
4321        );
4322
4323        // Verify items came back sorted by sort key
4324        let sks: Vec<String> = all_items
4325            .iter()
4326            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
4327            .collect();
4328        let mut sorted = sks.clone();
4329        sorted.sort();
4330        assert_eq!(sks, sorted, "items should be sorted by sort key");
4331    }
4332
4333    #[test]
4334    fn query_no_pagination_when_all_fit() {
4335        let svc = make_service();
4336        create_composite_table(&svc);
4337
4338        for i in 0..2 {
4339            let req = make_request(
4340                "PutItem",
4341                json!({
4342                    "TableName": "composite-table",
4343                    "Item": {
4344                        "pk": { "S": "user1" },
4345                        "sk": { "S": format!("item{i}") }
4346                    }
4347                }),
4348            );
4349            svc.put_item(&req).unwrap();
4350        }
4351
4352        let req = make_request(
4353            "Query",
4354            json!({
4355                "TableName": "composite-table",
4356                "KeyConditionExpression": "pk = :pk",
4357                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4358                "Limit": 10
4359            }),
4360        );
4361        let resp = svc.query(&req).unwrap();
4362        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4363        assert_eq!(body["Count"], 2);
4364        assert!(
4365            body["LastEvaluatedKey"].is_null(),
4366            "should not have LastEvaluatedKey when all items fit"
4367        );
4368    }
4369
4370    fn create_gsi_table(svc: &DynamoDbService) {
4371        let req = make_request(
4372            "CreateTable",
4373            json!({
4374                "TableName": "gsi-table",
4375                "KeySchema": [
4376                    { "AttributeName": "pk", "KeyType": "HASH" }
4377                ],
4378                "AttributeDefinitions": [
4379                    { "AttributeName": "pk", "AttributeType": "S" },
4380                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4381                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4382                ],
4383                "BillingMode": "PAY_PER_REQUEST",
4384                "GlobalSecondaryIndexes": [
4385                    {
4386                        "IndexName": "gsi-index",
4387                        "KeySchema": [
4388                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4389                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4390                        ],
4391                        "Projection": { "ProjectionType": "ALL" }
4392                    }
4393                ]
4394            }),
4395        );
4396        svc.create_table(&req).unwrap();
4397    }
4398
4399    #[test]
4400    fn gsi_query_last_evaluated_key_includes_table_pk() {
4401        let svc = make_service();
4402        create_gsi_table(&svc);
4403
4404        // Insert 3 items with the SAME GSI key but different table PKs
4405        for i in 0..3 {
4406            let req = make_request(
4407                "PutItem",
4408                json!({
4409                    "TableName": "gsi-table",
4410                    "Item": {
4411                        "pk": { "S": format!("item{i}") },
4412                        "gsi_pk": { "S": "shared" },
4413                        "gsi_sk": { "S": "sort" }
4414                    }
4415                }),
4416            );
4417            svc.put_item(&req).unwrap();
4418        }
4419
4420        // Query GSI with Limit=1 to trigger pagination
4421        let req = make_request(
4422            "Query",
4423            json!({
4424                "TableName": "gsi-table",
4425                "IndexName": "gsi-index",
4426                "KeyConditionExpression": "gsi_pk = :v",
4427                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4428                "Limit": 1
4429            }),
4430        );
4431        let resp = svc.query(&req).unwrap();
4432        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4433        assert_eq!(body["Count"], 1);
4434        let lek = &body["LastEvaluatedKey"];
4435        assert!(lek.is_object(), "should have LastEvaluatedKey");
4436        // Must contain the index keys
4437        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
4438        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
4439        // Must also contain the table PK
4440        assert!(
4441            lek["pk"].is_object(),
4442            "LEK must contain table PK for GSI queries"
4443        );
4444    }
4445
4446    #[test]
4447    fn gsi_query_pagination_returns_all_items() {
4448        let svc = make_service();
4449        create_gsi_table(&svc);
4450
4451        // Insert 4 items with the SAME GSI key but different table PKs
4452        for i in 0..4 {
4453            let req = make_request(
4454                "PutItem",
4455                json!({
4456                    "TableName": "gsi-table",
4457                    "Item": {
4458                        "pk": { "S": format!("item{i:03}") },
4459                        "gsi_pk": { "S": "shared" },
4460                        "gsi_sk": { "S": "sort" }
4461                    }
4462                }),
4463            );
4464            svc.put_item(&req).unwrap();
4465        }
4466
4467        // Paginate through all items with Limit=2
4468        let mut all_pks = Vec::new();
4469        let mut lek: Option<Value> = None;
4470
4471        loop {
4472            let mut query = json!({
4473                "TableName": "gsi-table",
4474                "IndexName": "gsi-index",
4475                "KeyConditionExpression": "gsi_pk = :v",
4476                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4477                "Limit": 2
4478            });
4479            if let Some(ref start_key) = lek {
4480                query["ExclusiveStartKey"] = start_key.clone();
4481            }
4482
4483            let req = make_request("Query", query);
4484            let resp = svc.query(&req).unwrap();
4485            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4486
4487            for item in body["Items"].as_array().unwrap() {
4488                let pk = item["pk"]["S"].as_str().unwrap().to_string();
4489                all_pks.push(pk);
4490            }
4491
4492            if body["LastEvaluatedKey"].is_object() {
4493                lek = Some(body["LastEvaluatedKey"].clone());
4494            } else {
4495                break;
4496            }
4497        }
4498
4499        all_pks.sort();
4500        assert_eq!(
4501            all_pks,
4502            vec!["item000", "item001", "item002", "item003"],
4503            "pagination should return all items without duplicates"
4504        );
4505    }
4506
4507    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4508        pairs
4509            .iter()
4510            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4511            .collect()
4512    }
4513
4514    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
4515        pairs
4516            .iter()
4517            .map(|(k, v)| (k.to_string(), v.to_string()))
4518            .collect()
4519    }
4520
4521    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4522        pairs
4523            .iter()
4524            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4525            .collect()
4526    }
4527
4528    #[test]
4529    fn test_evaluate_condition_bare_not_equal() {
4530        let item = cond_item(&[("state", "active")]);
4531        let names = cond_names(&[("#s", "state")]);
4532        let values = cond_values(&[(":c", "complete")]);
4533
4534        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4535
4536        let item2 = cond_item(&[("state", "complete")]);
4537        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4538    }
4539
4540    #[test]
4541    fn test_evaluate_condition_parenthesized_not_equal() {
4542        let item = cond_item(&[("state", "active")]);
4543        let names = cond_names(&[("#s", "state")]);
4544        let values = cond_values(&[(":c", "complete")]);
4545
4546        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4547    }
4548
4549    #[test]
4550    fn test_evaluate_condition_parenthesized_equal_mismatch() {
4551        let item = cond_item(&[("state", "active")]);
4552        let names = cond_names(&[("#s", "state")]);
4553        let values = cond_values(&[(":c", "complete")]);
4554
4555        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4556    }
4557
4558    #[test]
4559    fn test_evaluate_condition_compound_and() {
4560        let item = cond_item(&[("state", "active")]);
4561        let names = cond_names(&[("#s", "state")]);
4562        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4563
4564        // active <> complete AND active <> failed => true
4565        assert!(
4566            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4567        );
4568    }
4569
4570    #[test]
4571    fn test_evaluate_condition_compound_and_mismatch() {
4572        let item = cond_item(&[("state", "inactive")]);
4573        let names = cond_names(&[("#s", "state")]);
4574        let values = cond_values(&[(":a", "active"), (":b", "active")]);
4575
4576        // inactive = active AND inactive = active => false
4577        assert!(
4578            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4579        );
4580    }
4581
4582    #[test]
4583    fn test_evaluate_condition_compound_or() {
4584        let item = cond_item(&[("state", "running")]);
4585        let names = cond_names(&[("#s", "state")]);
4586        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4587
4588        // running = active OR running = idle => false
4589        assert!(
4590            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4591        );
4592
4593        // running = active OR running = running => true
4594        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4595        assert!(
4596            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4597        );
4598    }
4599
4600    #[test]
4601    fn test_evaluate_condition_not_operator() {
4602        let item = cond_item(&[("state", "active")]);
4603        let names = cond_names(&[("#s", "state")]);
4604        let values = cond_values(&[(":c", "complete")]);
4605
4606        // NOT (active = complete) => NOT false => true
4607        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4608
4609        // NOT (active <> complete) => NOT true => false
4610        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4611
4612        // NOT attribute_exists(#s) on existing item => NOT true => false
4613        assert!(
4614            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4615        );
4616
4617        // NOT attribute_exists(#s) on missing item => NOT false => true
4618        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4619    }
4620
4621    #[test]
4622    fn test_evaluate_condition_begins_with() {
4623        // After unification, conditions support begins_with via
4624        // evaluate_single_filter_condition (previously only filters had it).
4625        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4626        let names = cond_names(&[("#n", "name")]);
4627        let values = cond_values(&[(":p", "fakecloud")]);
4628
4629        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4630
4631        let values2 = cond_values(&[(":p", "realcloud")]);
4632        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4633    }
4634
4635    #[test]
4636    fn test_evaluate_condition_contains() {
4637        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4638        let names = cond_names(&[("#t", "tags")]);
4639        let values = cond_values(&[(":v", "beta")]);
4640
4641        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4642
4643        let values2 = cond_values(&[(":v", "delta")]);
4644        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4645    }
4646
4647    #[test]
4648    fn test_evaluate_condition_no_existing_item() {
4649        // When no item exists (PutItem with condition), attribute_not_exists
4650        // should succeed and attribute_exists should fail.
4651        let names = cond_names(&[("#s", "state")]);
4652        let values = cond_values(&[(":v", "active")]);
4653
4654        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4655        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4656        // Comparison against missing item: None != Some(val) => true for <>
4657        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4658        // None == Some(val) => false for =
4659        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4660    }
4661
4662    #[test]
4663    fn test_evaluate_filter_not_operator() {
4664        let item = cond_item(&[("status", "pending")]);
4665        let names = cond_names(&[("#s", "status")]);
4666        let values = cond_values(&[(":v", "pending")]);
4667
4668        assert!(!evaluate_filter_expression(
4669            "NOT (#s = :v)",
4670            &item,
4671            &names,
4672            &values
4673        ));
4674        assert!(evaluate_filter_expression(
4675            "NOT (#s <> :v)",
4676            &item,
4677            &names,
4678            &values
4679        ));
4680    }
4681
4682    #[test]
4683    fn test_evaluate_filter_expression_in_match() {
4684        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
4685        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
4686        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
4687        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
4688        // every item matched every IN filter regardless of value.
4689        let item = cond_item(&[("state", "active")]);
4690        let names = cond_names(&[("#s", "state")]);
4691        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4692
4693        assert!(
4694            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4695            "state=active should match IN (active, pending)"
4696        );
4697    }
4698
4699    #[test]
4700    fn test_evaluate_filter_expression_in_no_match() {
4701        let item = cond_item(&[("state", "complete")]);
4702        let names = cond_names(&[("#s", "state")]);
4703        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4704
4705        assert!(
4706            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4707            "state=complete should not match IN (active, pending)"
4708        );
4709    }
4710
4711    #[test]
4712    fn test_evaluate_filter_expression_in_no_spaces() {
4713        // orderbot emits the raw form
4714        //     "#status IN (" + strings.Join(keys, ",") + ")"
4715        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
4716        let item = cond_item(&[("status", "shipped")]);
4717        let names = cond_names(&[("#s", "status")]);
4718        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4719
4720        assert!(
4721            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4722            "no-space IN list should still parse"
4723        );
4724    }
4725
4726    #[test]
4727    fn test_evaluate_filter_expression_in_missing_attribute() {
4728        // A missing attribute must not match any IN list — the silent-true
4729        // fallthrough would wrongly accept these items.
4730        let item: HashMap<String, AttributeValue> = HashMap::new();
4731        let names = cond_names(&[("#s", "state")]);
4732        let values = cond_values(&[(":a", "active")]);
4733
4734        assert!(
4735            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4736            "missing attribute should not match any IN list"
4737        );
4738    }
4739
4740    #[test]
4741    fn test_evaluate_filter_expression_compound_in_and_eq() {
4742        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
4743        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
4744        // split_on_and handles the outer parens, but the IN leaf had the
4745        // silent-true fallthrough, so any item with priority=high would match
4746        // regardless of state.
4747        let item = cond_item(&[("state", "active"), ("priority", "high")]);
4748        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4749        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4750
4751        assert!(
4752            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4753            "(active IN (active, pending)) AND (high = high) should match"
4754        );
4755
4756        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4757        assert!(
4758            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4759            "(complete IN (active, pending)) AND (high = high) should not match"
4760        );
4761    }
4762
4763    #[test]
4764    fn test_evaluate_condition_attribute_exists_with_space() {
4765        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
4766        // space between the name and the opening paren:
4767        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
4768        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
4769        // with no space, so these fell through the filter leaf entirely and
4770        // hit evaluate_single_key_condition's silent-true fallthrough —
4771        // every conditional write was silently accepted.
4772        let item = cond_item(&[("store_id", "s-1")]);
4773        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4774        let values = cond_values(&[(":0", "tab-A")]);
4775
4776        // On an existing item without active_viewer_tab_id: exists(store_id)
4777        // is true, not_exists(active_viewer_tab_id) is true → OK.
4778        assert!(
4779            evaluate_condition(
4780                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4781                Some(&item),
4782                &names,
4783                &values,
4784            )
4785            .is_ok(),
4786            "claim-lease compound on free item should succeed"
4787        );
4788
4789        // On a missing item: exists(store_id) is false → whole AND false → Err.
4790        assert!(
4791            evaluate_condition(
4792                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4793                None,
4794                &names,
4795                &values,
4796            )
4797            .is_err(),
4798            "claim-lease compound on missing item must fail attribute_exists branch"
4799        );
4800
4801        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
4802        // → (✓) AND ((✗) OR (✗)) → false → Err.
4803        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4804        assert!(
4805            evaluate_condition(
4806                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4807                Some(&held),
4808                &names,
4809                &values,
4810            )
4811            .is_err(),
4812            "claim-lease compound on item held by another tab must fail"
4813        );
4814
4815        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
4816        // → (✓) AND ((✗) OR (✓)) → true → Ok.
4817        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4818        assert!(
4819            evaluate_condition(
4820                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4821                Some(&self_held),
4822                &names,
4823                &values,
4824            )
4825            .is_ok(),
4826            "same-tab re-claim must succeed"
4827        );
4828    }
4829
4830    #[test]
4831    fn test_evaluate_condition_in_match() {
4832        // evaluate_condition delegates to evaluate_filter_expression, so this
4833        // also proves the ConditionExpression path. Before fix: silently Ok.
4834        let item = cond_item(&[("state", "active")]);
4835        let names = cond_names(&[("#s", "state")]);
4836        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4837
4838        assert!(
4839            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4840            "IN should succeed when actual value is in the list"
4841        );
4842    }
4843
4844    #[test]
4845    fn test_evaluate_condition_in_no_match() {
4846        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
4847        // conditional write was accepted regardless of actual state, the
4848        // opposite of what the caller asked for.
4849        let item = cond_item(&[("state", "complete")]);
4850        let names = cond_names(&[("#s", "state")]);
4851        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4852
4853        assert!(
4854            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4855            "IN should fail when actual value is not in the list"
4856        );
4857    }
4858
4859    #[test]
4860    fn test_apply_update_set_list_index_replaces_existing() {
4861        // Shape emitted by orderbot's order-item update retry loop:
4862        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
4863        // Before fix: apply_set_assignment called resolve_attr_name on the
4864        // whole "#items[0]" token, which misses the name map, and then
4865        // item.insert("#items[0]", :item), producing a top-level key
4866        // literally named "#items[0]" rather than mutating the list.
4867        let mut item = HashMap::new();
4868        item.insert(
4869            "items".to_string(),
4870            json!({"L": [
4871                {"M": {"sku": {"S": "OLD-A"}}},
4872                {"M": {"sku": {"S": "OLD-B"}}},
4873            ]}),
4874        );
4875
4876        let names = cond_names(&[("#items", "items")]);
4877        let mut values = HashMap::new();
4878        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
4879
4880        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
4881
4882        let items_list = item
4883            .get("items")
4884            .and_then(|v| v.get("L"))
4885            .and_then(|v| v.as_array())
4886            .expect("items should still be a list");
4887        assert_eq!(items_list.len(), 2, "list length should be unchanged");
4888        let sku0 = items_list[0]
4889            .get("M")
4890            .and_then(|m| m.get("sku"))
4891            .and_then(|s| s.get("S"))
4892            .and_then(|s| s.as_str());
4893        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
4894        let sku1 = items_list[1]
4895            .get("M")
4896            .and_then(|m| m.get("sku"))
4897            .and_then(|s| s.get("S"))
4898            .and_then(|s| s.as_str());
4899        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
4900
4901        assert!(!item.contains_key("items[0]"));
4902        assert!(!item.contains_key("#items[0]"));
4903    }
4904
4905    #[test]
4906    fn test_apply_update_set_list_index_second_slot() {
4907        let mut item = HashMap::new();
4908        item.insert(
4909            "items".to_string(),
4910            json!({"L": [
4911                {"M": {"sku": {"S": "A"}}},
4912                {"M": {"sku": {"S": "B"}}},
4913                {"M": {"sku": {"S": "C"}}},
4914            ]}),
4915        );
4916
4917        let names = cond_names(&[("#items", "items")]);
4918        let mut values = HashMap::new();
4919        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
4920
4921        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
4922
4923        let items_list = item
4924            .get("items")
4925            .and_then(|v| v.get("L"))
4926            .and_then(|v| v.as_array())
4927            .unwrap();
4928        let skus: Vec<&str> = items_list
4929            .iter()
4930            .map(|v| {
4931                v.get("M")
4932                    .and_then(|m| m.get("sku"))
4933                    .and_then(|s| s.get("S"))
4934                    .and_then(|s| s.as_str())
4935                    .unwrap()
4936            })
4937            .collect();
4938        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
4939    }
4940
4941    #[test]
4942    fn test_apply_update_set_list_index_without_name_ref() {
4943        // Same fix must also work when the LHS is a literal attribute name,
4944        // not an expression attribute name ref.
4945        let mut item = HashMap::new();
4946        item.insert(
4947            "tags".to_string(),
4948            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
4949        );
4950
4951        let names: HashMap<String, String> = HashMap::new();
4952        let mut values = HashMap::new();
4953        values.insert(":t".to_string(), json!({"S": "green"}));
4954
4955        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
4956
4957        let tags = item
4958            .get("tags")
4959            .and_then(|v| v.get("L"))
4960            .and_then(|v| v.as_array())
4961            .unwrap();
4962        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
4963        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
4964    }
4965
4966    #[test]
4967    fn test_list_append_into_empty_list() {
4968        // Regression: UpdateItem with `SET #0 = list_append(#0, :0)` where
4969        // the attribute already exists as an empty list silently no-oped.
4970        // Root cause: parse_update_clauses split `list_append(#0, :0)` at
4971        // the inner comma, so apply_set_list_append received a truncated
4972        // `rest` with no closing ')' and returned early without writing.
4973        let mut item = HashMap::new();
4974        item.insert("files".to_string(), json!({"L": []}));
4975
4976        let names = cond_names(&[("#0", "files")]);
4977        let mut values = HashMap::new();
4978        values.insert(
4979            ":0".to_string(),
4980            json!({"L": [{"M": {"field": {"S": "value"}}}]}),
4981        );
4982
4983        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
4984            .unwrap();
4985
4986        let list = item
4987            .get("files")
4988            .and_then(|v| v.get("L"))
4989            .and_then(|v| v.as_array())
4990            .expect("files should be an L-typed attribute");
4991        assert_eq!(list.len(), 1, "one element should have been appended");
4992    }
4993
4994    #[test]
4995    fn test_list_append_into_nonempty_list() {
4996        // Verifies the same fix works when the existing list already has elements.
4997        let mut item = HashMap::new();
4998        item.insert(
4999            "files".to_string(),
5000            json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
5001        );
5002
5003        let names = cond_names(&[("#0", "files")]);
5004        let mut values = HashMap::new();
5005        values.insert(
5006            ":0".to_string(),
5007            json!({"L": [{"M": {"field": {"S": "new"}}}]}),
5008        );
5009
5010        apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
5011            .unwrap();
5012
5013        let list = item
5014            .get("files")
5015            .and_then(|v| v.get("L"))
5016            .and_then(|v| v.as_array())
5017            .expect("files should be an L-typed attribute");
5018        assert_eq!(list.len(), 2, "existing element plus one new element");
5019    }
5020
5021    #[test]
5022    fn test_list_append_combined_with_plain_set() {
5023        // Verifies that a mixed expression like
5024        // `SET #a = list_append(#a, :v), #b = :other` correctly applies
5025        // both assignments after the paren-aware comma split fix.
5026        let mut item = HashMap::new();
5027        item.insert("logs".to_string(), json!({"L": []}));
5028        item.insert("count".to_string(), json!({"N": "0"}));
5029
5030        let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
5031        let mut values = HashMap::new();
5032        values.insert(":v".to_string(), json!({"L": [{"S": "entry"}]}));
5033        values.insert(":other".to_string(), json!({"N": "1"}));
5034
5035        apply_update_expression(
5036            &mut item,
5037            "SET #a = list_append(#a, :v), #b = :other",
5038            &names,
5039            &values,
5040        )
5041        .unwrap();
5042
5043        let list = item
5044            .get("logs")
5045            .and_then(|v| v.get("L"))
5046            .and_then(|v| v.as_array())
5047            .expect("logs should be an L-typed attribute");
5048        assert_eq!(list.len(), 1, "one log entry appended");
5049
5050        let count = item
5051            .get("count")
5052            .and_then(|v| v.get("N"))
5053            .and_then(|v| v.as_str())
5054            .expect("count should be an N-typed attribute");
5055        assert_eq!(count, "1", "count updated to 1");
5056    }
5057
5058    #[test]
5059    fn test_unrecognized_expression_returns_false() {
5060        // evaluate_single_key_condition must fail-closed: an expression shape
5061        // it doesn't recognize should return false (reject), not true (accept).
5062        let item = cond_item(&[("x", "1")]);
5063        let names: HashMap<String, String> = HashMap::new();
5064        let values: HashMap<String, Value> = HashMap::new();
5065
5066        assert!(
5067            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, &names, &values),
5068            "unrecognized expression must return false"
5069        );
5070    }
5071
5072    #[test]
5073    fn test_set_list_index_out_of_range_returns_error() {
5074        // SET list[N] where N > len must return a ValidationException,
5075        // not silently no-op.
5076        let mut item = HashMap::new();
5077        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
5078
5079        let names: HashMap<String, String> = HashMap::new();
5080        let mut values = HashMap::new();
5081        values.insert(":v".to_string(), json!({"S": "z"}));
5082
5083        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
5084        assert!(
5085            result.is_err(),
5086            "out-of-range list index must return an error"
5087        );
5088
5089        // List should be unchanged
5090        let list = item
5091            .get("items")
5092            .and_then(|v| v.get("L"))
5093            .and_then(|v| v.as_array())
5094            .unwrap();
5095        assert_eq!(list.len(), 2);
5096    }
5097
5098    #[test]
5099    fn test_set_list_index_on_non_list_returns_error() {
5100        // SET attr[0] = :v where attr is a string (not a list) must return
5101        // a ValidationException.
5102        let mut item = HashMap::new();
5103        item.insert("name".to_string(), json!({"S": "hello"}));
5104
5105        let names: HashMap<String, String> = HashMap::new();
5106        let mut values = HashMap::new();
5107        values.insert(":v".to_string(), json!({"S": "z"}));
5108
5109        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
5110        assert!(
5111            result.is_err(),
5112            "list index on non-list attribute must return an error"
5113        );
5114    }
5115
5116    #[test]
5117    fn test_unrecognized_update_action_returns_error() {
5118        let mut item = HashMap::new();
5119        item.insert("name".to_string(), json!({"S": "hello"}));
5120
5121        let names: HashMap<String, String> = HashMap::new();
5122        let mut values = HashMap::new();
5123        values.insert(":bar".to_string(), json!({"S": "baz"}));
5124
5125        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
5126        assert!(
5127            result.is_err(),
5128            "unrecognized UpdateExpression action must return an error"
5129        );
5130        let err_msg = format!("{}", result.unwrap_err());
5131        assert!(
5132            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
5133            "error should mention Invalid UpdateExpression, got: {err_msg}"
5134        );
5135    }
5136
5137    // ── size() function tests ──────────────────────────────────────────
5138
5139    #[test]
5140    fn test_size_string() {
5141        let mut item = HashMap::new();
5142        item.insert("name".to_string(), json!({"S": "hello"}));
5143        let names = HashMap::new();
5144        let mut values = HashMap::new();
5145        values.insert(":limit".to_string(), json!({"N": "5"}));
5146
5147        assert!(evaluate_single_filter_condition(
5148            "size(name) = :limit",
5149            &item,
5150            &names,
5151            &values,
5152        ));
5153        values.insert(":limit".to_string(), json!({"N": "4"}));
5154        assert!(evaluate_single_filter_condition(
5155            "size(name) > :limit",
5156            &item,
5157            &names,
5158            &values,
5159        ));
5160    }
5161
5162    #[test]
5163    fn test_size_list() {
5164        let mut item = HashMap::new();
5165        item.insert(
5166            "items".to_string(),
5167            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
5168        );
5169        let names = HashMap::new();
5170        let mut values = HashMap::new();
5171        values.insert(":limit".to_string(), json!({"N": "3"}));
5172
5173        assert!(evaluate_single_filter_condition(
5174            "size(items) = :limit",
5175            &item,
5176            &names,
5177            &values,
5178        ));
5179    }
5180
5181    #[test]
5182    fn test_size_map() {
5183        let mut item = HashMap::new();
5184        item.insert(
5185            "data".to_string(),
5186            json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
5187        );
5188        let names = HashMap::new();
5189        let mut values = HashMap::new();
5190        values.insert(":limit".to_string(), json!({"N": "2"}));
5191
5192        assert!(evaluate_single_filter_condition(
5193            "size(data) = :limit",
5194            &item,
5195            &names,
5196            &values,
5197        ));
5198    }
5199
5200    #[test]
5201    fn test_size_set() {
5202        let mut item = HashMap::new();
5203        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
5204        let names = HashMap::new();
5205        let mut values = HashMap::new();
5206        values.insert(":limit".to_string(), json!({"N": "3"}));
5207
5208        assert!(evaluate_single_filter_condition(
5209            "size(tags) > :limit",
5210            &item,
5211            &names,
5212            &values,
5213        ));
5214    }
5215
5216    // ── attribute_type() function tests ────────────────────────────────
5217
5218    #[test]
5219    fn test_attribute_type_string() {
5220        let mut item = HashMap::new();
5221        item.insert("name".to_string(), json!({"S": "hello"}));
5222        let names = HashMap::new();
5223        let mut values = HashMap::new();
5224        values.insert(":t".to_string(), json!({"S": "S"}));
5225
5226        assert!(evaluate_single_filter_condition(
5227            "attribute_type(name, :t)",
5228            &item,
5229            &names,
5230            &values,
5231        ));
5232
5233        values.insert(":t".to_string(), json!({"S": "N"}));
5234        assert!(!evaluate_single_filter_condition(
5235            "attribute_type(name, :t)",
5236            &item,
5237            &names,
5238            &values,
5239        ));
5240    }
5241
5242    #[test]
5243    fn test_attribute_type_number() {
5244        let mut item = HashMap::new();
5245        item.insert("age".to_string(), json!({"N": "42"}));
5246        let names = HashMap::new();
5247        let mut values = HashMap::new();
5248        values.insert(":t".to_string(), json!({"S": "N"}));
5249
5250        assert!(evaluate_single_filter_condition(
5251            "attribute_type(age, :t)",
5252            &item,
5253            &names,
5254            &values,
5255        ));
5256    }
5257
5258    #[test]
5259    fn test_attribute_type_list() {
5260        let mut item = HashMap::new();
5261        item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
5262        let names = HashMap::new();
5263        let mut values = HashMap::new();
5264        values.insert(":t".to_string(), json!({"S": "L"}));
5265
5266        assert!(evaluate_single_filter_condition(
5267            "attribute_type(items, :t)",
5268            &item,
5269            &names,
5270            &values,
5271        ));
5272    }
5273
5274    #[test]
5275    fn test_attribute_type_map() {
5276        let mut item = HashMap::new();
5277        item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
5278        let names = HashMap::new();
5279        let mut values = HashMap::new();
5280        values.insert(":t".to_string(), json!({"S": "M"}));
5281
5282        assert!(evaluate_single_filter_condition(
5283            "attribute_type(data, :t)",
5284            &item,
5285            &names,
5286            &values,
5287        ));
5288    }
5289
5290    #[test]
5291    fn test_attribute_type_bool() {
5292        let mut item = HashMap::new();
5293        item.insert("active".to_string(), json!({"BOOL": true}));
5294        let names = HashMap::new();
5295        let mut values = HashMap::new();
5296        values.insert(":t".to_string(), json!({"S": "BOOL"}));
5297
5298        assert!(evaluate_single_filter_condition(
5299            "attribute_type(active, :t)",
5300            &item,
5301            &names,
5302            &values,
5303        ));
5304    }
5305
5306    // ── begins_with rejects non-string types ───────────────────────────
5307
5308    #[test]
5309    fn test_begins_with_rejects_number_type() {
5310        let mut item = HashMap::new();
5311        item.insert("code".to_string(), json!({"N": "12345"}));
5312        let names = HashMap::new();
5313        let mut values = HashMap::new();
5314        values.insert(":prefix".to_string(), json!({"S": "123"}));
5315
5316        assert!(
5317            !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
5318            "begins_with must return false for N-type attributes"
5319        );
5320    }
5321
5322    #[test]
5323    fn test_begins_with_works_on_string_type() {
5324        let mut item = HashMap::new();
5325        item.insert("code".to_string(), json!({"S": "abc123"}));
5326        let names = HashMap::new();
5327        let mut values = HashMap::new();
5328        values.insert(":prefix".to_string(), json!({"S": "abc"}));
5329
5330        assert!(evaluate_single_filter_condition(
5331            "begins_with(code, :prefix)",
5332            &item,
5333            &names,
5334            &values,
5335        ));
5336    }
5337
5338    // ── contains on sets ───────────────────────────────────────────────
5339
5340    #[test]
5341    fn test_contains_string_set() {
5342        let mut item = HashMap::new();
5343        item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
5344        let names = HashMap::new();
5345        let mut values = HashMap::new();
5346        values.insert(":val".to_string(), json!({"S": "blue"}));
5347
5348        assert!(evaluate_single_filter_condition(
5349            "contains(tags, :val)",
5350            &item,
5351            &names,
5352            &values,
5353        ));
5354
5355        values.insert(":val".to_string(), json!({"S": "yellow"}));
5356        assert!(!evaluate_single_filter_condition(
5357            "contains(tags, :val)",
5358            &item,
5359            &names,
5360            &values,
5361        ));
5362    }
5363
5364    #[test]
5365    fn test_contains_number_set() {
5366        let mut item = HashMap::new();
5367        item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
5368        let names = HashMap::new();
5369        let mut values = HashMap::new();
5370        values.insert(":val".to_string(), json!({"N": "2"}));
5371
5372        assert!(evaluate_single_filter_condition(
5373            "contains(scores, :val)",
5374            &item,
5375            &names,
5376            &values,
5377        ));
5378    }
5379
5380    // ── SET arithmetic type validation ─────────────────────────────────
5381
5382    #[test]
5383    fn test_set_arithmetic_rejects_string_operand() {
5384        let mut item = HashMap::new();
5385        item.insert("name".to_string(), json!({"S": "hello"}));
5386        let names = HashMap::new();
5387        let mut values = HashMap::new();
5388        values.insert(":val".to_string(), json!({"N": "1"}));
5389
5390        let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
5391        assert!(
5392            result.is_err(),
5393            "arithmetic on S-type attribute must return a ValidationException"
5394        );
5395    }
5396
5397    #[test]
5398    fn test_set_arithmetic_rejects_string_value() {
5399        let mut item = HashMap::new();
5400        item.insert("count".to_string(), json!({"N": "5"}));
5401        let names = HashMap::new();
5402        let mut values = HashMap::new();
5403        values.insert(":val".to_string(), json!({"S": "notanumber"}));
5404
5405        let result =
5406            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5407        assert!(
5408            result.is_err(),
5409            "arithmetic with S-type value must return a ValidationException"
5410        );
5411    }
5412
5413    #[test]
5414    fn test_set_arithmetic_valid_numbers() {
5415        let mut item = HashMap::new();
5416        item.insert("count".to_string(), json!({"N": "10"}));
5417        let names = HashMap::new();
5418        let mut values = HashMap::new();
5419        values.insert(":val".to_string(), json!({"N": "3"}));
5420
5421        let result =
5422            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5423        assert!(result.is_ok());
5424        assert_eq!(item["count"], json!({"N": "13"}));
5425    }
5426
5427    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5428
5429    #[test]
5430    fn test_add_binary_set() {
5431        let mut item = HashMap::new();
5432        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
5433        let names = HashMap::new();
5434        let mut values = HashMap::new();
5435        values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
5436
5437        let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
5438        assert!(result.is_ok());
5439        let bs = item["data"]["BS"].as_array().unwrap();
5440        assert_eq!(bs.len(), 3, "should merge sets without duplicates");
5441        assert!(bs.contains(&json!("YQ==")));
5442        assert!(bs.contains(&json!("Yg==")));
5443        assert!(bs.contains(&json!("Yw==")));
5444    }
5445
5446    #[test]
5447    fn test_delete_binary_set() {
5448        let mut item = HashMap::new();
5449        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
5450        let names = HashMap::new();
5451        let mut values = HashMap::new();
5452        values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
5453
5454        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5455        assert!(result.is_ok());
5456        let bs = item["data"]["BS"].as_array().unwrap();
5457        assert_eq!(bs.len(), 2);
5458        assert!(!bs.contains(&json!("Yg==")));
5459    }
5460
5461    #[test]
5462    fn test_delete_binary_set_removes_attr_when_empty() {
5463        let mut item = HashMap::new();
5464        item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
5465        let names = HashMap::new();
5466        let mut values = HashMap::new();
5467        values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
5468
5469        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5470        assert!(result.is_ok());
5471        assert!(
5472            !item.contains_key("data"),
5473            "attribute should be removed when set becomes empty"
5474        );
5475    }
5476
5477    fn body_json(resp: &AwsResponse) -> Value {
5478        serde_json::from_slice(resp.body.expect_bytes()).unwrap()
5479    }
5480
5481    fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
5482        match result {
5483            Err(e) => e,
5484            Ok(_) => panic!("expected error, got Ok"),
5485        }
5486    }
5487
5488    // ── CreateTable ──
5489
5490    #[test]
5491    fn create_table_basic() {
5492        let svc = make_service();
5493        let req = make_request(
5494            "CreateTable",
5495            json!({
5496                "TableName": "my-table",
5497                "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
5498                "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
5499                "BillingMode": "PAY_PER_REQUEST",
5500            }),
5501        );
5502        let resp = svc.create_table(&req).unwrap();
5503        let b = body_json(&resp);
5504        assert_eq!(b["TableDescription"]["TableName"], "my-table");
5505        assert_eq!(b["TableDescription"]["TableStatus"], "ACTIVE");
5506        assert!(b["TableDescription"]["TableArn"].as_str().is_some());
5507    }
5508
5509    #[test]
5510    fn create_table_with_sort_key_and_gsi() {
5511        let svc = make_service();
5512        let req = make_request(
5513            "CreateTable",
5514            json!({
5515                "TableName": "gsi-table",
5516                "KeySchema": [
5517                    {"AttributeName": "pk", "KeyType": "HASH"},
5518                    {"AttributeName": "sk", "KeyType": "RANGE"},
5519                ],
5520                "AttributeDefinitions": [
5521                    {"AttributeName": "pk", "AttributeType": "S"},
5522                    {"AttributeName": "sk", "AttributeType": "S"},
5523                    {"AttributeName": "gsi_key", "AttributeType": "N"},
5524                ],
5525                "GlobalSecondaryIndexes": [{
5526                    "IndexName": "gsi1",
5527                    "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
5528                    "Projection": {"ProjectionType": "ALL"},
5529                }],
5530                "BillingMode": "PAY_PER_REQUEST",
5531            }),
5532        );
5533        let resp = svc.create_table(&req).unwrap();
5534        let b = body_json(&resp);
5535        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
5536            .as_array()
5537            .unwrap();
5538        assert_eq!(gsi.len(), 1);
5539        assert_eq!(gsi[0]["IndexName"], "gsi1");
5540    }
5541
5542    #[test]
5543    fn create_table_duplicate_fails() {
5544        let svc = make_service();
5545        create_test_table(&svc);
5546
5547        let req = make_request(
5548            "CreateTable",
5549            json!({
5550                "TableName": "test-table",
5551                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5552                "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
5553                "BillingMode": "PAY_PER_REQUEST",
5554            }),
5555        );
5556        let err = expect_err(svc.create_table(&req));
5557        assert!(err.to_string().contains("ResourceInUseException"));
5558    }
5559
5560    #[test]
5561    fn create_table_missing_key_attr_in_definitions() {
5562        let svc = make_service();
5563        let req = make_request(
5564            "CreateTable",
5565            json!({
5566                "TableName": "bad",
5567                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5568                "AttributeDefinitions": [{"AttributeName": "other", "AttributeType": "S"}],
5569                "BillingMode": "PAY_PER_REQUEST",
5570            }),
5571        );
5572        let err = expect_err(svc.create_table(&req));
5573        assert!(err.to_string().contains("ValidationException"));
5574    }
5575
5576    // ── DescribeTable ──
5577
5578    #[test]
5579    fn describe_table_found() {
5580        let svc = make_service();
5581        create_test_table(&svc);
5582
5583        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5584        let resp = svc.describe_table(&req).unwrap();
5585        let b = body_json(&resp);
5586        assert_eq!(b["Table"]["TableName"], "test-table");
5587        assert_eq!(b["Table"]["TableStatus"], "ACTIVE");
5588    }
5589
5590    #[test]
5591    fn describe_table_not_found() {
5592        let svc = make_service();
5593        let req = make_request("DescribeTable", json!({"TableName": "nope"}));
5594        let err = expect_err(svc.describe_table(&req));
5595        assert!(err.to_string().contains("ResourceNotFoundException"));
5596    }
5597
5598    // ── DeleteTable ──
5599
5600    #[test]
5601    fn delete_table_removes_table() {
5602        let svc = make_service();
5603        create_test_table(&svc);
5604
5605        let req = make_request("DeleteTable", json!({"TableName": "test-table"}));
5606        let resp = svc.delete_table(&req).unwrap();
5607        let b = body_json(&resp);
5608        assert_eq!(b["TableDescription"]["TableName"], "test-table");
5609
5610        // Should be gone
5611        let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
5612        assert!(svc.describe_table(&req).is_err());
5613    }
5614
5615    // ── ListTables ──
5616
5617    #[test]
5618    fn list_tables_returns_names() {
5619        let svc = make_service();
5620        create_test_table(&svc);
5621
5622        let req = make_request("ListTables", json!({}));
5623        let resp = svc.list_tables(&req).unwrap();
5624        let b = body_json(&resp);
5625        let names = b["TableNames"].as_array().unwrap();
5626        assert!(names.iter().any(|n| n == "test-table"));
5627    }
5628
5629    // ── PutItem / GetItem / DeleteItem ──
5630
5631    #[test]
5632    fn put_and_get_item() {
5633        let svc = make_service();
5634        create_test_table(&svc);
5635
5636        let req = make_request(
5637            "PutItem",
5638            json!({
5639                "TableName": "test-table",
5640                "Item": {
5641                    "pk": {"S": "key1"},
5642                    "name": {"S": "Alice"},
5643                    "age": {"N": "30"},
5644                },
5645            }),
5646        );
5647        svc.put_item(&req).unwrap();
5648
5649        let req = make_request(
5650            "GetItem",
5651            json!({
5652                "TableName": "test-table",
5653                "Key": {"pk": {"S": "key1"}},
5654            }),
5655        );
5656        let resp = svc.get_item(&req).unwrap();
5657        let b = body_json(&resp);
5658        assert_eq!(b["Item"]["name"]["S"], "Alice");
5659        assert_eq!(b["Item"]["age"]["N"], "30");
5660    }
5661
5662    #[test]
5663    fn get_item_not_found() {
5664        let svc = make_service();
5665        create_test_table(&svc);
5666
5667        let req = make_request(
5668            "GetItem",
5669            json!({
5670                "TableName": "test-table",
5671                "Key": {"pk": {"S": "nonexistent"}},
5672            }),
5673        );
5674        let resp = svc.get_item(&req).unwrap();
5675        let b = body_json(&resp);
5676        assert!(b.get("Item").is_none() || b["Item"].is_null());
5677    }
5678
5679    #[test]
5680    fn delete_item_removes_item() {
5681        let svc = make_service();
5682        create_test_table(&svc);
5683
5684        let req = make_request(
5685            "PutItem",
5686            json!({
5687                "TableName": "test-table",
5688                "Item": {"pk": {"S": "del-me"}},
5689            }),
5690        );
5691        svc.put_item(&req).unwrap();
5692
5693        let req = make_request(
5694            "DeleteItem",
5695            json!({
5696                "TableName": "test-table",
5697                "Key": {"pk": {"S": "del-me"}},
5698            }),
5699        );
5700        svc.delete_item(&req).unwrap();
5701
5702        let req = make_request(
5703            "GetItem",
5704            json!({
5705                "TableName": "test-table",
5706                "Key": {"pk": {"S": "del-me"}},
5707            }),
5708        );
5709        let resp = svc.get_item(&req).unwrap();
5710        let b = body_json(&resp);
5711        assert!(b.get("Item").is_none() || b["Item"].is_null());
5712    }
5713
5714    #[test]
5715    fn put_item_returns_old_item() {
5716        let svc = make_service();
5717        create_test_table(&svc);
5718
5719        let req = make_request(
5720            "PutItem",
5721            json!({
5722                "TableName": "test-table",
5723                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "1"}},
5724            }),
5725        );
5726        svc.put_item(&req).unwrap();
5727
5728        let req = make_request(
5729            "PutItem",
5730            json!({
5731                "TableName": "test-table",
5732                "Item": {"pk": {"S": "overwrite"}, "v": {"N": "2"}},
5733                "ReturnValues": "ALL_OLD",
5734            }),
5735        );
5736        let resp = svc.put_item(&req).unwrap();
5737        let b = body_json(&resp);
5738        assert_eq!(b["Attributes"]["v"]["N"], "1");
5739    }
5740
5741    // ── UpdateItem ──
5742
5743    #[test]
5744    fn update_item_set_attribute() {
5745        let svc = make_service();
5746        create_test_table(&svc);
5747
5748        let req = make_request(
5749            "PutItem",
5750            json!({
5751                "TableName": "test-table",
5752                "Item": {"pk": {"S": "upd"}, "count": {"N": "0"}},
5753            }),
5754        );
5755        svc.put_item(&req).unwrap();
5756
5757        let req = make_request(
5758            "UpdateItem",
5759            json!({
5760                "TableName": "test-table",
5761                "Key": {"pk": {"S": "upd"}},
5762                "UpdateExpression": "SET #c = :val",
5763                "ExpressionAttributeNames": {"#c": "count"},
5764                "ExpressionAttributeValues": {":val": {"N": "42"}},
5765                "ReturnValues": "ALL_NEW",
5766            }),
5767        );
5768        let resp = svc.update_item(&req).unwrap();
5769        let b = body_json(&resp);
5770        assert_eq!(b["Attributes"]["count"]["N"], "42");
5771    }
5772
5773    // ── Query ──
5774
5775    #[test]
5776    fn query_returns_matching_items() {
5777        let svc = make_service();
5778        // Table with hash+range
5779        let req = make_request(
5780            "CreateTable",
5781            json!({
5782                "TableName": "query-table",
5783                "KeySchema": [
5784                    {"AttributeName": "pk", "KeyType": "HASH"},
5785                    {"AttributeName": "sk", "KeyType": "RANGE"},
5786                ],
5787                "AttributeDefinitions": [
5788                    {"AttributeName": "pk", "AttributeType": "S"},
5789                    {"AttributeName": "sk", "AttributeType": "S"},
5790                ],
5791                "BillingMode": "PAY_PER_REQUEST",
5792            }),
5793        );
5794        svc.create_table(&req).unwrap();
5795
5796        for i in 0..3 {
5797            let req = make_request(
5798                "PutItem",
5799                json!({
5800                    "TableName": "query-table",
5801                    "Item": {
5802                        "pk": {"S": "user1"},
5803                        "sk": {"S": format!("item-{i}")},
5804                    },
5805                }),
5806            );
5807            svc.put_item(&req).unwrap();
5808        }
5809        // Different partition
5810        let req = make_request(
5811            "PutItem",
5812            json!({
5813                "TableName": "query-table",
5814                "Item": {"pk": {"S": "user2"}, "sk": {"S": "item-0"}},
5815            }),
5816        );
5817        svc.put_item(&req).unwrap();
5818
5819        let req = make_request(
5820            "Query",
5821            json!({
5822                "TableName": "query-table",
5823                "KeyConditionExpression": "pk = :pk",
5824                "ExpressionAttributeValues": {":pk": {"S": "user1"}},
5825            }),
5826        );
5827        let resp = svc.query(&req).unwrap();
5828        let b = body_json(&resp);
5829        assert_eq!(b["Count"], 3);
5830        assert_eq!(b["Items"].as_array().unwrap().len(), 3);
5831    }
5832
5833    // ── Scan ──
5834
5835    #[test]
5836    fn scan_returns_all_items() {
5837        let svc = make_service();
5838        create_test_table(&svc);
5839
5840        for i in 0..5 {
5841            let req = make_request(
5842                "PutItem",
5843                json!({
5844                    "TableName": "test-table",
5845                    "Item": {"pk": {"S": format!("scan-{i}")}},
5846                }),
5847            );
5848            svc.put_item(&req).unwrap();
5849        }
5850
5851        let req = make_request("Scan", json!({"TableName": "test-table"}));
5852        let resp = svc.scan(&req).unwrap();
5853        let b = body_json(&resp);
5854        assert_eq!(b["Count"], 5);
5855    }
5856
5857    // ── BatchWriteItem / BatchGetItem ──
5858
5859    #[test]
5860    fn batch_write_and_get_items() {
5861        let svc = make_service();
5862        create_test_table(&svc);
5863
5864        let req = make_request(
5865            "BatchWriteItem",
5866            json!({
5867                "RequestItems": {
5868                    "test-table": [
5869                        {"PutRequest": {"Item": {"pk": {"S": "b1"}, "val": {"S": "v1"}}}},
5870                        {"PutRequest": {"Item": {"pk": {"S": "b2"}, "val": {"S": "v2"}}}},
5871                        {"PutRequest": {"Item": {"pk": {"S": "b3"}, "val": {"S": "v3"}}}},
5872                    ]
5873                }
5874            }),
5875        );
5876        let resp = svc.batch_write_item(&req).unwrap();
5877        let b = body_json(&resp);
5878        // Unprocessed should be empty
5879        assert!(
5880            b["UnprocessedItems"].as_object().unwrap().is_empty()
5881                || b["UnprocessedItems"]["test-table"]
5882                    .as_array()
5883                    .is_none_or(|a| a.is_empty())
5884        );
5885
5886        // BatchGetItem
5887        let req = make_request(
5888            "BatchGetItem",
5889            json!({
5890                "RequestItems": {
5891                    "test-table": {
5892                        "Keys": [
5893                            {"pk": {"S": "b1"}},
5894                            {"pk": {"S": "b2"}},
5895                            {"pk": {"S": "b3"}},
5896                        ]
5897                    }
5898                }
5899            }),
5900        );
5901        let resp = svc.batch_get_item(&req).unwrap();
5902        let b = body_json(&resp);
5903        let items = b["Responses"]["test-table"].as_array().unwrap();
5904        assert_eq!(items.len(), 3);
5905    }
5906
5907    // ── TransactWriteItems / TransactGetItems ──
5908
5909    #[test]
5910    fn transact_write_and_get() {
5911        let svc = make_service();
5912        create_test_table(&svc);
5913
5914        let req = make_request(
5915            "TransactWriteItems",
5916            json!({
5917                "TransactItems": [
5918                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx1"}}}},
5919                    {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx2"}}}},
5920                ]
5921            }),
5922        );
5923        svc.transact_write_items(&req).unwrap();
5924
5925        let req = make_request(
5926            "TransactGetItems",
5927            json!({
5928                "TransactItems": [
5929                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}},
5930                    {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx2"}}}},
5931                ]
5932            }),
5933        );
5934        let resp = svc.transact_get_items(&req).unwrap();
5935        let b = body_json(&resp);
5936        let responses = b["Responses"].as_array().unwrap();
5937        assert_eq!(responses.len(), 2);
5938    }
5939
5940    // ── TagResource / UntagResource / ListTagsOfResource ──
5941
5942    #[test]
5943    fn tag_operations() {
5944        let svc = make_service();
5945        create_test_table(&svc);
5946        let arn = {
5947            let s = svc.state.read();
5948            s.default_ref()
5949                .tables
5950                .get("test-table")
5951                .unwrap()
5952                .arn
5953                .clone()
5954        };
5955
5956        let req = make_request(
5957            "TagResource",
5958            json!({
5959                "ResourceArn": arn,
5960                "Tags": [{"Key": "env", "Value": "test"}],
5961            }),
5962        );
5963        svc.tag_resource(&req).unwrap();
5964
5965        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
5966        let resp = svc.list_tags_of_resource(&req).unwrap();
5967        let b = body_json(&resp);
5968        let tags = b["Tags"].as_array().unwrap();
5969        assert_eq!(tags.len(), 1);
5970        assert_eq!(tags[0]["Key"], "env");
5971
5972        let req = make_request(
5973            "UntagResource",
5974            json!({
5975                "ResourceArn": arn,
5976                "TagKeys": ["env"],
5977            }),
5978        );
5979        svc.untag_resource(&req).unwrap();
5980
5981        let req = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
5982        let resp = svc.list_tags_of_resource(&req).unwrap();
5983        let b = body_json(&resp);
5984        assert!(b["Tags"].as_array().unwrap().is_empty());
5985    }
5986
5987    // ── UpdateTable ──
5988
5989    #[test]
5990    fn update_table_add_gsi() {
5991        let svc = make_service();
5992        let req = make_request(
5993            "CreateTable",
5994            json!({
5995                "TableName": "upd-table",
5996                "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
5997                "AttributeDefinitions": [
5998                    {"AttributeName": "pk", "AttributeType": "S"},
5999                    {"AttributeName": "gk", "AttributeType": "S"},
6000                ],
6001                "BillingMode": "PAY_PER_REQUEST",
6002            }),
6003        );
6004        svc.create_table(&req).unwrap();
6005
6006        let req = make_request(
6007            "UpdateTable",
6008            json!({
6009                "TableName": "upd-table",
6010                "GlobalSecondaryIndexUpdates": [{
6011                    "Create": {
6012                        "IndexName": "new-gsi",
6013                        "KeySchema": [{"AttributeName": "gk", "KeyType": "HASH"}],
6014                        "Projection": {"ProjectionType": "ALL"},
6015                    }
6016                }],
6017            }),
6018        );
6019        let resp = svc.update_table(&req).unwrap();
6020        let b = body_json(&resp);
6021        let gsi = b["TableDescription"]["GlobalSecondaryIndexes"]
6022            .as_array()
6023            .unwrap();
6024        assert_eq!(gsi.len(), 1);
6025        assert_eq!(gsi[0]["IndexName"], "new-gsi");
6026    }
6027
6028    // ── Scan with FilterExpression ──
6029
6030    #[test]
6031    fn scan_with_filter_expression() {
6032        let svc = make_service();
6033        create_test_table(&svc);
6034
6035        for i in 0..5 {
6036            let req = make_request(
6037                "PutItem",
6038                json!({
6039                    "TableName": "test-table",
6040                    "Item": {
6041                        "pk": {"S": format!("f-{i}")},
6042                        "status": {"S": if i % 2 == 0 { "active" } else { "inactive" }},
6043                    },
6044                }),
6045            );
6046            svc.put_item(&req).unwrap();
6047        }
6048
6049        let req = make_request(
6050            "Scan",
6051            json!({
6052                "TableName": "test-table",
6053                "FilterExpression": "#s = :val",
6054                "ExpressionAttributeNames": {"#s": "status"},
6055                "ExpressionAttributeValues": {":val": {"S": "active"}},
6056            }),
6057        );
6058        let resp = svc.scan(&req).unwrap();
6059        let b = body_json(&resp);
6060        assert_eq!(b["Count"], 3);
6061    }
6062
6063    // ── PartiQL operations (batch.rs coverage) ──
6064
6065    #[test]
6066    fn execute_statement_select() {
6067        let svc = make_service();
6068        create_test_table(&svc);
6069
6070        let req = make_request(
6071            "PutItem",
6072            json!({"TableName": "test-table", "Item": {"pk": {"S": "qs1"}, "val": {"S": "hello"}}}),
6073        );
6074        svc.put_item(&req).unwrap();
6075
6076        let req = make_request(
6077            "ExecuteStatement",
6078            json!({"Statement": "SELECT * FROM \"test-table\" WHERE pk='qs1'"}),
6079        );
6080        let resp = svc.execute_statement(&req).unwrap();
6081        let b = body_json(&resp);
6082        assert!(!b["Items"].as_array().unwrap().is_empty());
6083    }
6084
6085    #[test]
6086    fn execute_statement_insert() {
6087        let svc = make_service();
6088        create_test_table(&svc);
6089
6090        let req = make_request(
6091            "ExecuteStatement",
6092            json!({"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'ins1', 'data': 'val'}"}),
6093        );
6094        svc.execute_statement(&req).unwrap();
6095
6096        let req = make_request(
6097            "GetItem",
6098            json!({"TableName": "test-table", "Key": {"pk": {"S": "ins1"}}}),
6099        );
6100        let resp = svc.get_item(&req).unwrap();
6101        let b = body_json(&resp);
6102        assert_eq!(b["Item"]["data"]["S"], "val");
6103    }
6104
6105    #[test]
6106    fn batch_execute_statement() {
6107        let svc = make_service();
6108        create_test_table(&svc);
6109
6110        let req = make_request(
6111            "PutItem",
6112            json!({"TableName": "test-table", "Item": {"pk": {"S": "be1"}}}),
6113        );
6114        svc.put_item(&req).unwrap();
6115
6116        let req = make_request(
6117            "BatchExecuteStatement",
6118            json!({
6119                "Statements": [
6120                    {"Statement": "SELECT * FROM \"test-table\" WHERE pk='be1'"},
6121                ]
6122            }),
6123        );
6124        let resp = svc.batch_execute_statement(&req).unwrap();
6125        let b = body_json(&resp);
6126        assert!(b["Responses"].as_array().is_some());
6127    }
6128
6129    #[test]
6130    fn execute_transaction() {
6131        let svc = make_service();
6132        create_test_table(&svc);
6133
6134        let req = make_request(
6135            "ExecuteTransaction",
6136            json!({
6137                "TransactStatements": [
6138                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx1'}"},
6139                    {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx2'}"},
6140                ]
6141            }),
6142        );
6143        svc.execute_transaction(&req).unwrap();
6144
6145        let req = make_request(
6146            "GetItem",
6147            json!({"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}),
6148        );
6149        let resp = svc.get_item(&req).unwrap();
6150        let b = body_json(&resp);
6151        assert!(b["Item"].is_object());
6152    }
6153
6154    // ── Batch write with delete ──
6155
6156    #[test]
6157    fn batch_write_with_delete_requests() {
6158        let svc = make_service();
6159        create_test_table(&svc);
6160
6161        // Put items first
6162        for key in &["bwd1", "bwd2", "bwd3"] {
6163            let req = make_request(
6164                "PutItem",
6165                json!({"TableName": "test-table", "Item": {"pk": {"S": key}}}),
6166            );
6167            svc.put_item(&req).unwrap();
6168        }
6169
6170        // Batch delete two
6171        let req = make_request(
6172            "BatchWriteItem",
6173            json!({
6174                "RequestItems": {
6175                    "test-table": [
6176                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd1"}}}},
6177                        {"DeleteRequest": {"Key": {"pk": {"S": "bwd2"}}}},
6178                    ]
6179                }
6180            }),
6181        );
6182        svc.batch_write_item(&req).unwrap();
6183
6184        // bwd3 should still exist
6185        let req = make_request(
6186            "GetItem",
6187            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd3"}}}),
6188        );
6189        let resp = svc.get_item(&req).unwrap();
6190        let b = body_json(&resp);
6191        assert!(b["Item"].is_object());
6192
6193        // bwd1 should be gone
6194        let req = make_request(
6195            "GetItem",
6196            json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd1"}}}),
6197        );
6198        let resp = svc.get_item(&req).unwrap();
6199        let b = body_json(&resp);
6200        assert!(b.get("Item").is_none() || b["Item"].is_null());
6201    }
6202
6203    // ── Query with sort key condition ──
6204
6205    #[test]
6206    fn query_with_sort_key_begins_with() {
6207        let svc = make_service();
6208        // Table with hash+range
6209        let req = make_request(
6210            "CreateTable",
6211            json!({
6212                "TableName": "sk-table",
6213                "KeySchema": [
6214                    {"AttributeName": "pk", "KeyType": "HASH"},
6215                    {"AttributeName": "sk", "KeyType": "RANGE"},
6216                ],
6217                "AttributeDefinitions": [
6218                    {"AttributeName": "pk", "AttributeType": "S"},
6219                    {"AttributeName": "sk", "AttributeType": "S"},
6220                ],
6221                "BillingMode": "PAY_PER_REQUEST",
6222            }),
6223        );
6224        svc.create_table(&req).unwrap();
6225
6226        for sk in &["order#001", "order#002", "profile#main"] {
6227            let req = make_request(
6228                "PutItem",
6229                json!({"TableName": "sk-table", "Item": {"pk": {"S": "u1"}, "sk": {"S": sk}}}),
6230            );
6231            svc.put_item(&req).unwrap();
6232        }
6233
6234        let req = make_request(
6235            "Query",
6236            json!({
6237                "TableName": "sk-table",
6238                "KeyConditionExpression": "pk = :pk AND begins_with(sk, :prefix)",
6239                "ExpressionAttributeValues": {":pk": {"S": "u1"}, ":prefix": {"S": "order#"}},
6240            }),
6241        );
6242        let resp = svc.query(&req).unwrap();
6243        let b = body_json(&resp);
6244        assert_eq!(b["Count"], 2);
6245    }
6246
6247    // ── Scan with limit ──
6248
6249    #[test]
6250    fn scan_with_limit() {
6251        let svc = make_service();
6252        create_test_table(&svc);
6253
6254        for i in 0..10 {
6255            let req = make_request(
6256                "PutItem",
6257                json!({"TableName": "test-table", "Item": {"pk": {"S": format!("lim{i}")}}}),
6258            );
6259            svc.put_item(&req).unwrap();
6260        }
6261
6262        let req = make_request("Scan", json!({"TableName": "test-table", "Limit": 3}));
6263        let resp = svc.scan(&req).unwrap();
6264        let b = body_json(&resp);
6265        assert_eq!(b["Count"], 3);
6266        assert!(b["LastEvaluatedKey"].is_object());
6267    }
6268
6269    // ── Error branches ──
6270
6271    #[test]
6272    fn batch_get_item_table_not_found() {
6273        let svc = make_service();
6274        let req = make_request(
6275            "BatchGetItem",
6276            json!({"RequestItems": {"ghost": {"Keys": [{"pk": {"S": "k"}}]}}}),
6277        );
6278        assert!(svc.batch_get_item(&req).is_err());
6279    }
6280
6281    #[test]
6282    fn batch_write_item_table_not_found() {
6283        let svc = make_service();
6284        let req = make_request(
6285            "BatchWriteItem",
6286            json!({"RequestItems": {"ghost": [{"PutRequest": {"Item": {"pk": {"S": "k"}}}}]}}),
6287        );
6288        assert!(svc.batch_write_item(&req).is_err());
6289    }
6290
6291    // ── Global tables ──
6292
6293    #[test]
6294    fn create_and_describe_global_table() {
6295        let svc = make_service();
6296        create_test_table(&svc);
6297
6298        let req = make_request(
6299            "CreateGlobalTable",
6300            json!({
6301                "GlobalTableName": "test-table",
6302                "ReplicationGroup": [{"RegionName": "us-east-1"}, {"RegionName": "eu-west-1"}],
6303            }),
6304        );
6305        svc.create_global_table(&req).unwrap();
6306
6307        let req = make_request(
6308            "DescribeGlobalTable",
6309            json!({"GlobalTableName": "test-table"}),
6310        );
6311        let resp = svc.describe_global_table(&req).unwrap();
6312        let b = body_json(&resp);
6313        assert!(b["GlobalTableDescription"].is_object());
6314    }
6315
6316    #[test]
6317    fn list_global_tables() {
6318        let svc = make_service();
6319        let req = make_request("ListGlobalTables", json!({}));
6320        let resp = svc.list_global_tables(&req).unwrap();
6321        let b = body_json(&resp);
6322        assert!(b["GlobalTables"].as_array().is_some());
6323    }
6324
6325    // ── Backup operations ──
6326
6327    #[test]
6328    fn create_and_list_backups() {
6329        let svc = make_service();
6330        create_test_table(&svc);
6331
6332        let req = make_request(
6333            "CreateBackup",
6334            json!({"TableName": "test-table", "BackupName": "bak1"}),
6335        );
6336        let resp = svc.create_backup(&req).unwrap();
6337        let b = body_json(&resp);
6338        assert!(b["BackupDetails"]["BackupArn"].as_str().is_some());
6339
6340        let req = make_request("ListBackups", json!({}));
6341        let resp = svc.list_backups(&req).unwrap();
6342        let b = body_json(&resp);
6343        assert!(!b["BackupSummaries"].as_array().unwrap().is_empty());
6344    }
6345
6346    // ── Import/Export ──
6347
6348    #[test]
6349    fn describe_import_not_found() {
6350        let svc = make_service();
6351        let req = make_request(
6352            "DescribeImport",
6353            json!({"ImportArn": "arn:aws:dynamodb:us-east-1:123:table/t/import/ghost"}),
6354        );
6355        assert!(svc.describe_import(&req).is_err());
6356    }
6357
6358    #[test]
6359    fn describe_export_not_found() {
6360        let svc = make_service();
6361        let req = make_request(
6362            "DescribeExport",
6363            json!({"ExportArn": "arn:aws:dynamodb:us-east-1:123:table/t/export/ghost"}),
6364        );
6365        assert!(svc.describe_export(&req).is_err());
6366    }
6367
6368    // ── tables.rs error branches ──
6369
6370    #[test]
6371    fn create_table_missing_name_errors() {
6372        let svc = make_service();
6373        let req = make_request(
6374            "CreateTable",
6375            json!({
6376                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6377                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6378                "BillingMode": "PAY_PER_REQUEST"
6379            }),
6380        );
6381        assert!(svc.create_table(&req).is_err());
6382    }
6383
6384    #[test]
6385    fn create_table_duplicate_errors() {
6386        let svc = make_service();
6387        let req = make_request(
6388            "CreateTable",
6389            json!({
6390                "TableName": "dup",
6391                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6392                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6393                "BillingMode": "PAY_PER_REQUEST"
6394            }),
6395        );
6396        svc.create_table(&req).unwrap();
6397        assert!(svc.create_table(&req).is_err());
6398    }
6399
6400    #[test]
6401    fn delete_table_missing_name_errors() {
6402        let svc = make_service();
6403        let req = make_request("DeleteTable", json!({}));
6404        assert!(svc.delete_table(&req).is_err());
6405    }
6406
6407    #[test]
6408    fn delete_table_not_found_errors() {
6409        let svc = make_service();
6410        let req = make_request("DeleteTable", json!({"TableName": "ghost"}));
6411        assert!(svc.delete_table(&req).is_err());
6412    }
6413
6414    #[test]
6415    fn describe_table_missing_name_errors() {
6416        let svc = make_service();
6417        let req = make_request("DescribeTable", json!({}));
6418        assert!(svc.describe_table(&req).is_err());
6419    }
6420
6421    #[test]
6422    fn describe_table_not_found_errors() {
6423        let svc = make_service();
6424        let req = make_request("DescribeTable", json!({"TableName": "ghost"}));
6425        assert!(svc.describe_table(&req).is_err());
6426    }
6427
6428    #[test]
6429    fn update_table_missing_name_errors() {
6430        let svc = make_service();
6431        let req = make_request("UpdateTable", json!({}));
6432        assert!(svc.update_table(&req).is_err());
6433    }
6434
6435    #[test]
6436    fn update_table_not_found_errors() {
6437        let svc = make_service();
6438        let req = make_request("UpdateTable", json!({"TableName": "ghost"}));
6439        assert!(svc.update_table(&req).is_err());
6440    }
6441
6442    #[test]
6443    fn list_tables_pagination() {
6444        let svc = make_service();
6445        for i in 0..5 {
6446            let req = make_request(
6447                "CreateTable",
6448                json!({
6449                    "TableName": format!("pt{i}"),
6450                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6451                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6452                    "BillingMode": "PAY_PER_REQUEST"
6453                }),
6454            );
6455            svc.create_table(&req).unwrap();
6456        }
6457        let req = make_request("ListTables", json!({"Limit": 2}));
6458        let resp = svc.list_tables(&req).unwrap();
6459        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6460        assert_eq!(body["TableNames"].as_array().unwrap().len(), 2);
6461        assert!(body["LastEvaluatedTableName"].is_string());
6462    }
6463
6464    #[test]
6465    fn list_tables_start_exclusive() {
6466        let svc = make_service();
6467        for i in 0..3 {
6468            let req = make_request(
6469                "CreateTable",
6470                json!({
6471                    "TableName": format!("pt{i}"),
6472                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6473                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6474                    "BillingMode": "PAY_PER_REQUEST"
6475                }),
6476            );
6477            svc.create_table(&req).unwrap();
6478        }
6479        let req = make_request("ListTables", json!({"ExclusiveStartTableName": "pt0"}));
6480        let resp = svc.list_tables(&req).unwrap();
6481        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6482        let names = body["TableNames"].as_array().unwrap();
6483        assert!(!names.iter().any(|n| n == "pt0"));
6484    }
6485
6486    #[test]
6487    fn update_time_to_live_unknown_table_errors() {
6488        let svc = make_service();
6489        let req = make_request(
6490            "UpdateTimeToLive",
6491            json!({
6492                "TableName": "ghost",
6493                "TimeToLiveSpecification": {"Enabled": true, "AttributeName": "ttl"}
6494            }),
6495        );
6496        assert!(svc.update_time_to_live(&req).is_err());
6497    }
6498
6499    #[test]
6500    fn describe_time_to_live_unknown_table_errors() {
6501        let svc = make_service();
6502        let req = make_request("DescribeTimeToLive", json!({"TableName": "ghost"}));
6503        assert!(svc.describe_time_to_live(&req).is_err());
6504    }
6505
6506    // ── resource policy ──
6507
6508    #[test]
6509    fn put_resource_policy_missing_policy_errors() {
6510        let svc = make_service();
6511        let req = make_request(
6512            "CreateTable",
6513            json!({
6514                "TableName": "rp",
6515                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6516                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6517                "BillingMode": "PAY_PER_REQUEST"
6518            }),
6519        );
6520        svc.create_table(&req).unwrap();
6521        let req = make_request(
6522            "PutResourcePolicy",
6523            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/rp"}),
6524        );
6525        assert!(svc.put_resource_policy(&req).is_err());
6526    }
6527
6528    #[test]
6529    fn get_resource_policy_unknown_table_errors() {
6530        let svc = make_service();
6531        let req = make_request(
6532            "GetResourcePolicy",
6533            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6534        );
6535        assert!(svc.get_resource_policy(&req).is_err());
6536    }
6537
6538    // ── tags ──
6539
6540    #[test]
6541    fn tag_resource_unknown_table_errors() {
6542        let svc = make_service();
6543        let req = make_request(
6544            "TagResource",
6545            json!({
6546                "ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost",
6547                "Tags": [{"Key": "k", "Value": "v"}]
6548            }),
6549        );
6550        assert!(svc.tag_resource(&req).is_err());
6551    }
6552
6553    #[test]
6554    fn list_tags_unknown_table_errors() {
6555        let svc = make_service();
6556        let req = make_request(
6557            "ListTagsOfResource",
6558            json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
6559        );
6560        assert!(svc.list_tags_of_resource(&req).is_err());
6561    }
6562
6563    // ── backups ──
6564
6565    #[test]
6566    fn create_backup_unknown_table_errors() {
6567        let svc = make_service();
6568        let req = make_request(
6569            "CreateBackup",
6570            json!({"TableName": "ghost", "BackupName": "b1"}),
6571        );
6572        assert!(svc.create_backup(&req).is_err());
6573    }
6574
6575    #[test]
6576    fn delete_backup_not_found_errors() {
6577        let svc = make_service();
6578        let req = make_request(
6579            "DeleteBackup",
6580            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6581        );
6582        assert!(svc.delete_backup(&req).is_err());
6583    }
6584
6585    #[test]
6586    fn describe_backup_not_found_errors() {
6587        let svc = make_service();
6588        let req = make_request(
6589            "DescribeBackup",
6590            json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
6591        );
6592        assert!(svc.describe_backup(&req).is_err());
6593    }
6594
6595    #[test]
6596    fn restore_table_from_backup_not_found_errors() {
6597        let svc = make_service();
6598        let req = make_request(
6599            "RestoreTableFromBackup",
6600            json!({
6601                "TargetTableName": "restored",
6602                "BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"
6603            }),
6604        );
6605        assert!(svc.restore_table_from_backup(&req).is_err());
6606    }
6607
6608    #[test]
6609    fn update_continuous_backups_unknown_table_errors() {
6610        let svc = make_service();
6611        let req = make_request(
6612            "UpdateContinuousBackups",
6613            json!({
6614                "TableName": "ghost",
6615                "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": true}
6616            }),
6617        );
6618        assert!(svc.update_continuous_backups(&req).is_err());
6619    }
6620
6621    // ── items.rs: put_item error branches ──
6622
6623    #[test]
6624    fn put_item_unknown_table_errors() {
6625        let svc = make_service();
6626        let req = make_request(
6627            "PutItem",
6628            json!({
6629                "TableName": "ghost",
6630                "Item": {"k": {"S": "v"}}
6631            }),
6632        );
6633        assert!(svc.put_item(&req).is_err());
6634    }
6635
6636    #[test]
6637    fn put_item_missing_key_attribute_errors() {
6638        let svc = make_service();
6639        svc.create_table(&make_request(
6640            "CreateTable",
6641            json!({
6642                "TableName": "pmk",
6643                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6644                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6645                "BillingMode": "PAY_PER_REQUEST"
6646            }),
6647        ))
6648        .unwrap();
6649        let req = make_request(
6650            "PutItem",
6651            json!({
6652                "TableName": "pmk",
6653                "Item": {"other": {"S": "v"}}
6654            }),
6655        );
6656        assert!(svc.put_item(&req).is_err());
6657    }
6658
6659    #[test]
6660    fn get_item_unknown_table_errors() {
6661        let svc = make_service();
6662        let req = make_request(
6663            "GetItem",
6664            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6665        );
6666        assert!(svc.get_item(&req).is_err());
6667    }
6668
6669    #[test]
6670    fn delete_item_unknown_table_errors() {
6671        let svc = make_service();
6672        let req = make_request(
6673            "DeleteItem",
6674            json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
6675        );
6676        assert!(svc.delete_item(&req).is_err());
6677    }
6678
6679    #[test]
6680    fn update_item_unknown_table_errors() {
6681        let svc = make_service();
6682        let req = make_request(
6683            "UpdateItem",
6684            json!({
6685                "TableName": "ghost",
6686                "Key": {"k": {"S": "1"}},
6687                "UpdateExpression": "SET x = :v",
6688                "ExpressionAttributeValues": {":v": {"S": "val"}}
6689            }),
6690        );
6691        assert!(svc.update_item(&req).is_err());
6692    }
6693
6694    #[test]
6695    fn query_unknown_table_errors() {
6696        let svc = make_service();
6697        let req = make_request(
6698            "Query",
6699            json!({
6700                "TableName": "ghost",
6701                "KeyConditionExpression": "k = :v",
6702                "ExpressionAttributeValues": {":v": {"S": "x"}}
6703            }),
6704        );
6705        assert!(svc.query(&req).is_err());
6706    }
6707
6708    #[test]
6709    fn scan_unknown_table_errors() {
6710        let svc = make_service();
6711        let req = make_request("Scan", json!({"TableName": "ghost"}));
6712        assert!(svc.scan(&req).is_err());
6713    }
6714
6715    #[test]
6716    fn scan_with_limit_returns_ok() {
6717        let svc = make_service();
6718        svc.create_table(&make_request(
6719            "CreateTable",
6720            json!({
6721                "TableName": "slt",
6722                "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
6723                "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
6724                "BillingMode": "PAY_PER_REQUEST"
6725            }),
6726        ))
6727        .unwrap();
6728        for i in 0..5 {
6729            svc.put_item(&make_request(
6730                "PutItem",
6731                json!({
6732                    "TableName": "slt",
6733                    "Item": {"k": {"S": format!("key-{i}")}}
6734                }),
6735            ))
6736            .unwrap();
6737        }
6738        let req = make_request("Scan", json!({"TableName": "slt", "Limit": 2}));
6739        let resp = svc.scan(&req).unwrap();
6740        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6741        assert_eq!(body["Count"], 2);
6742    }
6743
6744    #[test]
6745    fn batch_get_item_unknown_table_errors() {
6746        let svc = make_service();
6747        let req = make_request(
6748            "BatchGetItem",
6749            json!({
6750                "RequestItems": {
6751                    "ghost": {"Keys": [{"k": {"S": "1"}}]}
6752                }
6753            }),
6754        );
6755        assert!(svc.batch_get_item(&req).is_err());
6756    }
6757
6758    #[test]
6759    fn batch_write_item_unknown_table_errors() {
6760        let svc = make_service();
6761        let req = make_request(
6762            "BatchWriteItem",
6763            json!({
6764                "RequestItems": {
6765                    "ghost": [{"PutRequest": {"Item": {"k": {"S": "1"}}}}]
6766                }
6767            }),
6768        );
6769        assert!(svc.batch_write_item(&req).is_err());
6770    }
6771
6772    #[test]
6773    fn transact_write_items_unknown_table_errors() {
6774        let svc = make_service();
6775        let req = make_request(
6776            "TransactWriteItems",
6777            json!({
6778                "TransactItems": [{
6779                    "Put": {"TableName": "ghost", "Item": {"k": {"S": "1"}}}
6780                }]
6781            }),
6782        );
6783        assert!(svc.transact_write_items(&req).is_err());
6784    }
6785
6786    #[test]
6787    fn transact_get_items_unknown_table_errors() {
6788        let svc = make_service();
6789        let req = make_request(
6790            "TransactGetItems",
6791            json!({
6792                "TransactItems": [{
6793                    "Get": {"TableName": "ghost", "Key": {"k": {"S": "1"}}}
6794                }]
6795            }),
6796        );
6797        assert!(svc.transact_get_items(&req).is_err());
6798    }
6799
6800    #[test]
6801    fn describe_global_table_not_found_b() {
6802        let svc = make_service();
6803        let req = make_request("DescribeGlobalTable", json!({"GlobalTableName": "ghost"}));
6804        assert!(svc.describe_global_table(&req).is_err());
6805    }
6806
6807    #[test]
6808    fn list_global_tables_empty_ok() {
6809        let svc = make_service();
6810        let req = make_request("ListGlobalTables", json!({}));
6811        let resp = svc.list_global_tables(&req).unwrap();
6812        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
6813        assert!(body["GlobalTables"].is_array());
6814    }
6815
6816    #[test]
6817    fn split_on_top_level_keyword_between_swallows_inner_and() {
6818        let parts = split_on_top_level_keyword("x = :a AND y BETWEEN :lo AND :hi", "AND");
6819        assert_eq!(
6820            parts.len(),
6821            2,
6822            "BETWEEN's inner AND must not split; got parts = {parts:?}"
6823        );
6824    }
6825
6826    #[test]
6827    fn split_on_top_level_keyword_between_nested_parens() {
6828        let parts = split_on_top_level_keyword("(x = :a) AND (y BETWEEN :lo AND :hi)", "AND");
6829        assert_eq!(parts.len(), 2);
6830    }
6831
6832    #[test]
6833    fn split_on_top_level_keyword_whitespace_variants() {
6834        for expr in [
6835            "x = :a AND y = :b",
6836            "x=:a AND y=:b",
6837            "  x = :a   AND   y = :b  ",
6838            "x\t=\t:a\tAND\ty\t=\t:b",
6839            "x = :a\nAND\ny = :b",
6840        ] {
6841            let parts = split_on_top_level_keyword(expr, "AND");
6842            assert_eq!(parts.len(), 2, "whitespace variant failed: {expr:?}");
6843        }
6844    }
6845
6846    #[test]
6847    fn split_on_top_level_keyword_case_insensitive() {
6848        let parts = split_on_top_level_keyword("x = :a and y = :b", "AND");
6849        assert_eq!(parts.len(), 2);
6850        let parts = split_on_top_level_keyword("x = :a OR y = :b", "OR");
6851        assert_eq!(parts.len(), 2);
6852    }
6853
6854    #[test]
6855    fn split_on_top_level_keyword_does_not_match_inside_identifiers() {
6856        // `land` contains "AND" but isn't word-bounded — must not split.
6857        let parts = split_on_top_level_keyword("land = :a", "AND");
6858        assert_eq!(parts.len(), 1);
6859    }
6860}