Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21    ReplicaDescription, SharedDynamoDbState,
22};
23
24pub struct DynamoDbService {
25    state: SharedDynamoDbState,
26    s3_state: Option<SharedS3State>,
27    delivery: Option<Arc<DeliveryBus>>,
28}
29
30impl DynamoDbService {
31    pub fn new(state: SharedDynamoDbState) -> Self {
32        Self {
33            state,
34            s3_state: None,
35            delivery: None,
36        }
37    }
38
39    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40        self.s3_state = Some(s3_state);
41        self
42    }
43
44    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45        self.delivery = Some(delivery);
46        self
47    }
48
49    /// Deliver a change record to all active Kinesis streaming destinations for a table.
50    fn deliver_to_kinesis_destinations(
51        &self,
52        table: &DynamoTable,
53        event_name: &str,
54        keys: &HashMap<String, AttributeValue>,
55        old_image: Option<&HashMap<String, AttributeValue>>,
56        new_image: Option<&HashMap<String, AttributeValue>>,
57    ) {
58        let delivery = match &self.delivery {
59            Some(d) => d,
60            None => return,
61        };
62
63        let active_destinations: Vec<_> = table
64            .kinesis_destinations
65            .iter()
66            .filter(|d| d.destination_status == "ACTIVE")
67            .collect();
68
69        if active_destinations.is_empty() {
70            return;
71        }
72
73        let mut record = json!({
74            "eventID": uuid::Uuid::new_v4().to_string(),
75            "eventName": event_name,
76            "eventVersion": "1.1",
77            "eventSource": "aws:dynamodb",
78            "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79            "dynamodb": {
80                "Keys": keys,
81                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83                "StreamViewType": "NEW_AND_OLD_IMAGES",
84            },
85            "eventSourceARN": &table.arn,
86            "tableName": &table.name,
87        });
88
89        if let Some(old) = old_image {
90            record["dynamodb"]["OldImage"] = json!(old);
91        }
92        if let Some(new) = new_image {
93            record["dynamodb"]["NewImage"] = json!(new);
94        }
95
96        let record_str = serde_json::to_string(&record).unwrap_or_default();
97        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98        let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100        for dest in active_destinations {
101            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102        }
103    }
104
105    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106        serde_json::from_slice(&req.body).map_err(|e| {
107            AwsServiceError::aws_error(
108                StatusCode::BAD_REQUEST,
109                "SerializationException",
110                format!("Invalid JSON: {e}"),
111            )
112        })
113    }
114
115    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116        Ok(AwsResponse::ok_json(body))
117    }
118
119    // ── Table Operations ────────────────────────────────────────────────
120
121    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122        let body = Self::parse_body(req)?;
123
124        let table_name = body["TableName"]
125            .as_str()
126            .ok_or_else(|| {
127                AwsServiceError::aws_error(
128                    StatusCode::BAD_REQUEST,
129                    "ValidationException",
130                    "TableName is required",
131                )
132            })?
133            .to_string();
134
135        let key_schema = parse_key_schema(&body["KeySchema"])?;
136        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138        // Validate that key schema attributes are defined
139        for ks in &key_schema {
140            if !attribute_definitions
141                .iter()
142                .any(|ad| ad.attribute_name == ks.attribute_name)
143            {
144                return Err(AwsServiceError::aws_error(
145                    StatusCode::BAD_REQUEST,
146                    "ValidationException",
147                    format!(
148                        "One or more parameter values were invalid: \
149                         Some index key attributes are not defined in AttributeDefinitions. \
150                         Keys: [{}], AttributeDefinitions: [{}]",
151                        ks.attribute_name,
152                        attribute_definitions
153                            .iter()
154                            .map(|ad| ad.attribute_name.as_str())
155                            .collect::<Vec<_>>()
156                            .join(", ")
157                    ),
158                ));
159            }
160        }
161
162        let billing_mode = body["BillingMode"]
163            .as_str()
164            .unwrap_or("PROVISIONED")
165            .to_string();
166
167        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168            ProvisionedThroughput {
169                read_capacity_units: 0,
170                write_capacity_units: 0,
171            }
172        } else {
173            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174        };
175
176        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178        let tags = parse_tags(&body["Tags"]);
179
180        // Parse StreamSpecification
181        let (stream_enabled, stream_view_type) =
182            if let Some(stream_spec) = body.get("StreamSpecification") {
183                let enabled = stream_spec
184                    .get("StreamEnabled")
185                    .and_then(|v| v.as_bool())
186                    .unwrap_or(false);
187                let view_type = if enabled {
188                    stream_spec
189                        .get("StreamViewType")
190                        .and_then(|v| v.as_str())
191                        .map(|s| s.to_string())
192                } else {
193                    None
194                };
195                (enabled, view_type)
196            } else {
197                (false, None)
198            };
199
200        // Parse SSESpecification
201        let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202            let enabled = sse_spec
203                .get("Enabled")
204                .and_then(|v| v.as_bool())
205                .unwrap_or(false);
206            if enabled {
207                let sse_type = sse_spec
208                    .get("SSEType")
209                    .and_then(|v| v.as_str())
210                    .unwrap_or("KMS")
211                    .to_string();
212                let kms_key = sse_spec
213                    .get("KMSMasterKeyId")
214                    .and_then(|v| v.as_str())
215                    .map(|s| s.to_string());
216                (Some(sse_type), kms_key)
217            } else {
218                (None, None)
219            }
220        } else {
221            (None, None)
222        };
223
224        let mut state = self.state.write();
225
226        if state.tables.contains_key(&table_name) {
227            return Err(AwsServiceError::aws_error(
228                StatusCode::BAD_REQUEST,
229                "ResourceInUseException",
230                format!("Table already exists: {table_name}"),
231            ));
232        }
233
234        let now = Utc::now();
235        let arn = format!(
236            "arn:aws:dynamodb:{}:{}:table/{}",
237            state.region, state.account_id, table_name
238        );
239        let stream_arn = if stream_enabled {
240            Some(format!(
241                "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242                state.region,
243                state.account_id,
244                table_name,
245                now.format("%Y-%m-%dT%H:%M:%S.%3f")
246            ))
247        } else {
248            None
249        };
250
251        let table = DynamoTable {
252            name: table_name.clone(),
253            arn: arn.clone(),
254            key_schema: key_schema.clone(),
255            attribute_definitions: attribute_definitions.clone(),
256            provisioned_throughput: provisioned_throughput.clone(),
257            items: Vec::new(),
258            gsi: gsi.clone(),
259            lsi: lsi.clone(),
260            tags,
261            created_at: now,
262            status: "ACTIVE".to_string(),
263            item_count: 0,
264            size_bytes: 0,
265            billing_mode: billing_mode.clone(),
266            ttl_attribute: None,
267            ttl_enabled: false,
268            resource_policy: None,
269            pitr_enabled: false,
270            kinesis_destinations: Vec::new(),
271            contributor_insights_status: "DISABLED".to_string(),
272            contributor_insights_counters: HashMap::new(),
273            stream_enabled,
274            stream_view_type,
275            stream_arn,
276            stream_records: Arc::new(RwLock::new(Vec::new())),
277            sse_type,
278            sse_kms_key_arn,
279        };
280
281        state.tables.insert(table_name, table);
282
283        let table_desc = build_table_description_json(
284            &arn,
285            &key_schema,
286            &attribute_definitions,
287            &provisioned_throughput,
288            &gsi,
289            &lsi,
290            &billing_mode,
291            now,
292            0,
293            0,
294            "ACTIVE",
295        );
296
297        Self::ok_json(json!({ "TableDescription": table_desc }))
298    }
299
300    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = Self::parse_body(req)?;
302        let table_name = require_str(&body, "TableName")?;
303
304        let mut state = self.state.write();
305        let table = state.tables.remove(table_name).ok_or_else(|| {
306            AwsServiceError::aws_error(
307                StatusCode::BAD_REQUEST,
308                "ResourceNotFoundException",
309                format!("Requested resource not found: Table: {table_name} not found"),
310            )
311        })?;
312
313        let table_desc = build_table_description_json(
314            &table.arn,
315            &table.key_schema,
316            &table.attribute_definitions,
317            &table.provisioned_throughput,
318            &table.gsi,
319            &table.lsi,
320            &table.billing_mode,
321            table.created_at,
322            table.item_count,
323            table.size_bytes,
324            "DELETING",
325        );
326
327        Self::ok_json(json!({ "TableDescription": table_desc }))
328    }
329
330    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331        let body = Self::parse_body(req)?;
332        let table_name = require_str(&body, "TableName")?;
333
334        let state = self.state.read();
335        let table = get_table(&state.tables, table_name)?;
336
337        let table_desc = build_table_description(table);
338
339        Self::ok_json(json!({ "Table": table_desc }))
340    }
341
342    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343        let body = Self::parse_body(req)?;
344
345        validate_optional_string_length(
346            "exclusiveStartTableName",
347            body["ExclusiveStartTableName"].as_str(),
348            3,
349            255,
350        )?;
351        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354        let exclusive_start = body["ExclusiveStartTableName"]
355            .as_str()
356            .map(|s| s.to_string());
357
358        let state = self.state.read();
359        let mut names: Vec<&String> = state.tables.keys().collect();
360        names.sort();
361
362        let start_idx = match &exclusive_start {
363            Some(start) => names
364                .iter()
365                .position(|n| n.as_str() > start.as_str())
366                .unwrap_or(names.len()),
367            None => 0,
368        };
369
370        let page: Vec<&str> = names
371            .iter()
372            .skip(start_idx)
373            .take(limit)
374            .map(|n| n.as_str())
375            .collect();
376
377        let mut result = json!({ "TableNames": page });
378
379        if start_idx + limit < names.len() {
380            if let Some(last) = page.last() {
381                result["LastEvaluatedTableName"] = json!(last);
382            }
383        }
384
385        Self::ok_json(result)
386    }
387
388    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        let body = Self::parse_body(req)?;
390        let table_name = require_str(&body, "TableName")?;
391
392        let mut state = self.state.write();
393        let table = state.tables.get_mut(table_name).ok_or_else(|| {
394            AwsServiceError::aws_error(
395                StatusCode::BAD_REQUEST,
396                "ResourceNotFoundException",
397                format!("Requested resource not found: Table: {table_name} not found"),
398            )
399        })?;
400
401        if let Some(pt) = body.get("ProvisionedThroughput") {
402            if let Ok(throughput) = parse_provisioned_throughput(pt) {
403                table.provisioned_throughput = throughput;
404            }
405        }
406
407        if let Some(bm) = body["BillingMode"].as_str() {
408            table.billing_mode = bm.to_string();
409        }
410
411        // Handle SSESpecification update
412        if let Some(sse_spec) = body.get("SSESpecification") {
413            let enabled = sse_spec
414                .get("Enabled")
415                .and_then(|v| v.as_bool())
416                .unwrap_or(false);
417            if enabled {
418                table.sse_type = Some(
419                    sse_spec
420                        .get("SSEType")
421                        .and_then(|v| v.as_str())
422                        .unwrap_or("KMS")
423                        .to_string(),
424                );
425                table.sse_kms_key_arn = sse_spec
426                    .get("KMSMasterKeyId")
427                    .and_then(|v| v.as_str())
428                    .map(|s| s.to_string());
429            } else {
430                table.sse_type = None;
431                table.sse_kms_key_arn = None;
432            }
433        }
434
435        let table_desc = build_table_description_json(
436            &table.arn,
437            &table.key_schema,
438            &table.attribute_definitions,
439            &table.provisioned_throughput,
440            &table.gsi,
441            &table.lsi,
442            &table.billing_mode,
443            table.created_at,
444            table.item_count,
445            table.size_bytes,
446            &table.status,
447        );
448
449        Self::ok_json(json!({ "TableDescription": table_desc }))
450    }
451
452    // ── Item Operations ─────────────────────────────────────────────────
453
454    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455        // --- Parse request body and expression attributes WITHOUT holding any lock ---
456        let body = Self::parse_body(req)?;
457        let table_name = require_str(&body, "TableName")?;
458        let item = require_object(&body, "Item")?;
459        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460        let expr_attr_names = parse_expression_attribute_names(&body);
461        let expr_attr_values = parse_expression_attribute_values(&body);
462        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464        // --- Acquire write lock ONLY for validation + mutation ---
465        // Capture kinesis delivery info alongside the return value
466        let (old_item, kinesis_info) = {
467            let mut state = self.state.write();
468            let region = state.region.clone();
469            let table = get_table_mut(&mut state.tables, table_name)?;
470
471            validate_key_in_item(table, &item)?;
472
473            let key = extract_key(table, &item);
474            let existing_idx = table.find_item_index(&key);
475
476            if let Some(ref cond) = condition {
477                let existing = existing_idx.map(|i| &table.items[i]);
478                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
479            }
480
481            let old_item_for_return = if return_values == "ALL_OLD" {
482                existing_idx.map(|i| table.items[i].clone())
483            } else {
484                None
485            };
486
487            // Capture old item for stream/kinesis if needed
488            let needs_change_capture = table.stream_enabled
489                || table
490                    .kinesis_destinations
491                    .iter()
492                    .any(|d| d.destination_status == "ACTIVE");
493            let old_item_for_stream = if needs_change_capture {
494                existing_idx.map(|i| table.items[i].clone())
495            } else {
496                None
497            };
498
499            let is_modify = existing_idx.is_some();
500
501            if let Some(idx) = existing_idx {
502                table.items[idx] = item.clone();
503            } else {
504                table.items.push(item.clone());
505            }
506
507            table.record_item_access(&item);
508            table.recalculate_stats();
509
510            let event_name = if is_modify { "MODIFY" } else { "INSERT" };
511            let key = extract_key(table, &item);
512
513            // Generate stream record
514            if table.stream_enabled {
515                if let Some(record) = crate::streams::generate_stream_record(
516                    table,
517                    event_name,
518                    key.clone(),
519                    old_item_for_stream.clone(),
520                    Some(item.clone()),
521                    &region,
522                ) {
523                    crate::streams::add_stream_record(table, record);
524                }
525            }
526
527            // Capture kinesis delivery info (delivered after lock release)
528            let kinesis_info = if table
529                .kinesis_destinations
530                .iter()
531                .any(|d| d.destination_status == "ACTIVE")
532            {
533                Some((
534                    table.clone(),
535                    event_name.to_string(),
536                    key,
537                    old_item_for_stream,
538                    Some(item.clone()),
539                ))
540            } else {
541                None
542            };
543
544            (old_item_for_return, kinesis_info)
545        };
546        // --- Write lock released, build response ---
547
548        // Deliver to Kinesis destinations outside the lock
549        if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
550            self.deliver_to_kinesis_destinations(
551                &table,
552                &event_name,
553                &keys,
554                old_image.as_ref(),
555                new_image.as_ref(),
556            );
557        }
558
559        let mut result = json!({});
560        if let Some(old) = old_item {
561            result["Attributes"] = json!(old);
562        }
563
564        Self::ok_json(result)
565    }
566
567    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568        // --- Parse request body WITHOUT holding any lock ---
569        let body = Self::parse_body(req)?;
570        let table_name = require_str(&body, "TableName")?;
571        let key = require_object(&body, "Key")?;
572
573        // --- Use a read lock for the lookup (allows concurrent GetItem calls) ---
574        let (result, needs_insights) = {
575            let state = self.state.read();
576            let table = get_table(&state.tables, table_name)?;
577            let needs_insights = table.contributor_insights_status == "ENABLED";
578
579            let result = match table.find_item_index(&key) {
580                Some(idx) => {
581                    let item = &table.items[idx];
582                    let projected = project_item(item, &body);
583                    json!({ "Item": projected })
584                }
585                None => json!({}),
586            };
587            (result, needs_insights)
588        };
589        // --- Read lock released ---
590
591        // Only acquire write lock if contributor insights tracking is enabled
592        if needs_insights {
593            let mut state = self.state.write();
594            if let Some(table) = state.tables.get_mut(table_name) {
595                table.record_key_access(&key);
596            }
597        }
598
599        Self::ok_json(result)
600    }
601
602    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603        let body = Self::parse_body(req)?;
604
605        validate_optional_enum_value(
606            "conditionalOperator",
607            &body["ConditionalOperator"],
608            &["AND", "OR"],
609        )?;
610        validate_optional_enum_value(
611            "returnConsumedCapacity",
612            &body["ReturnConsumedCapacity"],
613            &["INDEXES", "TOTAL", "NONE"],
614        )?;
615        validate_optional_enum_value(
616            "returnValues",
617            &body["ReturnValues"],
618            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
619        )?;
620        validate_optional_enum_value(
621            "returnItemCollectionMetrics",
622            &body["ReturnItemCollectionMetrics"],
623            &["SIZE", "NONE"],
624        )?;
625        validate_optional_enum_value(
626            "returnValuesOnConditionCheckFailure",
627            &body["ReturnValuesOnConditionCheckFailure"],
628            &["ALL_OLD", "NONE"],
629        )?;
630
631        let table_name = require_str(&body, "TableName")?;
632        let key = require_object(&body, "Key")?;
633
634        let (result, kinesis_info) = {
635            let mut state = self.state.write();
636            let region = state.region.clone();
637            let table = get_table_mut(&mut state.tables, table_name)?;
638
639            let condition = body["ConditionExpression"].as_str();
640            let expr_attr_names = parse_expression_attribute_names(&body);
641            let expr_attr_values = parse_expression_attribute_values(&body);
642
643            let existing_idx = table.find_item_index(&key);
644
645            if let Some(cond) = condition {
646                let existing = existing_idx.map(|i| &table.items[i]);
647                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
648            }
649
650            let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
651
652            let mut result = json!({});
653            let mut kinesis_info = None;
654
655            if let Some(idx) = existing_idx {
656                let old_item = table.items[idx].clone();
657                if return_values == "ALL_OLD" {
658                    result["Attributes"] = json!(old_item.clone());
659                }
660
661                // Generate stream record before removing
662                if table.stream_enabled {
663                    if let Some(record) = crate::streams::generate_stream_record(
664                        table,
665                        "REMOVE",
666                        key.clone(),
667                        Some(old_item.clone()),
668                        None,
669                        &region,
670                    ) {
671                        crate::streams::add_stream_record(table, record);
672                    }
673                }
674
675                // Capture kinesis delivery info
676                if table
677                    .kinesis_destinations
678                    .iter()
679                    .any(|d| d.destination_status == "ACTIVE")
680                {
681                    kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
682                }
683
684                table.items.remove(idx);
685                table.recalculate_stats();
686            }
687
688            let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
689            let return_icm = body["ReturnItemCollectionMetrics"]
690                .as_str()
691                .unwrap_or("NONE");
692
693            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
694                result["ConsumedCapacity"] = json!({
695                    "TableName": table_name,
696                    "CapacityUnits": 1.0,
697                });
698            }
699
700            if return_icm == "SIZE" {
701                result["ItemCollectionMetrics"] = json!({});
702            }
703
704            (result, kinesis_info)
705        };
706
707        // Deliver to Kinesis destinations outside the lock
708        if let Some((table, keys, old_image)) = kinesis_info {
709            self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
710        }
711
712        Self::ok_json(result)
713    }
714
715    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
716        let body = Self::parse_body(req)?;
717        let table_name = require_str(&body, "TableName")?;
718        let key = require_object(&body, "Key")?;
719
720        let mut state = self.state.write();
721        let region = state.region.clone();
722        let table = get_table_mut(&mut state.tables, table_name)?;
723
724        validate_key_attributes_in_key(table, &key)?;
725
726        let condition = body["ConditionExpression"].as_str();
727        let expr_attr_names = parse_expression_attribute_names(&body);
728        let expr_attr_values = parse_expression_attribute_values(&body);
729        let update_expression = body["UpdateExpression"].as_str();
730
731        let existing_idx = table.find_item_index(&key);
732
733        if let Some(cond) = condition {
734            let existing = existing_idx.map(|i| &table.items[i]);
735            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
736        }
737
738        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
739
740        let is_insert = existing_idx.is_none();
741        let idx = match existing_idx {
742            Some(i) => i,
743            None => {
744                let mut new_item = HashMap::new();
745                for (k, v) in &key {
746                    new_item.insert(k.clone(), v.clone());
747                }
748                table.items.push(new_item);
749                table.items.len() - 1
750            }
751        };
752
753        // Capture old item for stream/kinesis (before update)
754        let needs_change_capture = table.stream_enabled
755            || table
756                .kinesis_destinations
757                .iter()
758                .any(|d| d.destination_status == "ACTIVE");
759        let old_item_for_stream = if needs_change_capture {
760            Some(table.items[idx].clone())
761        } else {
762            None
763        };
764
765        let old_item = if return_values == "ALL_OLD" {
766            Some(table.items[idx].clone())
767        } else {
768            None
769        };
770
771        if let Some(expr) = update_expression {
772            apply_update_expression(
773                &mut table.items[idx],
774                expr,
775                &expr_attr_names,
776                &expr_attr_values,
777            )?;
778        }
779
780        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
781            Some(table.items[idx].clone())
782        } else {
783            None
784        };
785
786        let event_name = if is_insert { "INSERT" } else { "MODIFY" };
787        let new_item_for_stream = table.items[idx].clone();
788
789        // Generate stream record after update
790        if table.stream_enabled {
791            if let Some(record) = crate::streams::generate_stream_record(
792                table,
793                event_name,
794                key.clone(),
795                old_item_for_stream.clone(),
796                Some(new_item_for_stream.clone()),
797                &region,
798            ) {
799                crate::streams::add_stream_record(table, record);
800            }
801        }
802
803        // Capture kinesis delivery info
804        let kinesis_info = if table
805            .kinesis_destinations
806            .iter()
807            .any(|d| d.destination_status == "ACTIVE")
808        {
809            Some((
810                table.clone(),
811                event_name.to_string(),
812                key.clone(),
813                old_item_for_stream,
814                Some(new_item_for_stream),
815            ))
816        } else {
817            None
818        };
819
820        table.recalculate_stats();
821
822        // Release the write lock (drop `state`)
823        drop(state);
824
825        // Deliver to Kinesis destinations outside the lock
826        if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
827            self.deliver_to_kinesis_destinations(
828                &tbl,
829                &ev,
830                &keys,
831                old_image.as_ref(),
832                new_image.as_ref(),
833            );
834        }
835
836        let mut result = json!({});
837        if let Some(old) = old_item {
838            result["Attributes"] = json!(old);
839        } else if let Some(new) = new_item {
840            result["Attributes"] = json!(new);
841        }
842
843        Self::ok_json(result)
844    }
845
846    // ── Query & Scan ────────────────────────────────────────────────────
847
848    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849        let body = Self::parse_body(req)?;
850        let table_name = require_str(&body, "TableName")?;
851
852        let state = self.state.read();
853        let table = get_table(&state.tables, table_name)?;
854
855        let expr_attr_names = parse_expression_attribute_names(&body);
856        let expr_attr_values = parse_expression_attribute_values(&body);
857
858        let key_condition = body["KeyConditionExpression"].as_str();
859        let filter_expression = body["FilterExpression"].as_str();
860        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
861        let limit = body["Limit"].as_i64().map(|l| l as usize);
862        let index_name = body["IndexName"].as_str();
863        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
864            parse_key_map(&body["ExclusiveStartKey"]);
865
866        let (items_to_scan, hash_key_name, range_key_name): (
867            &[HashMap<String, AttributeValue>],
868            String,
869            Option<String>,
870        ) = if let Some(idx_name) = index_name {
871            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
872                let hk = gsi
873                    .key_schema
874                    .iter()
875                    .find(|k| k.key_type == "HASH")
876                    .map(|k| k.attribute_name.clone())
877                    .unwrap_or_default();
878                let rk = gsi
879                    .key_schema
880                    .iter()
881                    .find(|k| k.key_type == "RANGE")
882                    .map(|k| k.attribute_name.clone());
883                (&table.items, hk, rk)
884            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
885                let hk = lsi
886                    .key_schema
887                    .iter()
888                    .find(|k| k.key_type == "HASH")
889                    .map(|k| k.attribute_name.clone())
890                    .unwrap_or_default();
891                let rk = lsi
892                    .key_schema
893                    .iter()
894                    .find(|k| k.key_type == "RANGE")
895                    .map(|k| k.attribute_name.clone());
896                (&table.items, hk, rk)
897            } else {
898                return Err(AwsServiceError::aws_error(
899                    StatusCode::BAD_REQUEST,
900                    "ValidationException",
901                    format!("The table does not have the specified index: {idx_name}"),
902                ));
903            }
904        } else {
905            (
906                &table.items[..],
907                table.hash_key_name().to_string(),
908                table.range_key_name().map(|s| s.to_string()),
909            )
910        };
911
912        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
913            .iter()
914            .filter(|item| {
915                if let Some(kc) = key_condition {
916                    evaluate_key_condition(
917                        kc,
918                        item,
919                        &hash_key_name,
920                        range_key_name.as_deref(),
921                        &expr_attr_names,
922                        &expr_attr_values,
923                    )
924                } else {
925                    true
926                }
927            })
928            .collect();
929
930        if let Some(ref rk) = range_key_name {
931            matched.sort_by(|a, b| {
932                let av = a.get(rk.as_str());
933                let bv = b.get(rk.as_str());
934                compare_attribute_values(av, bv)
935            });
936            if !scan_forward {
937                matched.reverse();
938            }
939        }
940
941        // For GSI queries, we need the table's primary key attributes to uniquely
942        // identify items (GSI keys are not unique).
943        let table_pk_hash = table.hash_key_name().to_string();
944        let table_pk_range = table.range_key_name().map(|s| s.to_string());
945        let is_gsi_query = index_name.is_some()
946            && (hash_key_name != table_pk_hash
947                || range_key_name.as_deref() != table_pk_range.as_deref());
948
949        // Apply ExclusiveStartKey: skip items up to and including the start key.
950        // For GSI queries the start key contains both index keys and table PK, so
951        // we must match on ALL of them to find the exact item.
952        if let Some(ref start_key) = exclusive_start_key {
953            if let Some(pos) = matched.iter().position(|item| {
954                let index_match =
955                    item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
956                if is_gsi_query {
957                    index_match
958                        && item_matches_key(
959                            item,
960                            start_key,
961                            &table_pk_hash,
962                            table_pk_range.as_deref(),
963                        )
964                } else {
965                    index_match
966                }
967            }) {
968                matched = matched.split_off(pos + 1);
969            }
970        }
971
972        if let Some(filter) = filter_expression {
973            matched.retain(|item| {
974                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
975            });
976        }
977
978        let scanned_count = matched.len();
979
980        let has_more = if let Some(lim) = limit {
981            let more = matched.len() > lim;
982            matched.truncate(lim);
983            more
984        } else {
985            false
986        };
987
988        // Build LastEvaluatedKey from the last returned item if there are more results.
989        // For GSI queries, include both the index keys and the table's primary key
990        // so the item can be uniquely identified on resume.
991        let last_evaluated_key = if has_more {
992            matched.last().map(|item| {
993                let mut key =
994                    extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
995                if is_gsi_query {
996                    let table_key =
997                        extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
998                    key.extend(table_key);
999                }
1000                key
1001            })
1002        } else {
1003            None
1004        };
1005
1006        // Collect partition key values for contributor insights
1007        let insights_enabled = table.contributor_insights_status == "ENABLED";
1008        let pk_name = table.hash_key_name().to_string();
1009        let accessed_keys: Vec<String> = if insights_enabled {
1010            matched
1011                .iter()
1012                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1013                .collect()
1014        } else {
1015            Vec::new()
1016        };
1017
1018        let items: Vec<Value> = matched
1019            .iter()
1020            .map(|item| {
1021                let projected = project_item(item, &body);
1022                json!(projected)
1023            })
1024            .collect();
1025
1026        let mut result = json!({
1027            "Items": items,
1028            "Count": items.len(),
1029            "ScannedCount": scanned_count,
1030        });
1031
1032        if let Some(lek) = last_evaluated_key {
1033            result["LastEvaluatedKey"] = json!(lek);
1034        }
1035
1036        drop(state);
1037
1038        if !accessed_keys.is_empty() {
1039            let mut state = self.state.write();
1040            if let Some(table) = state.tables.get_mut(table_name) {
1041                // Re-check insights status after acquiring write lock in case it
1042                // was disabled between the read and write lock acquisitions.
1043                if table.contributor_insights_status == "ENABLED" {
1044                    for key_str in accessed_keys {
1045                        *table
1046                            .contributor_insights_counters
1047                            .entry(key_str)
1048                            .or_insert(0) += 1;
1049                    }
1050                }
1051            }
1052        }
1053
1054        Self::ok_json(result)
1055    }
1056
1057    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1058        let body = Self::parse_body(req)?;
1059        let table_name = require_str(&body, "TableName")?;
1060
1061        let state = self.state.read();
1062        let table = get_table(&state.tables, table_name)?;
1063
1064        let expr_attr_names = parse_expression_attribute_names(&body);
1065        let expr_attr_values = parse_expression_attribute_values(&body);
1066        let filter_expression = body["FilterExpression"].as_str();
1067        let limit = body["Limit"].as_i64().map(|l| l as usize);
1068        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1069            parse_key_map(&body["ExclusiveStartKey"]);
1070
1071        let hash_key_name = table.hash_key_name().to_string();
1072        let range_key_name = table.range_key_name().map(|s| s.to_string());
1073
1074        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1075
1076        // Apply ExclusiveStartKey: skip items up to and including the start key
1077        if let Some(ref start_key) = exclusive_start_key {
1078            if let Some(pos) = matched.iter().position(|item| {
1079                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1080            }) {
1081                matched = matched.split_off(pos + 1);
1082            }
1083        }
1084
1085        let scanned_count = matched.len();
1086
1087        if let Some(filter) = filter_expression {
1088            matched.retain(|item| {
1089                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1090            });
1091        }
1092
1093        let has_more = if let Some(lim) = limit {
1094            let more = matched.len() > lim;
1095            matched.truncate(lim);
1096            more
1097        } else {
1098            false
1099        };
1100
1101        // Build LastEvaluatedKey from the last returned item if there are more results
1102        let last_evaluated_key = if has_more {
1103            matched
1104                .last()
1105                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1106        } else {
1107            None
1108        };
1109
1110        // Collect partition key values for contributor insights
1111        let insights_enabled = table.contributor_insights_status == "ENABLED";
1112        let pk_name = table.hash_key_name().to_string();
1113        let accessed_keys: Vec<String> = if insights_enabled {
1114            matched
1115                .iter()
1116                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1117                .collect()
1118        } else {
1119            Vec::new()
1120        };
1121
1122        let items: Vec<Value> = matched
1123            .iter()
1124            .map(|item| {
1125                let projected = project_item(item, &body);
1126                json!(projected)
1127            })
1128            .collect();
1129
1130        let mut result = json!({
1131            "Items": items,
1132            "Count": items.len(),
1133            "ScannedCount": scanned_count,
1134        });
1135
1136        if let Some(lek) = last_evaluated_key {
1137            result["LastEvaluatedKey"] = json!(lek);
1138        }
1139
1140        drop(state);
1141
1142        if !accessed_keys.is_empty() {
1143            let mut state = self.state.write();
1144            if let Some(table) = state.tables.get_mut(table_name) {
1145                // Re-check insights status after acquiring write lock in case it
1146                // was disabled between the read and write lock acquisitions.
1147                if table.contributor_insights_status == "ENABLED" {
1148                    for key_str in accessed_keys {
1149                        *table
1150                            .contributor_insights_counters
1151                            .entry(key_str)
1152                            .or_insert(0) += 1;
1153                    }
1154                }
1155            }
1156        }
1157
1158        Self::ok_json(result)
1159    }
1160
1161    // ── Batch Operations ────────────────────────────────────────────────
1162
1163    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1164        let body = Self::parse_body(req)?;
1165
1166        validate_optional_enum_value(
1167            "returnConsumedCapacity",
1168            &body["ReturnConsumedCapacity"],
1169            &["INDEXES", "TOTAL", "NONE"],
1170        )?;
1171
1172        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1173
1174        let request_items = body["RequestItems"]
1175            .as_object()
1176            .ok_or_else(|| {
1177                AwsServiceError::aws_error(
1178                    StatusCode::BAD_REQUEST,
1179                    "ValidationException",
1180                    "RequestItems is required",
1181                )
1182            })?
1183            .clone();
1184
1185        let state = self.state.read();
1186        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1187        let mut consumed_capacity: Vec<Value> = Vec::new();
1188
1189        for (table_name, params) in &request_items {
1190            let table = get_table(&state.tables, table_name)?;
1191            let keys = params["Keys"].as_array().ok_or_else(|| {
1192                AwsServiceError::aws_error(
1193                    StatusCode::BAD_REQUEST,
1194                    "ValidationException",
1195                    "Keys is required",
1196                )
1197            })?;
1198
1199            let mut items = Vec::new();
1200            for key_val in keys {
1201                let key: HashMap<String, AttributeValue> =
1202                    serde_json::from_value(key_val.clone()).unwrap_or_default();
1203                if let Some(idx) = table.find_item_index(&key) {
1204                    items.push(json!(table.items[idx]));
1205                }
1206            }
1207            responses.insert(table_name.clone(), items);
1208
1209            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1210                consumed_capacity.push(json!({
1211                    "TableName": table_name,
1212                    "CapacityUnits": 1.0,
1213                }));
1214            }
1215        }
1216
1217        let mut result = json!({
1218            "Responses": responses,
1219            "UnprocessedKeys": {},
1220        });
1221
1222        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1223            result["ConsumedCapacity"] = json!(consumed_capacity);
1224        }
1225
1226        Self::ok_json(result)
1227    }
1228
1229    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230        let body = Self::parse_body(req)?;
1231
1232        validate_optional_enum_value(
1233            "returnConsumedCapacity",
1234            &body["ReturnConsumedCapacity"],
1235            &["INDEXES", "TOTAL", "NONE"],
1236        )?;
1237        validate_optional_enum_value(
1238            "returnItemCollectionMetrics",
1239            &body["ReturnItemCollectionMetrics"],
1240            &["SIZE", "NONE"],
1241        )?;
1242
1243        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1244        let return_icm = body["ReturnItemCollectionMetrics"]
1245            .as_str()
1246            .unwrap_or("NONE");
1247
1248        let request_items = body["RequestItems"]
1249            .as_object()
1250            .ok_or_else(|| {
1251                AwsServiceError::aws_error(
1252                    StatusCode::BAD_REQUEST,
1253                    "ValidationException",
1254                    "RequestItems is required",
1255                )
1256            })?
1257            .clone();
1258
1259        let mut state = self.state.write();
1260        let mut consumed_capacity: Vec<Value> = Vec::new();
1261        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1262
1263        for (table_name, requests) in &request_items {
1264            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1265                AwsServiceError::aws_error(
1266                    StatusCode::BAD_REQUEST,
1267                    "ResourceNotFoundException",
1268                    format!("Requested resource not found: Table: {table_name} not found"),
1269                )
1270            })?;
1271
1272            let reqs = requests.as_array().ok_or_else(|| {
1273                AwsServiceError::aws_error(
1274                    StatusCode::BAD_REQUEST,
1275                    "ValidationException",
1276                    "Request list must be an array",
1277                )
1278            })?;
1279
1280            for request in reqs {
1281                if let Some(put_req) = request.get("PutRequest") {
1282                    let item: HashMap<String, AttributeValue> =
1283                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1284                    let key = extract_key(table, &item);
1285                    if let Some(idx) = table.find_item_index(&key) {
1286                        table.items[idx] = item;
1287                    } else {
1288                        table.items.push(item);
1289                    }
1290                } else if let Some(del_req) = request.get("DeleteRequest") {
1291                    let key: HashMap<String, AttributeValue> =
1292                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1293                    if let Some(idx) = table.find_item_index(&key) {
1294                        table.items.remove(idx);
1295                    }
1296                }
1297            }
1298
1299            table.recalculate_stats();
1300
1301            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1302                consumed_capacity.push(json!({
1303                    "TableName": table_name,
1304                    "CapacityUnits": 1.0,
1305                }));
1306            }
1307
1308            if return_icm == "SIZE" {
1309                item_collection_metrics.insert(table_name.clone(), vec![]);
1310            }
1311        }
1312
1313        let mut result = json!({
1314            "UnprocessedItems": {},
1315        });
1316
1317        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1318            result["ConsumedCapacity"] = json!(consumed_capacity);
1319        }
1320
1321        if return_icm == "SIZE" {
1322            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1323        }
1324
1325        Self::ok_json(result)
1326    }
1327
1328    // ── Tags ────────────────────────────────────────────────────────────
1329
1330    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331        let body = Self::parse_body(req)?;
1332        let resource_arn = require_str(&body, "ResourceArn")?;
1333        validate_required("Tags", &body["Tags"])?;
1334
1335        let mut state = self.state.write();
1336        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1337
1338        fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1339            |f| {
1340                AwsServiceError::aws_error(
1341                    StatusCode::BAD_REQUEST,
1342                    "ValidationException",
1343                    format!("{f} must be a list"),
1344                )
1345            },
1346        )?;
1347
1348        Self::ok_json(json!({}))
1349    }
1350
1351    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352        let body = Self::parse_body(req)?;
1353        let resource_arn = require_str(&body, "ResourceArn")?;
1354        validate_required("TagKeys", &body["TagKeys"])?;
1355
1356        let mut state = self.state.write();
1357        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1358
1359        fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1360            AwsServiceError::aws_error(
1361                StatusCode::BAD_REQUEST,
1362                "ValidationException",
1363                format!("{f} must be a list"),
1364            )
1365        })?;
1366
1367        Self::ok_json(json!({}))
1368    }
1369
1370    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371        let body = Self::parse_body(req)?;
1372        let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374        let state = self.state.read();
1375        let table = find_table_by_arn(&state.tables, resource_arn)?;
1376
1377        let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1378
1379        Self::ok_json(json!({ "Tags": tags }))
1380    }
1381
1382    // ── Transactions ────────────────────────────────────────────────────
1383
1384    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385        let body = Self::parse_body(req)?;
1386        validate_optional_enum_value(
1387            "returnConsumedCapacity",
1388            &body["ReturnConsumedCapacity"],
1389            &["INDEXES", "TOTAL", "NONE"],
1390        )?;
1391        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1392            AwsServiceError::aws_error(
1393                StatusCode::BAD_REQUEST,
1394                "ValidationException",
1395                "TransactItems is required",
1396            )
1397        })?;
1398
1399        let state = self.state.read();
1400        let mut responses: Vec<Value> = Vec::new();
1401
1402        for ti in transact_items {
1403            let get = &ti["Get"];
1404            let table_name = get["TableName"].as_str().ok_or_else(|| {
1405                AwsServiceError::aws_error(
1406                    StatusCode::BAD_REQUEST,
1407                    "ValidationException",
1408                    "TableName is required in Get",
1409                )
1410            })?;
1411            let key: HashMap<String, AttributeValue> =
1412                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1413
1414            let table = get_table(&state.tables, table_name)?;
1415            match table.find_item_index(&key) {
1416                Some(idx) => {
1417                    responses.push(json!({ "Item": table.items[idx] }));
1418                }
1419                None => {
1420                    responses.push(json!({}));
1421                }
1422            }
1423        }
1424
1425        Self::ok_json(json!({ "Responses": responses }))
1426    }
1427
1428    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1429        let body = Self::parse_body(req)?;
1430        validate_optional_string_length(
1431            "clientRequestToken",
1432            body["ClientRequestToken"].as_str(),
1433            1,
1434            36,
1435        )?;
1436        validate_optional_enum_value(
1437            "returnConsumedCapacity",
1438            &body["ReturnConsumedCapacity"],
1439            &["INDEXES", "TOTAL", "NONE"],
1440        )?;
1441        validate_optional_enum_value(
1442            "returnItemCollectionMetrics",
1443            &body["ReturnItemCollectionMetrics"],
1444            &["SIZE", "NONE"],
1445        )?;
1446        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1447            AwsServiceError::aws_error(
1448                StatusCode::BAD_REQUEST,
1449                "ValidationException",
1450                "TransactItems is required",
1451            )
1452        })?;
1453
1454        let mut state = self.state.write();
1455
1456        // First pass: validate all conditions
1457        let mut cancellation_reasons: Vec<Value> = Vec::new();
1458        let mut any_failed = false;
1459
1460        for ti in transact_items {
1461            if let Some(put) = ti.get("Put") {
1462                let table_name = put["TableName"].as_str().unwrap_or_default();
1463                let item: HashMap<String, AttributeValue> =
1464                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1465                let condition = put["ConditionExpression"].as_str();
1466
1467                if let Some(cond) = condition {
1468                    let table = get_table(&state.tables, table_name)?;
1469                    let expr_attr_names = parse_expression_attribute_names(put);
1470                    let expr_attr_values = parse_expression_attribute_values(put);
1471                    let key = extract_key(table, &item);
1472                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1473                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1474                        .is_err()
1475                    {
1476                        cancellation_reasons.push(json!({
1477                            "Code": "ConditionalCheckFailed",
1478                            "Message": "The conditional request failed"
1479                        }));
1480                        any_failed = true;
1481                        continue;
1482                    }
1483                }
1484                cancellation_reasons.push(json!({ "Code": "None" }));
1485            } else if let Some(delete) = ti.get("Delete") {
1486                let table_name = delete["TableName"].as_str().unwrap_or_default();
1487                let key: HashMap<String, AttributeValue> =
1488                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1489                let condition = delete["ConditionExpression"].as_str();
1490
1491                if let Some(cond) = condition {
1492                    let table = get_table(&state.tables, table_name)?;
1493                    let expr_attr_names = parse_expression_attribute_names(delete);
1494                    let expr_attr_values = parse_expression_attribute_values(delete);
1495                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1496                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1497                        .is_err()
1498                    {
1499                        cancellation_reasons.push(json!({
1500                            "Code": "ConditionalCheckFailed",
1501                            "Message": "The conditional request failed"
1502                        }));
1503                        any_failed = true;
1504                        continue;
1505                    }
1506                }
1507                cancellation_reasons.push(json!({ "Code": "None" }));
1508            } else if let Some(update) = ti.get("Update") {
1509                let table_name = update["TableName"].as_str().unwrap_or_default();
1510                let key: HashMap<String, AttributeValue> =
1511                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1512                let condition = update["ConditionExpression"].as_str();
1513
1514                if let Some(cond) = condition {
1515                    let table = get_table(&state.tables, table_name)?;
1516                    let expr_attr_names = parse_expression_attribute_names(update);
1517                    let expr_attr_values = parse_expression_attribute_values(update);
1518                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1519                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1520                        .is_err()
1521                    {
1522                        cancellation_reasons.push(json!({
1523                            "Code": "ConditionalCheckFailed",
1524                            "Message": "The conditional request failed"
1525                        }));
1526                        any_failed = true;
1527                        continue;
1528                    }
1529                }
1530                cancellation_reasons.push(json!({ "Code": "None" }));
1531            } else if let Some(check) = ti.get("ConditionCheck") {
1532                let table_name = check["TableName"].as_str().unwrap_or_default();
1533                let key: HashMap<String, AttributeValue> =
1534                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1535                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1536
1537                let table = get_table(&state.tables, table_name)?;
1538                let expr_attr_names = parse_expression_attribute_names(check);
1539                let expr_attr_values = parse_expression_attribute_values(check);
1540                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1541                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1542                {
1543                    cancellation_reasons.push(json!({
1544                        "Code": "ConditionalCheckFailed",
1545                        "Message": "The conditional request failed"
1546                    }));
1547                    any_failed = true;
1548                    continue;
1549                }
1550                cancellation_reasons.push(json!({ "Code": "None" }));
1551            } else {
1552                cancellation_reasons.push(json!({ "Code": "None" }));
1553            }
1554        }
1555
1556        if any_failed {
1557            let error_body = json!({
1558                "__type": "TransactionCanceledException",
1559                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1560                "CancellationReasons": cancellation_reasons
1561            });
1562            return Ok(AwsResponse::json(
1563                StatusCode::BAD_REQUEST,
1564                serde_json::to_vec(&error_body).unwrap(),
1565            ));
1566        }
1567
1568        // Second pass: apply all writes
1569        for ti in transact_items {
1570            if let Some(put) = ti.get("Put") {
1571                let table_name = put["TableName"].as_str().unwrap_or_default();
1572                let item: HashMap<String, AttributeValue> =
1573                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1574                let table = get_table_mut(&mut state.tables, table_name)?;
1575                let key = extract_key(table, &item);
1576                if let Some(idx) = table.find_item_index(&key) {
1577                    table.items[idx] = item;
1578                } else {
1579                    table.items.push(item);
1580                }
1581                table.recalculate_stats();
1582            } else if let Some(delete) = ti.get("Delete") {
1583                let table_name = delete["TableName"].as_str().unwrap_or_default();
1584                let key: HashMap<String, AttributeValue> =
1585                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1586                let table = get_table_mut(&mut state.tables, table_name)?;
1587                if let Some(idx) = table.find_item_index(&key) {
1588                    table.items.remove(idx);
1589                }
1590                table.recalculate_stats();
1591            } else if let Some(update) = ti.get("Update") {
1592                let table_name = update["TableName"].as_str().unwrap_or_default();
1593                let key: HashMap<String, AttributeValue> =
1594                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1595                let update_expression = update["UpdateExpression"].as_str();
1596                let expr_attr_names = parse_expression_attribute_names(update);
1597                let expr_attr_values = parse_expression_attribute_values(update);
1598
1599                let table = get_table_mut(&mut state.tables, table_name)?;
1600                let idx = match table.find_item_index(&key) {
1601                    Some(i) => i,
1602                    None => {
1603                        let mut new_item = HashMap::new();
1604                        for (k, v) in &key {
1605                            new_item.insert(k.clone(), v.clone());
1606                        }
1607                        table.items.push(new_item);
1608                        table.items.len() - 1
1609                    }
1610                };
1611
1612                if let Some(expr) = update_expression {
1613                    apply_update_expression(
1614                        &mut table.items[idx],
1615                        expr,
1616                        &expr_attr_names,
1617                        &expr_attr_values,
1618                    )?;
1619                }
1620                table.recalculate_stats();
1621            }
1622            // ConditionCheck: no write needed
1623        }
1624
1625        Self::ok_json(json!({}))
1626    }
1627
1628    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1629        let body = Self::parse_body(req)?;
1630        let statement = require_str(&body, "Statement")?;
1631        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1632
1633        execute_partiql_statement(&self.state, statement, &parameters)
1634    }
1635
1636    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1637        let body = Self::parse_body(req)?;
1638        validate_optional_enum_value(
1639            "returnConsumedCapacity",
1640            &body["ReturnConsumedCapacity"],
1641            &["INDEXES", "TOTAL", "NONE"],
1642        )?;
1643        let statements = body["Statements"].as_array().ok_or_else(|| {
1644            AwsServiceError::aws_error(
1645                StatusCode::BAD_REQUEST,
1646                "ValidationException",
1647                "Statements is required",
1648            )
1649        })?;
1650
1651        let mut responses: Vec<Value> = Vec::new();
1652        for stmt_obj in statements {
1653            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1654            let parameters = stmt_obj["Parameters"]
1655                .as_array()
1656                .cloned()
1657                .unwrap_or_default();
1658
1659            match execute_partiql_statement(&self.state, statement, &parameters) {
1660                Ok(resp) => {
1661                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1662                    responses.push(resp_body);
1663                }
1664                Err(e) => {
1665                    responses.push(json!({
1666                        "Error": {
1667                            "Code": "ValidationException",
1668                            "Message": e.to_string()
1669                        }
1670                    }));
1671                }
1672            }
1673        }
1674
1675        Self::ok_json(json!({ "Responses": responses }))
1676    }
1677
1678    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1679        let body = Self::parse_body(req)?;
1680        validate_optional_string_length(
1681            "clientRequestToken",
1682            body["ClientRequestToken"].as_str(),
1683            1,
1684            36,
1685        )?;
1686        validate_optional_enum_value(
1687            "returnConsumedCapacity",
1688            &body["ReturnConsumedCapacity"],
1689            &["INDEXES", "TOTAL", "NONE"],
1690        )?;
1691        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1692            AwsServiceError::aws_error(
1693                StatusCode::BAD_REQUEST,
1694                "ValidationException",
1695                "TransactStatements is required",
1696            )
1697        })?;
1698
1699        // Collect all results; if any fail, return TransactionCanceledException
1700        let mut results: Vec<Result<Value, String>> = Vec::new();
1701        for stmt_obj in transact_statements {
1702            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1703            let parameters = stmt_obj["Parameters"]
1704                .as_array()
1705                .cloned()
1706                .unwrap_or_default();
1707
1708            match execute_partiql_statement(&self.state, statement, &parameters) {
1709                Ok(resp) => {
1710                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1711                    results.push(Ok(resp_body));
1712                }
1713                Err(e) => {
1714                    results.push(Err(e.to_string()));
1715                }
1716            }
1717        }
1718
1719        let any_failed = results.iter().any(|r| r.is_err());
1720        if any_failed {
1721            let reasons: Vec<Value> = results
1722                .iter()
1723                .map(|r| match r {
1724                    Ok(_) => json!({ "Code": "None" }),
1725                    Err(msg) => json!({
1726                        "Code": "ValidationException",
1727                        "Message": msg
1728                    }),
1729                })
1730                .collect();
1731            let error_body = json!({
1732                "__type": "TransactionCanceledException",
1733                "message": "Transaction cancelled due to validation errors",
1734                "CancellationReasons": reasons
1735            });
1736            return Ok(AwsResponse::json(
1737                StatusCode::BAD_REQUEST,
1738                serde_json::to_vec(&error_body).unwrap(),
1739            ));
1740        }
1741
1742        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1743        Self::ok_json(json!({ "Responses": responses }))
1744    }
1745
1746    // ── TTL ─────────────────────────────────────────────────────────────
1747
1748    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749        let body = Self::parse_body(req)?;
1750        let table_name = require_str(&body, "TableName")?;
1751        let spec = &body["TimeToLiveSpecification"];
1752        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1753            AwsServiceError::aws_error(
1754                StatusCode::BAD_REQUEST,
1755                "ValidationException",
1756                "TimeToLiveSpecification.AttributeName is required",
1757            )
1758        })?;
1759        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1760            AwsServiceError::aws_error(
1761                StatusCode::BAD_REQUEST,
1762                "ValidationException",
1763                "TimeToLiveSpecification.Enabled is required",
1764            )
1765        })?;
1766
1767        let mut state = self.state.write();
1768        let table = get_table_mut(&mut state.tables, table_name)?;
1769
1770        if enabled {
1771            table.ttl_attribute = Some(attr_name.to_string());
1772            table.ttl_enabled = true;
1773        } else {
1774            table.ttl_enabled = false;
1775        }
1776
1777        Self::ok_json(json!({
1778            "TimeToLiveSpecification": {
1779                "AttributeName": attr_name,
1780                "Enabled": enabled
1781            }
1782        }))
1783    }
1784
1785    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1786        let body = Self::parse_body(req)?;
1787        let table_name = require_str(&body, "TableName")?;
1788
1789        let state = self.state.read();
1790        let table = get_table(&state.tables, table_name)?;
1791
1792        let status = if table.ttl_enabled {
1793            "ENABLED"
1794        } else {
1795            "DISABLED"
1796        };
1797
1798        let mut desc = json!({
1799            "TimeToLiveDescription": {
1800                "TimeToLiveStatus": status
1801            }
1802        });
1803
1804        if let Some(ref attr) = table.ttl_attribute {
1805            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1806        }
1807
1808        Self::ok_json(desc)
1809    }
1810
1811    // ── Resource Policies ───────────────────────────────────────────────
1812
1813    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1814        let body = Self::parse_body(req)?;
1815        let resource_arn = require_str(&body, "ResourceArn")?;
1816        let policy = require_str(&body, "Policy")?;
1817
1818        let mut state = self.state.write();
1819        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1820        table.resource_policy = Some(policy.to_string());
1821
1822        let revision_id = uuid::Uuid::new_v4().to_string();
1823        Self::ok_json(json!({ "RevisionId": revision_id }))
1824    }
1825
1826    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1827        let body = Self::parse_body(req)?;
1828        let resource_arn = require_str(&body, "ResourceArn")?;
1829
1830        let state = self.state.read();
1831        let table = find_table_by_arn(&state.tables, resource_arn)?;
1832
1833        match &table.resource_policy {
1834            Some(policy) => {
1835                let revision_id = uuid::Uuid::new_v4().to_string();
1836                Self::ok_json(json!({
1837                    "Policy": policy,
1838                    "RevisionId": revision_id
1839                }))
1840            }
1841            None => Self::ok_json(json!({ "Policy": null })),
1842        }
1843    }
1844
1845    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1846        let body = Self::parse_body(req)?;
1847        let resource_arn = require_str(&body, "ResourceArn")?;
1848
1849        let mut state = self.state.write();
1850        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1851        table.resource_policy = None;
1852
1853        Self::ok_json(json!({}))
1854    }
1855
1856    // ── Stubs ──────────────────────────────────────────────────────────
1857
1858    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1859        Self::ok_json(json!({
1860            "Endpoints": [{
1861                "Address": "dynamodb.us-east-1.amazonaws.com",
1862                "CachePeriodInMinutes": 1440
1863            }]
1864        }))
1865    }
1866
1867    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868        Self::ok_json(json!({
1869            "AccountMaxReadCapacityUnits": 80000,
1870            "AccountMaxWriteCapacityUnits": 80000,
1871            "TableMaxReadCapacityUnits": 40000,
1872            "TableMaxWriteCapacityUnits": 40000
1873        }))
1874    }
1875
1876    // ── Backups ────────────────────────────────────────────────────────
1877
1878    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1879        let body = Self::parse_body(req)?;
1880        let table_name = require_str(&body, "TableName")?;
1881        let backup_name = require_str(&body, "BackupName")?;
1882
1883        let mut state = self.state.write();
1884        let table = get_table(&state.tables, table_name)?;
1885
1886        let backup_arn = format!(
1887            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1888            state.region,
1889            state.account_id,
1890            table_name,
1891            Utc::now().format("%Y%m%d%H%M%S")
1892        );
1893        let now = Utc::now();
1894
1895        let backup = BackupDescription {
1896            backup_arn: backup_arn.clone(),
1897            backup_name: backup_name.to_string(),
1898            table_name: table_name.to_string(),
1899            table_arn: table.arn.clone(),
1900            backup_status: "AVAILABLE".to_string(),
1901            backup_type: "USER".to_string(),
1902            backup_creation_date: now,
1903            key_schema: table.key_schema.clone(),
1904            attribute_definitions: table.attribute_definitions.clone(),
1905            provisioned_throughput: table.provisioned_throughput.clone(),
1906            billing_mode: table.billing_mode.clone(),
1907            item_count: table.item_count,
1908            size_bytes: table.size_bytes,
1909            items: table.items.clone(),
1910        };
1911
1912        state.backups.insert(backup_arn.clone(), backup);
1913
1914        Self::ok_json(json!({
1915            "BackupDetails": {
1916                "BackupArn": backup_arn,
1917                "BackupName": backup_name,
1918                "BackupStatus": "AVAILABLE",
1919                "BackupType": "USER",
1920                "BackupCreationDateTime": now.timestamp() as f64,
1921                "BackupSizeBytes": 0
1922            }
1923        }))
1924    }
1925
1926    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1927        let body = Self::parse_body(req)?;
1928        let backup_arn = require_str(&body, "BackupArn")?;
1929
1930        let mut state = self.state.write();
1931        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1932            AwsServiceError::aws_error(
1933                StatusCode::BAD_REQUEST,
1934                "BackupNotFoundException",
1935                format!("Backup not found: {backup_arn}"),
1936            )
1937        })?;
1938
1939        Self::ok_json(json!({
1940            "BackupDescription": {
1941                "BackupDetails": {
1942                    "BackupArn": backup.backup_arn,
1943                    "BackupName": backup.backup_name,
1944                    "BackupStatus": "DELETED",
1945                    "BackupType": backup.backup_type,
1946                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1947                    "BackupSizeBytes": backup.size_bytes
1948                },
1949                "SourceTableDetails": {
1950                    "TableName": backup.table_name,
1951                    "TableArn": backup.table_arn,
1952                    "TableId": uuid::Uuid::new_v4().to_string(),
1953                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1954                        "AttributeName": ks.attribute_name,
1955                        "KeyType": ks.key_type
1956                    })).collect::<Vec<_>>(),
1957                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1958                    "ProvisionedThroughput": {
1959                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1960                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1961                    },
1962                    "ItemCount": backup.item_count,
1963                    "BillingMode": backup.billing_mode,
1964                    "TableSizeBytes": backup.size_bytes
1965                },
1966                "SourceTableFeatureDetails": {}
1967            }
1968        }))
1969    }
1970
1971    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1972        let body = Self::parse_body(req)?;
1973        let backup_arn = require_str(&body, "BackupArn")?;
1974
1975        let state = self.state.read();
1976        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1977            AwsServiceError::aws_error(
1978                StatusCode::BAD_REQUEST,
1979                "BackupNotFoundException",
1980                format!("Backup not found: {backup_arn}"),
1981            )
1982        })?;
1983
1984        Self::ok_json(json!({
1985            "BackupDescription": {
1986                "BackupDetails": {
1987                    "BackupArn": backup.backup_arn,
1988                    "BackupName": backup.backup_name,
1989                    "BackupStatus": backup.backup_status,
1990                    "BackupType": backup.backup_type,
1991                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1992                    "BackupSizeBytes": backup.size_bytes
1993                },
1994                "SourceTableDetails": {
1995                    "TableName": backup.table_name,
1996                    "TableArn": backup.table_arn,
1997                    "TableId": uuid::Uuid::new_v4().to_string(),
1998                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1999                        "AttributeName": ks.attribute_name,
2000                        "KeyType": ks.key_type
2001                    })).collect::<Vec<_>>(),
2002                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
2003                    "ProvisionedThroughput": {
2004                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
2005                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2006                    },
2007                    "ItemCount": backup.item_count,
2008                    "BillingMode": backup.billing_mode,
2009                    "TableSizeBytes": backup.size_bytes
2010                },
2011                "SourceTableFeatureDetails": {}
2012            }
2013        }))
2014    }
2015
2016    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2017        let body = Self::parse_body(req)?;
2018        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2019        validate_optional_string_length(
2020            "exclusiveStartBackupArn",
2021            body["ExclusiveStartBackupArn"].as_str(),
2022            37,
2023            1024,
2024        )?;
2025        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2026        validate_optional_enum_value(
2027            "backupType",
2028            &body["BackupType"],
2029            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2030        )?;
2031        let table_name = body["TableName"].as_str();
2032
2033        let state = self.state.read();
2034        let summaries: Vec<Value> = state
2035            .backups
2036            .values()
2037            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2038            .map(|b| {
2039                json!({
2040                    "TableName": b.table_name,
2041                    "TableArn": b.table_arn,
2042                    "BackupArn": b.backup_arn,
2043                    "BackupName": b.backup_name,
2044                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2045                    "BackupStatus": b.backup_status,
2046                    "BackupType": b.backup_type,
2047                    "BackupSizeBytes": b.size_bytes
2048                })
2049            })
2050            .collect();
2051
2052        Self::ok_json(json!({
2053            "BackupSummaries": summaries
2054        }))
2055    }
2056
2057    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058        let body = Self::parse_body(req)?;
2059        let backup_arn = require_str(&body, "BackupArn")?;
2060        let target_table_name = require_str(&body, "TargetTableName")?;
2061
2062        let mut state = self.state.write();
2063        let backup = state.backups.get(backup_arn).ok_or_else(|| {
2064            AwsServiceError::aws_error(
2065                StatusCode::BAD_REQUEST,
2066                "BackupNotFoundException",
2067                format!("Backup not found: {backup_arn}"),
2068            )
2069        })?;
2070
2071        if state.tables.contains_key(target_table_name) {
2072            return Err(AwsServiceError::aws_error(
2073                StatusCode::BAD_REQUEST,
2074                "TableAlreadyExistsException",
2075                format!("Table already exists: {target_table_name}"),
2076            ));
2077        }
2078
2079        let now = Utc::now();
2080        let arn = format!(
2081            "arn:aws:dynamodb:{}:{}:table/{}",
2082            state.region, state.account_id, target_table_name
2083        );
2084
2085        let restored_items = backup.items.clone();
2086        let mut table = DynamoTable {
2087            name: target_table_name.to_string(),
2088            arn: arn.clone(),
2089            key_schema: backup.key_schema.clone(),
2090            attribute_definitions: backup.attribute_definitions.clone(),
2091            provisioned_throughput: backup.provisioned_throughput.clone(),
2092            items: restored_items,
2093            gsi: Vec::new(),
2094            lsi: Vec::new(),
2095            tags: HashMap::new(),
2096            created_at: now,
2097            status: "ACTIVE".to_string(),
2098            item_count: 0,
2099            size_bytes: 0,
2100            billing_mode: backup.billing_mode.clone(),
2101            ttl_attribute: None,
2102            ttl_enabled: false,
2103            resource_policy: None,
2104            pitr_enabled: false,
2105            kinesis_destinations: Vec::new(),
2106            contributor_insights_status: "DISABLED".to_string(),
2107            contributor_insights_counters: HashMap::new(),
2108            stream_enabled: false,
2109            stream_view_type: None,
2110            stream_arn: None,
2111            stream_records: Arc::new(RwLock::new(Vec::new())),
2112            sse_type: None,
2113            sse_kms_key_arn: None,
2114        };
2115        table.recalculate_stats();
2116
2117        let desc = build_table_description(&table);
2118        state.tables.insert(target_table_name.to_string(), table);
2119
2120        Self::ok_json(json!({
2121            "TableDescription": desc
2122        }))
2123    }
2124
2125    fn restore_table_to_point_in_time(
2126        &self,
2127        req: &AwsRequest,
2128    ) -> Result<AwsResponse, AwsServiceError> {
2129        let body = Self::parse_body(req)?;
2130        let target_table_name = require_str(&body, "TargetTableName")?;
2131        let source_table_name = body["SourceTableName"].as_str();
2132        let source_table_arn = body["SourceTableArn"].as_str();
2133
2134        let mut state = self.state.write();
2135
2136        // Resolve source table
2137        let source = if let Some(name) = source_table_name {
2138            get_table(&state.tables, name)?.clone()
2139        } else if let Some(arn) = source_table_arn {
2140            find_table_by_arn(&state.tables, arn)?.clone()
2141        } else {
2142            return Err(AwsServiceError::aws_error(
2143                StatusCode::BAD_REQUEST,
2144                "ValidationException",
2145                "SourceTableName or SourceTableArn is required",
2146            ));
2147        };
2148
2149        if state.tables.contains_key(target_table_name) {
2150            return Err(AwsServiceError::aws_error(
2151                StatusCode::BAD_REQUEST,
2152                "TableAlreadyExistsException",
2153                format!("Table already exists: {target_table_name}"),
2154            ));
2155        }
2156
2157        let now = Utc::now();
2158        let arn = format!(
2159            "arn:aws:dynamodb:{}:{}:table/{}",
2160            state.region, state.account_id, target_table_name
2161        );
2162
2163        let mut table = DynamoTable {
2164            name: target_table_name.to_string(),
2165            arn: arn.clone(),
2166            key_schema: source.key_schema.clone(),
2167            attribute_definitions: source.attribute_definitions.clone(),
2168            provisioned_throughput: source.provisioned_throughput.clone(),
2169            items: source.items.clone(),
2170            gsi: Vec::new(),
2171            lsi: Vec::new(),
2172            tags: HashMap::new(),
2173            created_at: now,
2174            status: "ACTIVE".to_string(),
2175            item_count: 0,
2176            size_bytes: 0,
2177            billing_mode: source.billing_mode.clone(),
2178            ttl_attribute: None,
2179            ttl_enabled: false,
2180            resource_policy: None,
2181            pitr_enabled: false,
2182            kinesis_destinations: Vec::new(),
2183            contributor_insights_status: "DISABLED".to_string(),
2184            contributor_insights_counters: HashMap::new(),
2185            stream_enabled: false,
2186            stream_view_type: None,
2187            stream_arn: None,
2188            stream_records: Arc::new(RwLock::new(Vec::new())),
2189            sse_type: None,
2190            sse_kms_key_arn: None,
2191        };
2192        table.recalculate_stats();
2193
2194        let desc = build_table_description(&table);
2195        state.tables.insert(target_table_name.to_string(), table);
2196
2197        Self::ok_json(json!({
2198            "TableDescription": desc
2199        }))
2200    }
2201
2202    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2203        let body = Self::parse_body(req)?;
2204        let table_name = require_str(&body, "TableName")?;
2205
2206        let pitr_spec = body["PointInTimeRecoverySpecification"]
2207            .as_object()
2208            .ok_or_else(|| {
2209                AwsServiceError::aws_error(
2210                    StatusCode::BAD_REQUEST,
2211                    "ValidationException",
2212                    "PointInTimeRecoverySpecification is required",
2213                )
2214            })?;
2215        let enabled = pitr_spec
2216            .get("PointInTimeRecoveryEnabled")
2217            .and_then(|v| v.as_bool())
2218            .unwrap_or(false);
2219
2220        let mut state = self.state.write();
2221        let table = get_table_mut(&mut state.tables, table_name)?;
2222        table.pitr_enabled = enabled;
2223
2224        let status = if enabled { "ENABLED" } else { "DISABLED" };
2225        Self::ok_json(json!({
2226            "ContinuousBackupsDescription": {
2227                "ContinuousBackupsStatus": status,
2228                "PointInTimeRecoveryDescription": {
2229                    "PointInTimeRecoveryStatus": status,
2230                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2231                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2232                }
2233            }
2234        }))
2235    }
2236
2237    fn describe_continuous_backups(
2238        &self,
2239        req: &AwsRequest,
2240    ) -> Result<AwsResponse, AwsServiceError> {
2241        let body = Self::parse_body(req)?;
2242        let table_name = require_str(&body, "TableName")?;
2243
2244        let state = self.state.read();
2245        let table = get_table(&state.tables, table_name)?;
2246
2247        let status = if table.pitr_enabled {
2248            "ENABLED"
2249        } else {
2250            "DISABLED"
2251        };
2252        Self::ok_json(json!({
2253            "ContinuousBackupsDescription": {
2254                "ContinuousBackupsStatus": status,
2255                "PointInTimeRecoveryDescription": {
2256                    "PointInTimeRecoveryStatus": status,
2257                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2258                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2259                }
2260            }
2261        }))
2262    }
2263
2264    // ── Global Tables ──────────────────────────────────────────────────
2265
2266    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2267        let body = Self::parse_body(req)?;
2268        let global_table_name = require_str(&body, "GlobalTableName")?;
2269        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2270
2271        let replication_group = body["ReplicationGroup"]
2272            .as_array()
2273            .ok_or_else(|| {
2274                AwsServiceError::aws_error(
2275                    StatusCode::BAD_REQUEST,
2276                    "ValidationException",
2277                    "ReplicationGroup is required",
2278                )
2279            })?
2280            .iter()
2281            .filter_map(|r| {
2282                r["RegionName"].as_str().map(|rn| ReplicaDescription {
2283                    region_name: rn.to_string(),
2284                    replica_status: "ACTIVE".to_string(),
2285                })
2286            })
2287            .collect::<Vec<_>>();
2288
2289        let mut state = self.state.write();
2290
2291        if state.global_tables.contains_key(global_table_name) {
2292            return Err(AwsServiceError::aws_error(
2293                StatusCode::BAD_REQUEST,
2294                "GlobalTableAlreadyExistsException",
2295                format!("Global table already exists: {global_table_name}"),
2296            ));
2297        }
2298
2299        let arn = format!(
2300            "arn:aws:dynamodb::{}:global-table/{}",
2301            state.account_id, global_table_name
2302        );
2303        let now = Utc::now();
2304
2305        let gt = GlobalTableDescription {
2306            global_table_name: global_table_name.to_string(),
2307            global_table_arn: arn.clone(),
2308            global_table_status: "ACTIVE".to_string(),
2309            creation_date: now,
2310            replication_group: replication_group.clone(),
2311        };
2312
2313        state
2314            .global_tables
2315            .insert(global_table_name.to_string(), gt);
2316
2317        Self::ok_json(json!({
2318            "GlobalTableDescription": {
2319                "GlobalTableName": global_table_name,
2320                "GlobalTableArn": arn,
2321                "GlobalTableStatus": "ACTIVE",
2322                "CreationDateTime": now.timestamp() as f64,
2323                "ReplicationGroup": replication_group.iter().map(|r| json!({
2324                    "RegionName": r.region_name,
2325                    "ReplicaStatus": r.replica_status
2326                })).collect::<Vec<_>>()
2327            }
2328        }))
2329    }
2330
2331    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2332        let body = Self::parse_body(req)?;
2333        let global_table_name = require_str(&body, "GlobalTableName")?;
2334        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2335
2336        let state = self.state.read();
2337        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2338            AwsServiceError::aws_error(
2339                StatusCode::BAD_REQUEST,
2340                "GlobalTableNotFoundException",
2341                format!("Global table not found: {global_table_name}"),
2342            )
2343        })?;
2344
2345        Self::ok_json(json!({
2346            "GlobalTableDescription": {
2347                "GlobalTableName": gt.global_table_name,
2348                "GlobalTableArn": gt.global_table_arn,
2349                "GlobalTableStatus": gt.global_table_status,
2350                "CreationDateTime": gt.creation_date.timestamp() as f64,
2351                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2352                    "RegionName": r.region_name,
2353                    "ReplicaStatus": r.replica_status
2354                })).collect::<Vec<_>>()
2355            }
2356        }))
2357    }
2358
2359    fn describe_global_table_settings(
2360        &self,
2361        req: &AwsRequest,
2362    ) -> Result<AwsResponse, AwsServiceError> {
2363        let body = Self::parse_body(req)?;
2364        let global_table_name = require_str(&body, "GlobalTableName")?;
2365        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2366
2367        let state = self.state.read();
2368        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2369            AwsServiceError::aws_error(
2370                StatusCode::BAD_REQUEST,
2371                "GlobalTableNotFoundException",
2372                format!("Global table not found: {global_table_name}"),
2373            )
2374        })?;
2375
2376        let replica_settings: Vec<Value> = gt
2377            .replication_group
2378            .iter()
2379            .map(|r| {
2380                json!({
2381                    "RegionName": r.region_name,
2382                    "ReplicaStatus": r.replica_status,
2383                    "ReplicaProvisionedReadCapacityUnits": 0,
2384                    "ReplicaProvisionedWriteCapacityUnits": 0
2385                })
2386            })
2387            .collect();
2388
2389        Self::ok_json(json!({
2390            "GlobalTableName": gt.global_table_name,
2391            "ReplicaSettings": replica_settings
2392        }))
2393    }
2394
2395    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396        let body = Self::parse_body(req)?;
2397        validate_optional_string_length(
2398            "exclusiveStartGlobalTableName",
2399            body["ExclusiveStartGlobalTableName"].as_str(),
2400            3,
2401            255,
2402        )?;
2403        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2404        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2405
2406        let state = self.state.read();
2407        let tables: Vec<Value> = state
2408            .global_tables
2409            .values()
2410            .take(limit)
2411            .map(|gt| {
2412                json!({
2413                    "GlobalTableName": gt.global_table_name,
2414                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2415                        "RegionName": r.region_name
2416                    })).collect::<Vec<_>>()
2417                })
2418            })
2419            .collect();
2420
2421        Self::ok_json(json!({
2422            "GlobalTables": tables
2423        }))
2424    }
2425
2426    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2427        let body = Self::parse_body(req)?;
2428        let global_table_name = require_str(&body, "GlobalTableName")?;
2429        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2430        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2431
2432        let mut state = self.state.write();
2433        let gt = state
2434            .global_tables
2435            .get_mut(global_table_name)
2436            .ok_or_else(|| {
2437                AwsServiceError::aws_error(
2438                    StatusCode::BAD_REQUEST,
2439                    "GlobalTableNotFoundException",
2440                    format!("Global table not found: {global_table_name}"),
2441                )
2442            })?;
2443
2444        if let Some(updates) = body["ReplicaUpdates"].as_array() {
2445            for update in updates {
2446                if let Some(create) = update["Create"].as_object() {
2447                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2448                        gt.replication_group.push(ReplicaDescription {
2449                            region_name: region.to_string(),
2450                            replica_status: "ACTIVE".to_string(),
2451                        });
2452                    }
2453                }
2454                if let Some(delete) = update["Delete"].as_object() {
2455                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2456                        gt.replication_group.retain(|r| r.region_name != region);
2457                    }
2458                }
2459            }
2460        }
2461
2462        Self::ok_json(json!({
2463            "GlobalTableDescription": {
2464                "GlobalTableName": gt.global_table_name,
2465                "GlobalTableArn": gt.global_table_arn,
2466                "GlobalTableStatus": gt.global_table_status,
2467                "CreationDateTime": gt.creation_date.timestamp() as f64,
2468                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2469                    "RegionName": r.region_name,
2470                    "ReplicaStatus": r.replica_status
2471                })).collect::<Vec<_>>()
2472            }
2473        }))
2474    }
2475
2476    fn update_global_table_settings(
2477        &self,
2478        req: &AwsRequest,
2479    ) -> Result<AwsResponse, AwsServiceError> {
2480        let body = Self::parse_body(req)?;
2481        let global_table_name = require_str(&body, "GlobalTableName")?;
2482        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2483        validate_optional_enum_value(
2484            "globalTableBillingMode",
2485            &body["GlobalTableBillingMode"],
2486            &["PROVISIONED", "PAY_PER_REQUEST"],
2487        )?;
2488        validate_optional_range_i64(
2489            "globalTableProvisionedWriteCapacityUnits",
2490            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2491            1,
2492            i64::MAX,
2493        )?;
2494
2495        let state = self.state.read();
2496        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2497            AwsServiceError::aws_error(
2498                StatusCode::BAD_REQUEST,
2499                "GlobalTableNotFoundException",
2500                format!("Global table not found: {global_table_name}"),
2501            )
2502        })?;
2503
2504        let replica_settings: Vec<Value> = gt
2505            .replication_group
2506            .iter()
2507            .map(|r| {
2508                json!({
2509                    "RegionName": r.region_name,
2510                    "ReplicaStatus": r.replica_status,
2511                    "ReplicaProvisionedReadCapacityUnits": 0,
2512                    "ReplicaProvisionedWriteCapacityUnits": 0
2513                })
2514            })
2515            .collect();
2516
2517        Self::ok_json(json!({
2518            "GlobalTableName": gt.global_table_name,
2519            "ReplicaSettings": replica_settings
2520        }))
2521    }
2522
2523    fn describe_table_replica_auto_scaling(
2524        &self,
2525        req: &AwsRequest,
2526    ) -> Result<AwsResponse, AwsServiceError> {
2527        let body = Self::parse_body(req)?;
2528        let table_name = require_str(&body, "TableName")?;
2529
2530        let state = self.state.read();
2531        let table = get_table(&state.tables, table_name)?;
2532
2533        Self::ok_json(json!({
2534            "TableAutoScalingDescription": {
2535                "TableName": table.name,
2536                "TableStatus": table.status,
2537                "Replicas": []
2538            }
2539        }))
2540    }
2541
2542    fn update_table_replica_auto_scaling(
2543        &self,
2544        req: &AwsRequest,
2545    ) -> Result<AwsResponse, AwsServiceError> {
2546        let body = Self::parse_body(req)?;
2547        let table_name = require_str(&body, "TableName")?;
2548
2549        let state = self.state.read();
2550        let table = get_table(&state.tables, table_name)?;
2551
2552        Self::ok_json(json!({
2553            "TableAutoScalingDescription": {
2554                "TableName": table.name,
2555                "TableStatus": table.status,
2556                "Replicas": []
2557            }
2558        }))
2559    }
2560
2561    // ── Kinesis Streaming ──────────────────────────────────────────────
2562
2563    fn enable_kinesis_streaming_destination(
2564        &self,
2565        req: &AwsRequest,
2566    ) -> Result<AwsResponse, AwsServiceError> {
2567        let body = Self::parse_body(req)?;
2568        let table_name = require_str(&body, "TableName")?;
2569        let stream_arn = require_str(&body, "StreamArn")?;
2570        let precision = body["EnableKinesisStreamingConfiguration"]
2571            ["ApproximateCreationDateTimePrecision"]
2572            .as_str()
2573            .unwrap_or("MILLISECOND");
2574
2575        let mut state = self.state.write();
2576        let table = get_table_mut(&mut state.tables, table_name)?;
2577
2578        table.kinesis_destinations.push(KinesisDestination {
2579            stream_arn: stream_arn.to_string(),
2580            destination_status: "ACTIVE".to_string(),
2581            approximate_creation_date_time_precision: precision.to_string(),
2582        });
2583
2584        Self::ok_json(json!({
2585            "TableName": table_name,
2586            "StreamArn": stream_arn,
2587            "DestinationStatus": "ACTIVE",
2588            "EnableKinesisStreamingConfiguration": {
2589                "ApproximateCreationDateTimePrecision": precision
2590            }
2591        }))
2592    }
2593
2594    fn disable_kinesis_streaming_destination(
2595        &self,
2596        req: &AwsRequest,
2597    ) -> Result<AwsResponse, AwsServiceError> {
2598        let body = Self::parse_body(req)?;
2599        let table_name = require_str(&body, "TableName")?;
2600        let stream_arn = require_str(&body, "StreamArn")?;
2601
2602        let mut state = self.state.write();
2603        let table = get_table_mut(&mut state.tables, table_name)?;
2604
2605        if let Some(dest) = table
2606            .kinesis_destinations
2607            .iter_mut()
2608            .find(|d| d.stream_arn == stream_arn)
2609        {
2610            dest.destination_status = "DISABLED".to_string();
2611        }
2612
2613        Self::ok_json(json!({
2614            "TableName": table_name,
2615            "StreamArn": stream_arn,
2616            "DestinationStatus": "DISABLED"
2617        }))
2618    }
2619
2620    fn describe_kinesis_streaming_destination(
2621        &self,
2622        req: &AwsRequest,
2623    ) -> Result<AwsResponse, AwsServiceError> {
2624        let body = Self::parse_body(req)?;
2625        let table_name = require_str(&body, "TableName")?;
2626
2627        let state = self.state.read();
2628        let table = get_table(&state.tables, table_name)?;
2629
2630        let destinations: Vec<Value> = table
2631            .kinesis_destinations
2632            .iter()
2633            .map(|d| {
2634                json!({
2635                    "StreamArn": d.stream_arn,
2636                    "DestinationStatus": d.destination_status,
2637                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2638                })
2639            })
2640            .collect();
2641
2642        Self::ok_json(json!({
2643            "TableName": table_name,
2644            "KinesisDataStreamDestinations": destinations
2645        }))
2646    }
2647
2648    fn update_kinesis_streaming_destination(
2649        &self,
2650        req: &AwsRequest,
2651    ) -> Result<AwsResponse, AwsServiceError> {
2652        let body = Self::parse_body(req)?;
2653        let table_name = require_str(&body, "TableName")?;
2654        let stream_arn = require_str(&body, "StreamArn")?;
2655        let precision = body["UpdateKinesisStreamingConfiguration"]
2656            ["ApproximateCreationDateTimePrecision"]
2657            .as_str()
2658            .unwrap_or("MILLISECOND");
2659
2660        let mut state = self.state.write();
2661        let table = get_table_mut(&mut state.tables, table_name)?;
2662
2663        if let Some(dest) = table
2664            .kinesis_destinations
2665            .iter_mut()
2666            .find(|d| d.stream_arn == stream_arn)
2667        {
2668            dest.approximate_creation_date_time_precision = precision.to_string();
2669        }
2670
2671        Self::ok_json(json!({
2672            "TableName": table_name,
2673            "StreamArn": stream_arn,
2674            "DestinationStatus": "ACTIVE",
2675            "UpdateKinesisStreamingConfiguration": {
2676                "ApproximateCreationDateTimePrecision": precision
2677            }
2678        }))
2679    }
2680
2681    // ── Contributor Insights ───────────────────────────────────────────
2682
2683    fn describe_contributor_insights(
2684        &self,
2685        req: &AwsRequest,
2686    ) -> Result<AwsResponse, AwsServiceError> {
2687        let body = Self::parse_body(req)?;
2688        let table_name = require_str(&body, "TableName")?;
2689        let index_name = body["IndexName"].as_str();
2690
2691        let state = self.state.read();
2692        let table = get_table(&state.tables, table_name)?;
2693
2694        let top = table.top_contributors(10);
2695        let contributors: Vec<Value> = top
2696            .iter()
2697            .map(|(key, count)| {
2698                json!({
2699                    "Key": key,
2700                    "Count": count
2701                })
2702            })
2703            .collect();
2704
2705        let mut result = json!({
2706            "TableName": table_name,
2707            "ContributorInsightsStatus": table.contributor_insights_status,
2708            "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2709            "TopContributors": contributors
2710        });
2711        if let Some(idx) = index_name {
2712            result["IndexName"] = json!(idx);
2713        }
2714
2715        Self::ok_json(result)
2716    }
2717
2718    fn update_contributor_insights(
2719        &self,
2720        req: &AwsRequest,
2721    ) -> Result<AwsResponse, AwsServiceError> {
2722        let body = Self::parse_body(req)?;
2723        let table_name = require_str(&body, "TableName")?;
2724        let action = require_str(&body, "ContributorInsightsAction")?;
2725        let index_name = body["IndexName"].as_str();
2726
2727        let mut state = self.state.write();
2728        let table = get_table_mut(&mut state.tables, table_name)?;
2729
2730        let status = match action {
2731            "ENABLE" => "ENABLED",
2732            "DISABLE" => "DISABLED",
2733            _ => {
2734                return Err(AwsServiceError::aws_error(
2735                    StatusCode::BAD_REQUEST,
2736                    "ValidationException",
2737                    format!("Invalid ContributorInsightsAction: {action}"),
2738                ))
2739            }
2740        };
2741        table.contributor_insights_status = status.to_string();
2742        if status == "DISABLED" {
2743            table.contributor_insights_counters.clear();
2744        }
2745
2746        let mut result = json!({
2747            "TableName": table_name,
2748            "ContributorInsightsStatus": status
2749        });
2750        if let Some(idx) = index_name {
2751            result["IndexName"] = json!(idx);
2752        }
2753
2754        Self::ok_json(result)
2755    }
2756
2757    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2758        let body = Self::parse_body(req)?;
2759        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2760        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2761        let table_name = body["TableName"].as_str();
2762
2763        let state = self.state.read();
2764        let summaries: Vec<Value> = state
2765            .tables
2766            .values()
2767            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2768            .map(|t| {
2769                json!({
2770                    "TableName": t.name,
2771                    "ContributorInsightsStatus": t.contributor_insights_status
2772                })
2773            })
2774            .collect();
2775
2776        Self::ok_json(json!({
2777            "ContributorInsightsSummaries": summaries
2778        }))
2779    }
2780
2781    // ── Import/Export ──────────────────────────────────────────────────
2782
2783    fn export_table_to_point_in_time(
2784        &self,
2785        req: &AwsRequest,
2786    ) -> Result<AwsResponse, AwsServiceError> {
2787        let body = Self::parse_body(req)?;
2788        let table_arn = require_str(&body, "TableArn")?;
2789        let s3_bucket = require_str(&body, "S3Bucket")?;
2790        let s3_prefix = body["S3Prefix"].as_str();
2791        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2792
2793        let state = self.state.read();
2794        // Verify table exists and get items
2795        let table = find_table_by_arn(&state.tables, table_arn)?;
2796        let items = table.items.clone();
2797        let item_count = items.len() as i64;
2798
2799        let now = Utc::now();
2800        let export_arn = format!(
2801            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2802            state.region,
2803            state.account_id,
2804            table_arn.rsplit('/').next().unwrap_or("unknown"),
2805            uuid::Uuid::new_v4()
2806        );
2807
2808        drop(state);
2809
2810        // Serialize items as JSON Lines and write to S3
2811        let mut json_lines = String::new();
2812        for item in &items {
2813            let item_json = if export_format == "DYNAMODB_JSON" {
2814                json!({ "Item": item })
2815            } else {
2816                json!(item)
2817            };
2818            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2819            json_lines.push('\n');
2820        }
2821        let data_size = json_lines.len() as i64;
2822
2823        // Build S3 key for the export data
2824        let s3_key = if let Some(prefix) = s3_prefix {
2825            format!("{prefix}/data/manifest-files.json")
2826        } else {
2827            "data/manifest-files.json".to_string()
2828        };
2829
2830        // Write to S3 if we have access to S3 state
2831        let mut export_failed = false;
2832        let mut failure_reason = String::new();
2833        if let Some(ref s3_state) = self.s3_state {
2834            let mut s3 = s3_state.write();
2835            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2836                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2837                let obj = fakecloud_s3::state::S3Object {
2838                    key: s3_key.clone(),
2839                    data: bytes::Bytes::from(json_lines),
2840                    content_type: "application/json".to_string(),
2841                    etag,
2842                    size: data_size as u64,
2843                    last_modified: now,
2844                    metadata: HashMap::new(),
2845                    storage_class: "STANDARD".to_string(),
2846                    tags: HashMap::new(),
2847                    acl_grants: Vec::new(),
2848                    acl_owner_id: None,
2849                    parts_count: None,
2850                    part_sizes: None,
2851                    sse_algorithm: None,
2852                    sse_kms_key_id: None,
2853                    bucket_key_enabled: None,
2854                    version_id: None,
2855                    is_delete_marker: false,
2856                    content_encoding: None,
2857                    website_redirect_location: None,
2858                    restore_ongoing: None,
2859                    restore_expiry: None,
2860                    checksum_algorithm: None,
2861                    checksum_value: None,
2862                    lock_mode: None,
2863                    lock_retain_until: None,
2864                    lock_legal_hold: None,
2865                };
2866                bucket.objects.insert(s3_key, obj);
2867            } else {
2868                export_failed = true;
2869                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2870            }
2871        }
2872
2873        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2874
2875        let export = ExportDescription {
2876            export_arn: export_arn.clone(),
2877            export_status: export_status.to_string(),
2878            table_arn: table_arn.to_string(),
2879            s3_bucket: s3_bucket.to_string(),
2880            s3_prefix: s3_prefix.map(|s| s.to_string()),
2881            export_format: export_format.to_string(),
2882            start_time: now,
2883            end_time: now,
2884            export_time: now,
2885            item_count,
2886            billed_size_bytes: data_size,
2887        };
2888
2889        let mut state = self.state.write();
2890        state.exports.insert(export_arn.clone(), export);
2891
2892        let mut response = json!({
2893            "ExportDescription": {
2894                "ExportArn": export_arn,
2895                "ExportStatus": export_status,
2896                "TableArn": table_arn,
2897                "S3Bucket": s3_bucket,
2898                "S3Prefix": s3_prefix,
2899                "ExportFormat": export_format,
2900                "StartTime": now.timestamp() as f64,
2901                "EndTime": now.timestamp() as f64,
2902                "ExportTime": now.timestamp() as f64,
2903                "ItemCount": item_count,
2904                "BilledSizeBytes": data_size
2905            }
2906        });
2907        if export_failed {
2908            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2909            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2910        }
2911        Self::ok_json(response)
2912    }
2913
2914    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2915        let body = Self::parse_body(req)?;
2916        let export_arn = require_str(&body, "ExportArn")?;
2917
2918        let state = self.state.read();
2919        let export = state.exports.get(export_arn).ok_or_else(|| {
2920            AwsServiceError::aws_error(
2921                StatusCode::BAD_REQUEST,
2922                "ExportNotFoundException",
2923                format!("Export not found: {export_arn}"),
2924            )
2925        })?;
2926
2927        Self::ok_json(json!({
2928            "ExportDescription": {
2929                "ExportArn": export.export_arn,
2930                "ExportStatus": export.export_status,
2931                "TableArn": export.table_arn,
2932                "S3Bucket": export.s3_bucket,
2933                "S3Prefix": export.s3_prefix,
2934                "ExportFormat": export.export_format,
2935                "StartTime": export.start_time.timestamp() as f64,
2936                "EndTime": export.end_time.timestamp() as f64,
2937                "ExportTime": export.export_time.timestamp() as f64,
2938                "ItemCount": export.item_count,
2939                "BilledSizeBytes": export.billed_size_bytes
2940            }
2941        }))
2942    }
2943
2944    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2945        let body = Self::parse_body(req)?;
2946        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2947        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2948        let table_arn = body["TableArn"].as_str();
2949
2950        let state = self.state.read();
2951        let summaries: Vec<Value> = state
2952            .exports
2953            .values()
2954            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2955            .map(|e| {
2956                json!({
2957                    "ExportArn": e.export_arn,
2958                    "ExportStatus": e.export_status,
2959                    "TableArn": e.table_arn
2960                })
2961            })
2962            .collect();
2963
2964        Self::ok_json(json!({
2965            "ExportSummaries": summaries
2966        }))
2967    }
2968
2969    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2970        let body = Self::parse_body(req)?;
2971        let input_format = require_str(&body, "InputFormat")?;
2972        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2973            AwsServiceError::aws_error(
2974                StatusCode::BAD_REQUEST,
2975                "ValidationException",
2976                "S3BucketSource is required",
2977            )
2978        })?;
2979        let s3_bucket = s3_source
2980            .get("S3Bucket")
2981            .and_then(|v| v.as_str())
2982            .unwrap_or("");
2983        let s3_key_prefix = s3_source
2984            .get("S3KeyPrefix")
2985            .and_then(|v| v.as_str())
2986            .unwrap_or("");
2987
2988        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2989            AwsServiceError::aws_error(
2990                StatusCode::BAD_REQUEST,
2991                "ValidationException",
2992                "TableCreationParameters is required",
2993            )
2994        })?;
2995        let table_name = table_params
2996            .get("TableName")
2997            .and_then(|v| v.as_str())
2998            .ok_or_else(|| {
2999                AwsServiceError::aws_error(
3000                    StatusCode::BAD_REQUEST,
3001                    "ValidationException",
3002                    "TableCreationParameters.TableName is required",
3003                )
3004            })?;
3005
3006        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3007        let attribute_definitions = parse_attribute_definitions(
3008            table_params
3009                .get("AttributeDefinitions")
3010                .unwrap_or(&Value::Null),
3011        )?;
3012
3013        // Read items from S3 if we have access
3014        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3015        let mut processed_size_bytes: i64 = 0;
3016        if let Some(ref s3_state) = self.s3_state {
3017            let s3 = s3_state.read();
3018            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3019                AwsServiceError::aws_error(
3020                    StatusCode::BAD_REQUEST,
3021                    "ImportConflictException",
3022                    format!("S3 bucket does not exist: {s3_bucket}"),
3023                )
3024            })?;
3025            // Find all objects under the prefix and try to parse JSON Lines from each
3026            let prefix = if s3_key_prefix.is_empty() {
3027                String::new()
3028            } else {
3029                s3_key_prefix.to_string()
3030            };
3031            for (key, obj) in &bucket.objects {
3032                if !prefix.is_empty() && !key.starts_with(&prefix) {
3033                    continue;
3034                }
3035                let data = std::str::from_utf8(&obj.data).unwrap_or("");
3036                processed_size_bytes += obj.size as i64;
3037                for line in data.lines() {
3038                    let line = line.trim();
3039                    if line.is_empty() {
3040                        continue;
3041                    }
3042                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3043                        // DYNAMODB_JSON format wraps items in {"Item": {...}}
3044                        let item = if input_format == "DYNAMODB_JSON" {
3045                            if let Some(item_obj) = parsed.get("Item") {
3046                                item_obj.as_object().cloned().unwrap_or_default()
3047                            } else {
3048                                parsed.as_object().cloned().unwrap_or_default()
3049                            }
3050                        } else {
3051                            parsed.as_object().cloned().unwrap_or_default()
3052                        };
3053                        if !item.is_empty() {
3054                            imported_items
3055                                .push(item.into_iter().collect::<HashMap<String, Value>>());
3056                        }
3057                    }
3058                }
3059            }
3060        }
3061
3062        let mut state = self.state.write();
3063
3064        if state.tables.contains_key(table_name) {
3065            return Err(AwsServiceError::aws_error(
3066                StatusCode::BAD_REQUEST,
3067                "ResourceInUseException",
3068                format!("Table already exists: {table_name}"),
3069            ));
3070        }
3071
3072        let now = Utc::now();
3073        let table_arn = format!(
3074            "arn:aws:dynamodb:{}:{}:table/{}",
3075            state.region, state.account_id, table_name
3076        );
3077        let import_arn = format!(
3078            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3079            state.region,
3080            state.account_id,
3081            table_name,
3082            uuid::Uuid::new_v4()
3083        );
3084
3085        let processed_item_count = imported_items.len() as i64;
3086
3087        let mut table = DynamoTable {
3088            name: table_name.to_string(),
3089            arn: table_arn.clone(),
3090            key_schema,
3091            attribute_definitions,
3092            provisioned_throughput: ProvisionedThroughput {
3093                read_capacity_units: 0,
3094                write_capacity_units: 0,
3095            },
3096            items: imported_items,
3097            gsi: Vec::new(),
3098            lsi: Vec::new(),
3099            tags: HashMap::new(),
3100            created_at: now,
3101            status: "ACTIVE".to_string(),
3102            item_count: 0,
3103            size_bytes: 0,
3104            billing_mode: "PAY_PER_REQUEST".to_string(),
3105            ttl_attribute: None,
3106            ttl_enabled: false,
3107            resource_policy: None,
3108            pitr_enabled: false,
3109            kinesis_destinations: Vec::new(),
3110            contributor_insights_status: "DISABLED".to_string(),
3111            contributor_insights_counters: HashMap::new(),
3112            stream_enabled: false,
3113            stream_view_type: None,
3114            stream_arn: None,
3115            stream_records: Arc::new(RwLock::new(Vec::new())),
3116            sse_type: None,
3117            sse_kms_key_arn: None,
3118        };
3119        table.recalculate_stats();
3120        state.tables.insert(table_name.to_string(), table);
3121
3122        let import_desc = ImportDescription {
3123            import_arn: import_arn.clone(),
3124            import_status: "COMPLETED".to_string(),
3125            table_arn: table_arn.clone(),
3126            table_name: table_name.to_string(),
3127            s3_bucket_source: s3_bucket.to_string(),
3128            input_format: input_format.to_string(),
3129            start_time: now,
3130            end_time: now,
3131            processed_item_count,
3132            processed_size_bytes,
3133        };
3134        state.imports.insert(import_arn.clone(), import_desc);
3135
3136        let table_ref = state.tables.get(table_name).unwrap();
3137        let ks: Vec<Value> = table_ref
3138            .key_schema
3139            .iter()
3140            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3141            .collect();
3142        let ad: Vec<Value> = table_ref
3143            .attribute_definitions
3144            .iter()
3145            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3146            .collect();
3147
3148        Self::ok_json(json!({
3149            "ImportTableDescription": {
3150                "ImportArn": import_arn,
3151                "ImportStatus": "COMPLETED",
3152                "TableArn": table_arn,
3153                "TableId": uuid::Uuid::new_v4().to_string(),
3154                "S3BucketSource": {
3155                    "S3Bucket": s3_bucket
3156                },
3157                "InputFormat": input_format,
3158                "TableCreationParameters": {
3159                    "TableName": table_name,
3160                    "KeySchema": ks,
3161                    "AttributeDefinitions": ad
3162                },
3163                "StartTime": now.timestamp() as f64,
3164                "EndTime": now.timestamp() as f64,
3165                "ProcessedItemCount": processed_item_count,
3166                "ProcessedSizeBytes": processed_size_bytes
3167            }
3168        }))
3169    }
3170
3171    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3172        let body = Self::parse_body(req)?;
3173        let import_arn = require_str(&body, "ImportArn")?;
3174
3175        let state = self.state.read();
3176        let import = state.imports.get(import_arn).ok_or_else(|| {
3177            AwsServiceError::aws_error(
3178                StatusCode::BAD_REQUEST,
3179                "ImportNotFoundException",
3180                format!("Import not found: {import_arn}"),
3181            )
3182        })?;
3183
3184        Self::ok_json(json!({
3185            "ImportTableDescription": {
3186                "ImportArn": import.import_arn,
3187                "ImportStatus": import.import_status,
3188                "TableArn": import.table_arn,
3189                "S3BucketSource": {
3190                    "S3Bucket": import.s3_bucket_source
3191                },
3192                "InputFormat": import.input_format,
3193                "StartTime": import.start_time.timestamp() as f64,
3194                "EndTime": import.end_time.timestamp() as f64,
3195                "ProcessedItemCount": import.processed_item_count,
3196                "ProcessedSizeBytes": import.processed_size_bytes
3197            }
3198        }))
3199    }
3200
3201    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3202        let body = Self::parse_body(req)?;
3203        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3204        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3205        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3206        let table_arn = body["TableArn"].as_str();
3207
3208        let state = self.state.read();
3209        let summaries: Vec<Value> = state
3210            .imports
3211            .values()
3212            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3213            .map(|i| {
3214                json!({
3215                    "ImportArn": i.import_arn,
3216                    "ImportStatus": i.import_status,
3217                    "TableArn": i.table_arn
3218                })
3219            })
3220            .collect();
3221
3222        Self::ok_json(json!({
3223            "ImportSummaryList": summaries
3224        }))
3225    }
3226}
3227
/// `AwsService` implementation that routes each incoming request to the
/// `DynamoDbService` handler method matching the request's action name.
#[async_trait]
impl AwsService for DynamoDbService {
    /// Returns the service identifier, `"dynamodb"`.
    fn service_name(&self) -> &str {
        "dynamodb"
    }

    /// Dispatches `req` to the handler for `req.action`.
    ///
    /// # Errors
    /// Returns whatever error the selected handler produces, or
    /// `AwsServiceError::action_not_implemented` when the action name matches
    /// none of the arms below.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateTable" => self.create_table(&req),
            "DeleteTable" => self.delete_table(&req),
            "DescribeTable" => self.describe_table(&req),
            "ListTables" => self.list_tables(&req),
            "UpdateTable" => self.update_table(&req),
            "PutItem" => self.put_item(&req),
            "GetItem" => self.get_item(&req),
            "DeleteItem" => self.delete_item(&req),
            "UpdateItem" => self.update_item(&req),
            "Query" => self.query(&req),
            "Scan" => self.scan(&req),
            "BatchGetItem" => self.batch_get_item(&req),
            "BatchWriteItem" => self.batch_write_item(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsOfResource" => self.list_tags_of_resource(&req),
            "TransactGetItems" => self.transact_get_items(&req),
            "TransactWriteItems" => self.transact_write_items(&req),
            "ExecuteStatement" => self.execute_statement(&req),
            "BatchExecuteStatement" => self.batch_execute_statement(&req),
            "ExecuteTransaction" => self.execute_transaction(&req),
            "UpdateTimeToLive" => self.update_time_to_live(&req),
            "DescribeTimeToLive" => self.describe_time_to_live(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            // Stubs
            "DescribeEndpoints" => self.describe_endpoints(&req),
            "DescribeLimits" => self.describe_limits(&req),
            // Backups
            "CreateBackup" => self.create_backup(&req),
            "DeleteBackup" => self.delete_backup(&req),
            "DescribeBackup" => self.describe_backup(&req),
            "ListBackups" => self.list_backups(&req),
            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
            // Global tables
            "CreateGlobalTable" => self.create_global_table(&req),
            "DescribeGlobalTable" => self.describe_global_table(&req),
            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
            "ListGlobalTables" => self.list_global_tables(&req),
            "UpdateGlobalTable" => self.update_global_table(&req),
            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
            // Kinesis streaming
            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
            "DisableKinesisStreamingDestination" => {
                self.disable_kinesis_streaming_destination(&req)
            }
            "DescribeKinesisStreamingDestination" => {
                self.describe_kinesis_streaming_destination(&req)
            }
            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
            // Contributor insights
            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
            "UpdateContributorInsights" => self.update_contributor_insights(&req),
            "ListContributorInsights" => self.list_contributor_insights(&req),
            // Import/Export
            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
            "DescribeExport" => self.describe_export(&req),
            "ListExports" => self.list_exports(&req),
            "ImportTable" => self.import_table(&req),
            "DescribeImport" => self.describe_import(&req),
            "ListImports" => self.list_imports(&req),
            // Anything else is reported as not implemented for this service.
            _ => Err(AwsServiceError::action_not_implemented(
                "dynamodb",
                &req.action,
            )),
        }
    }

    /// Lists the action names this service advertises as supported.
    ///
    /// NOTE(review): this list is maintained by hand alongside the `match` in
    /// `handle`; a new action has to be added in both places.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateTable",
            "DeleteTable",
            "DescribeTable",
            "ListTables",
            "UpdateTable",
            "PutItem",
            "GetItem",
            "DeleteItem",
            "UpdateItem",
            "Query",
            "Scan",
            "BatchGetItem",
            "BatchWriteItem",
            "TagResource",
            "UntagResource",
            "ListTagsOfResource",
            "TransactGetItems",
            "TransactWriteItems",
            "ExecuteStatement",
            "BatchExecuteStatement",
            "ExecuteTransaction",
            "UpdateTimeToLive",
            "DescribeTimeToLive",
            "PutResourcePolicy",
            "GetResourcePolicy",
            "DeleteResourcePolicy",
            "DescribeEndpoints",
            "DescribeLimits",
            "CreateBackup",
            "DeleteBackup",
            "DescribeBackup",
            "ListBackups",
            "RestoreTableFromBackup",
            "RestoreTableToPointInTime",
            "UpdateContinuousBackups",
            "DescribeContinuousBackups",
            "CreateGlobalTable",
            "DescribeGlobalTable",
            "DescribeGlobalTableSettings",
            "ListGlobalTables",
            "UpdateGlobalTable",
            "UpdateGlobalTableSettings",
            "DescribeTableReplicaAutoScaling",
            "UpdateTableReplicaAutoScaling",
            "EnableKinesisStreamingDestination",
            "DisableKinesisStreamingDestination",
            "DescribeKinesisStreamingDestination",
            "UpdateKinesisStreamingDestination",
            "DescribeContributorInsights",
            "UpdateContributorInsights",
            "ListContributorInsights",
            "ExportTableToPointInTime",
            "DescribeExport",
            "ListExports",
            "ImportTable",
            "DescribeImport",
            "ListImports",
        ]
    }
}
3372
3373// ── Helper functions ────────────────────────────────────────────────────
3374
3375fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3376    body[field].as_str().ok_or_else(|| {
3377        AwsServiceError::aws_error(
3378            StatusCode::BAD_REQUEST,
3379            "ValidationException",
3380            format!("{field} is required"),
3381        )
3382    })
3383}
3384
3385fn require_object(
3386    body: &Value,
3387    field: &str,
3388) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3389    let obj = body[field].as_object().ok_or_else(|| {
3390        AwsServiceError::aws_error(
3391            StatusCode::BAD_REQUEST,
3392            "ValidationException",
3393            format!("{field} is required"),
3394        )
3395    })?;
3396    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3397}
3398
3399fn get_table<'a>(
3400    tables: &'a HashMap<String, DynamoTable>,
3401    name: &str,
3402) -> Result<&'a DynamoTable, AwsServiceError> {
3403    tables.get(name).ok_or_else(|| {
3404        AwsServiceError::aws_error(
3405            StatusCode::BAD_REQUEST,
3406            "ResourceNotFoundException",
3407            format!("Requested resource not found: Table: {name} not found"),
3408        )
3409    })
3410}
3411
3412fn get_table_mut<'a>(
3413    tables: &'a mut HashMap<String, DynamoTable>,
3414    name: &str,
3415) -> Result<&'a mut DynamoTable, AwsServiceError> {
3416    tables.get_mut(name).ok_or_else(|| {
3417        AwsServiceError::aws_error(
3418            StatusCode::BAD_REQUEST,
3419            "ResourceNotFoundException",
3420            format!("Requested resource not found: Table: {name} not found"),
3421        )
3422    })
3423}
3424
3425fn find_table_by_arn<'a>(
3426    tables: &'a HashMap<String, DynamoTable>,
3427    arn: &str,
3428) -> Result<&'a DynamoTable, AwsServiceError> {
3429    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3430        AwsServiceError::aws_error(
3431            StatusCode::BAD_REQUEST,
3432            "ResourceNotFoundException",
3433            format!("Requested resource not found: {arn}"),
3434        )
3435    })
3436}
3437
3438fn find_table_by_arn_mut<'a>(
3439    tables: &'a mut HashMap<String, DynamoTable>,
3440    arn: &str,
3441) -> Result<&'a mut DynamoTable, AwsServiceError> {
3442    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3443        AwsServiceError::aws_error(
3444            StatusCode::BAD_REQUEST,
3445            "ResourceNotFoundException",
3446            format!("Requested resource not found: {arn}"),
3447        )
3448    })
3449}
3450
3451fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3452    let arr = val.as_array().ok_or_else(|| {
3453        AwsServiceError::aws_error(
3454            StatusCode::BAD_REQUEST,
3455            "ValidationException",
3456            "KeySchema is required",
3457        )
3458    })?;
3459    Ok(arr
3460        .iter()
3461        .map(|elem| KeySchemaElement {
3462            attribute_name: elem["AttributeName"]
3463                .as_str()
3464                .unwrap_or_default()
3465                .to_string(),
3466            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3467        })
3468        .collect())
3469}
3470
3471fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3472    let arr = val.as_array().ok_or_else(|| {
3473        AwsServiceError::aws_error(
3474            StatusCode::BAD_REQUEST,
3475            "ValidationException",
3476            "AttributeDefinitions is required",
3477        )
3478    })?;
3479    Ok(arr
3480        .iter()
3481        .map(|elem| AttributeDefinition {
3482            attribute_name: elem["AttributeName"]
3483                .as_str()
3484                .unwrap_or_default()
3485                .to_string(),
3486            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3487        })
3488        .collect())
3489}
3490
3491fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3492    Ok(ProvisionedThroughput {
3493        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3494        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3495    })
3496}
3497
3498fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3499    let Some(arr) = val.as_array() else {
3500        return Vec::new();
3501    };
3502    arr.iter()
3503        .filter_map(|g| {
3504            Some(GlobalSecondaryIndex {
3505                index_name: g["IndexName"].as_str()?.to_string(),
3506                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3507                projection: parse_projection(&g["Projection"]),
3508                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3509                    .ok(),
3510            })
3511        })
3512        .collect()
3513}
3514
3515fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3516    let Some(arr) = val.as_array() else {
3517        return Vec::new();
3518    };
3519    arr.iter()
3520        .filter_map(|l| {
3521            Some(LocalSecondaryIndex {
3522                index_name: l["IndexName"].as_str()?.to_string(),
3523                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3524                projection: parse_projection(&l["Projection"]),
3525            })
3526        })
3527        .collect()
3528}
3529
3530fn parse_projection(val: &Value) -> Projection {
3531    Projection {
3532        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3533        non_key_attributes: val["NonKeyAttributes"]
3534            .as_array()
3535            .map(|arr| {
3536                arr.iter()
3537                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3538                    .collect()
3539            })
3540            .unwrap_or_default(),
3541    }
3542}
3543
3544fn parse_tags(val: &Value) -> HashMap<String, String> {
3545    let mut tags = HashMap::new();
3546    if let Some(arr) = val.as_array() {
3547        for tag in arr {
3548            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3549                tags.insert(k.to_string(), v.to_string());
3550            }
3551        }
3552    }
3553    tags
3554}
3555
3556fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3557    let mut names = HashMap::new();
3558    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3559        for (k, v) in obj {
3560            if let Some(s) = v.as_str() {
3561                names.insert(k.clone(), s.to_string());
3562            }
3563        }
3564    }
3565    names
3566}
3567
3568fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3569    let mut values = HashMap::new();
3570    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3571        for (k, v) in obj {
3572            values.insert(k.clone(), v.clone());
3573        }
3574    }
3575    values
3576}
3577
/// Resolves an expression attribute-name placeholder.
///
/// A `name` starting with `#` is replaced by its entry in `expr_attr_names`
/// when one exists; every other name, and any unknown placeholder, is returned
/// unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    match expr_attr_names.get(name) {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
3588
3589fn extract_key(
3590    table: &DynamoTable,
3591    item: &HashMap<String, AttributeValue>,
3592) -> HashMap<String, AttributeValue> {
3593    let mut key = HashMap::new();
3594    let hash_key = table.hash_key_name();
3595    if let Some(v) = item.get(hash_key) {
3596        key.insert(hash_key.to_string(), v.clone());
3597    }
3598    if let Some(range_key) = table.range_key_name() {
3599        if let Some(v) = item.get(range_key) {
3600            key.insert(range_key.to_string(), v.clone());
3601        }
3602    }
3603    key
3604}
3605
3606/// Parse a JSON object into a key map (used for ExclusiveStartKey).
3607fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3608    let obj = value.as_object()?;
3609    if obj.is_empty() {
3610        return None;
3611    }
3612    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3613}
3614
3615/// Check whether an item's key attributes match the given key map.
3616fn item_matches_key(
3617    item: &HashMap<String, AttributeValue>,
3618    key: &HashMap<String, AttributeValue>,
3619    hash_key_name: &str,
3620    range_key_name: Option<&str>,
3621) -> bool {
3622    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3623        (Some(a), Some(b)) => a == b,
3624        _ => false,
3625    };
3626    if !hash_match {
3627        return false;
3628    }
3629    match range_key_name {
3630        Some(rk) => match (item.get(rk), key.get(rk)) {
3631            (Some(a), Some(b)) => a == b,
3632            (None, None) => true,
3633            _ => false,
3634        },
3635        None => true,
3636    }
3637}
3638
3639/// Extract the primary key from an item given explicit key attribute names.
3640fn extract_key_for_schema(
3641    item: &HashMap<String, AttributeValue>,
3642    hash_key_name: &str,
3643    range_key_name: Option<&str>,
3644) -> HashMap<String, AttributeValue> {
3645    let mut key = HashMap::new();
3646    if let Some(v) = item.get(hash_key_name) {
3647        key.insert(hash_key_name.to_string(), v.clone());
3648    }
3649    if let Some(rk) = range_key_name {
3650        if let Some(v) = item.get(rk) {
3651            key.insert(rk.to_string(), v.clone());
3652        }
3653    }
3654    key
3655}
3656
3657fn validate_key_in_item(
3658    table: &DynamoTable,
3659    item: &HashMap<String, AttributeValue>,
3660) -> Result<(), AwsServiceError> {
3661    let hash_key = table.hash_key_name();
3662    if !item.contains_key(hash_key) {
3663        return Err(AwsServiceError::aws_error(
3664            StatusCode::BAD_REQUEST,
3665            "ValidationException",
3666            format!("Missing the key {hash_key} in the item"),
3667        ));
3668    }
3669    if let Some(range_key) = table.range_key_name() {
3670        if !item.contains_key(range_key) {
3671            return Err(AwsServiceError::aws_error(
3672                StatusCode::BAD_REQUEST,
3673                "ValidationException",
3674                format!("Missing the key {range_key} in the item"),
3675            ));
3676        }
3677    }
3678    Ok(())
3679}
3680
3681fn validate_key_attributes_in_key(
3682    table: &DynamoTable,
3683    key: &HashMap<String, AttributeValue>,
3684) -> Result<(), AwsServiceError> {
3685    let hash_key = table.hash_key_name();
3686    if !key.contains_key(hash_key) {
3687        return Err(AwsServiceError::aws_error(
3688            StatusCode::BAD_REQUEST,
3689            "ValidationException",
3690            format!("Missing the key {hash_key} in the item"),
3691        ));
3692    }
3693    Ok(())
3694}
3695
3696fn project_item(
3697    item: &HashMap<String, AttributeValue>,
3698    body: &Value,
3699) -> HashMap<String, AttributeValue> {
3700    let projection = body["ProjectionExpression"].as_str();
3701    match projection {
3702        Some(proj) if !proj.is_empty() => {
3703            let expr_attr_names = parse_expression_attribute_names(body);
3704            let attrs: Vec<String> = proj
3705                .split(',')
3706                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3707                .collect();
3708            let mut result = HashMap::new();
3709            for attr in &attrs {
3710                if let Some(v) = resolve_nested_path(item, attr) {
3711                    insert_nested_value(&mut result, attr, v);
3712                }
3713            }
3714            result
3715        }
3716        _ => item.clone(),
3717    }
3718}
3719
3720/// Resolve expression attribute names within each segment of a projection path.
3721/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3722fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3723    // Split on dots, resolve each part, rejoin
3724    let mut result = String::new();
3725    for (i, segment) in path.split('.').enumerate() {
3726        if i > 0 {
3727            result.push('.');
3728        }
3729        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3730        if let Some(bracket_pos) = segment.find('[') {
3731            let key_part = &segment[..bracket_pos];
3732            let index_part = &segment[bracket_pos..];
3733            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3734            result.push_str(index_part);
3735        } else {
3736            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3737        }
3738    }
3739    result
3740}
3741
3742/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3743fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3744    let segments = parse_path_segments(path);
3745    if segments.is_empty() {
3746        return None;
3747    }
3748
3749    let first = &segments[0];
3750    let top_key = match first {
3751        PathSegment::Key(k) => k.as_str(),
3752        _ => return None,
3753    };
3754
3755    let mut current = item.get(top_key)?.clone();
3756
3757    for segment in &segments[1..] {
3758        match segment {
3759            PathSegment::Key(k) => {
3760                // Navigate into a Map: {"M": {"key": ...}}
3761                current = current.get("M")?.get(k)?.clone();
3762            }
3763            PathSegment::Index(idx) => {
3764                // Navigate into a List: {"L": [...]}
3765                current = current.get("L")?.get(*idx)?.clone();
3766            }
3767        }
3768    }
3769
3770    Some(current)
3771}
3772
/// One step of a document path.
#[derive(Debug)]
enum PathSegment {
    /// An attribute or map-member name.
    Key(String),
    /// A zero-based list position taken from a `[n]` suffix.
    Index(usize),
}

/// Split a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
///
/// Empty names (for example between consecutive dots) produce no segment, and
/// a bracket whose content is not an unsigned integer is dropped. A bracket
/// with no closing `]` takes everything up to the end of the path.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut parsed = Vec::new();
    let mut name = String::new();
    let mut rest = path.chars();

    while let Some(ch) = rest.next() {
        match ch {
            '.' | '[' => {
                // Either delimiter ends the name collected so far.
                if !name.is_empty() {
                    parsed.push(PathSegment::Key(std::mem::take(&mut name)));
                }
                if ch == '[' {
                    // `take_while` also consumes the closing ']'.
                    let digits: String = rest.by_ref().take_while(|&c| c != ']').collect();
                    if let Ok(position) = digits.parse::<usize>() {
                        parsed.push(PathSegment::Index(position));
                    }
                }
            }
            other => name.push(other),
        }
    }

    if !name.is_empty() {
        parsed.push(PathSegment::Key(name));
    }
    parsed
}
3821
3822/// Insert a value at a nested path in the result HashMap.
3823/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3824fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3825    // Simple case: no nesting
3826    if !path.contains('.') && !path.contains('[') {
3827        result.insert(path.to_string(), value);
3828        return;
3829    }
3830
3831    let segments = parse_path_segments(path);
3832    if segments.is_empty() {
3833        return;
3834    }
3835
3836    let top_key = match &segments[0] {
3837        PathSegment::Key(k) => k.clone(),
3838        _ => return,
3839    };
3840
3841    if segments.len() == 1 {
3842        result.insert(top_key, value);
3843        return;
3844    }
3845
3846    // For nested paths, wrap the value back into the nested structure
3847    let wrapped = wrap_value_in_path(&segments[1..], value);
3848    // Merge into existing value if present
3849    let existing = result.remove(&top_key);
3850    let merged = match existing {
3851        Some(existing) => merge_attribute_values(existing, wrapped),
3852        None => wrapped,
3853    };
3854    result.insert(top_key, merged);
3855}
3856
3857/// Wrap a value in the nested path structure.
3858fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3859    if segments.is_empty() {
3860        return value;
3861    }
3862    let inner = wrap_value_in_path(&segments[1..], value);
3863    match &segments[0] {
3864        PathSegment::Key(k) => {
3865            json!({"M": {k.clone(): inner}})
3866        }
3867        PathSegment::Index(idx) => {
3868            let mut arr = vec![Value::Null; idx + 1];
3869            arr[*idx] = inner;
3870            json!({"L": arr})
3871        }
3872    }
3873}
3874
3875/// Merge two attribute values (for overlapping projections).
3876fn merge_attribute_values(a: Value, b: Value) -> Value {
3877    if let (Some(a_map), Some(b_map)) = (
3878        a.get("M").and_then(|v| v.as_object()),
3879        b.get("M").and_then(|v| v.as_object()),
3880    ) {
3881        let mut merged = a_map.clone();
3882        for (k, v) in b_map {
3883            if let Some(existing) = merged.get(k) {
3884                merged.insert(
3885                    k.clone(),
3886                    merge_attribute_values(existing.clone(), v.clone()),
3887                );
3888            } else {
3889                merged.insert(k.clone(), v.clone());
3890            }
3891        }
3892        json!({"M": merged})
3893    } else {
3894        b
3895    }
3896}
3897
3898fn evaluate_condition(
3899    condition: &str,
3900    existing: Option<&HashMap<String, AttributeValue>>,
3901    expr_attr_names: &HashMap<String, String>,
3902    expr_attr_values: &HashMap<String, Value>,
3903) -> Result<(), AwsServiceError> {
3904    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
3905    // so we delegate to evaluate_filter_expression. An empty map models "item
3906    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
3907    // → true, comparisons against missing attributes → None vs Some(val).
3908    let empty = HashMap::new();
3909    let item = existing.unwrap_or(&empty);
3910    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3911        Ok(())
3912    } else {
3913        Err(AwsServiceError::aws_error(
3914            StatusCode::BAD_REQUEST,
3915            "ConditionalCheckFailedException",
3916            "The conditional request failed",
3917        ))
3918    }
3919}
3920
/// Returns the trimmed argument text of a call to `func_name` when `expr` is
/// exactly that call, i.e. it starts with the function name and an opening
/// parenthesis and ends with a closing parenthesis.
///
/// Returns `None` when `expr` does not have that shape.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    // Hand-written expressions put the paren right after the name, while the
    // aws-sdk-go v2 expression builder emits one space first
    // (`attribute_exists (#0)`); both spellings are accepted.
    let after_paren = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    after_paren.strip_suffix(')').map(str::trim)
}
3933
3934fn evaluate_key_condition(
3935    expr: &str,
3936    item: &HashMap<String, AttributeValue>,
3937    hash_key_name: &str,
3938    _range_key_name: Option<&str>,
3939    expr_attr_names: &HashMap<String, String>,
3940    expr_attr_values: &HashMap<String, Value>,
3941) -> bool {
3942    let parts: Vec<&str> = split_on_and(expr);
3943    for part in &parts {
3944        let part = part.trim();
3945        if !evaluate_single_key_condition(
3946            part,
3947            item,
3948            hash_key_name,
3949            expr_attr_names,
3950            expr_attr_values,
3951        ) {
3952            return false;
3953        }
3954    }
3955    true
3956}
3957
/// Splits `expr` at every top-level `" AND "` separator (matched without
/// regard to ASCII case, with exactly one space on each side).
///
/// Separators inside parentheses are left alone. The result always holds at
/// least one piece, and pieces are returned untrimmed.
fn split_on_and(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";

    let bytes = expr.as_bytes();
    let mut pieces = Vec::new();
    let mut piece_start = 0usize;
    let mut nesting = 0usize;
    let mut pos = 0usize;

    while pos < bytes.len() {
        match bytes[pos] {
            b'(' => nesting += 1,
            // An unbalanced ')' never drives the depth below zero.
            b')' => nesting = nesting.saturating_sub(1),
            // The separator is pure ASCII, so a byte-level match always
            // starts and ends on a character boundary.
            _ if nesting == 0
                && pos + SEPARATOR.len() <= bytes.len()
                && bytes[pos..pos + SEPARATOR.len()].eq_ignore_ascii_case(SEPARATOR) =>
            {
                pieces.push(&expr[piece_start..pos]);
                pos += SEPARATOR.len();
                piece_start = pos;
                continue;
            }
            _ => {}
        }
        pos += 1;
    }

    pieces.push(&expr[piece_start..]);
    pieces
}
3988
/// Splits `expr` at every top-level `" OR "` separator (matched without
/// regard to ASCII case, with exactly one space on each side).
///
/// Separators inside parentheses are left alone. The result always holds at
/// least one piece, and pieces are returned untrimmed.
fn split_on_or(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" OR ";

    let bytes = expr.as_bytes();
    let mut pieces = Vec::new();
    let mut piece_start = 0usize;
    let mut nesting = 0usize;
    let mut pos = 0usize;

    while pos < bytes.len() {
        match bytes[pos] {
            b'(' => nesting += 1,
            // An unbalanced ')' never drives the depth below zero.
            b')' => nesting = nesting.saturating_sub(1),
            // The separator is pure ASCII, so a byte-level match always
            // starts and ends on a character boundary.
            _ if nesting == 0
                && pos + SEPARATOR.len() <= bytes.len()
                && bytes[pos..pos + SEPARATOR.len()].eq_ignore_ascii_case(SEPARATOR) =>
            {
                pieces.push(&expr[piece_start..pos]);
                pos += SEPARATOR.len();
                piece_start = pos;
                continue;
            }
            _ => {}
        }
        pos += 1;
    }

    pieces.push(&expr[piece_start..]);
    pieces
}
4019
4020fn evaluate_single_key_condition(
4021    part: &str,
4022    item: &HashMap<String, AttributeValue>,
4023    _hash_key_name: &str,
4024    expr_attr_names: &HashMap<String, String>,
4025    expr_attr_values: &HashMap<String, Value>,
4026) -> bool {
4027    let part = part.trim();
4028
4029    // begins_with(attr, :val) — S type only
4030    if let Some(rest) = part
4031        .strip_prefix("begins_with(")
4032        .or_else(|| part.strip_prefix("begins_with ("))
4033    {
4034        if let Some(inner) = rest.strip_suffix(')') {
4035            let mut split = inner.splitn(2, ',');
4036            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4037                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4038                let val_ref = val_ref.trim();
4039                let expected = expr_attr_values.get(val_ref);
4040                let actual = item.get(&attr_name);
4041                return match (actual, expected) {
4042                    (Some(a), Some(e)) => {
4043                        let a_str = a.get("S").and_then(|v| v.as_str());
4044                        let e_str = e.get("S").and_then(|v| v.as_str());
4045                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4046                    }
4047                    _ => false,
4048                };
4049            }
4050        }
4051        return false;
4052    }
4053
4054    // BETWEEN
4055    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4056        let attr_part = part[..between_pos].trim();
4057        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4058        let range_part = &part[between_pos + 7..];
4059        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4060            let lo_ref = range_part[..and_pos].trim();
4061            let hi_ref = range_part[and_pos + 5..].trim();
4062            let lo = expr_attr_values.get(lo_ref);
4063            let hi = expr_attr_values.get(hi_ref);
4064            let actual = item.get(&attr_name);
4065            return match (actual, lo, hi) {
4066                (Some(a), Some(l), Some(h)) => {
4067                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4068                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4069                }
4070                _ => false,
4071            };
4072        }
4073    }
4074
4075    // Simple comparison: attr <op> :val
4076    for op in &["<=", ">=", "<>", "=", "<", ">"] {
4077        if let Some(pos) = part.find(op) {
4078            let left = part[..pos].trim();
4079            let right = part[pos + op.len()..].trim();
4080            let attr_name = resolve_attr_name(left, expr_attr_names);
4081            let expected = expr_attr_values.get(right);
4082            let actual = item.get(&attr_name);
4083
4084            return match *op {
4085                "=" => actual == expected,
4086                "<>" => actual != expected,
4087                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4088                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4089                "<=" => {
4090                    let cmp = compare_attribute_values(actual, expected);
4091                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4092                }
4093                ">=" => {
4094                    let cmp = compare_attribute_values(actual, expected);
4095                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4096                }
4097                _ => false,
4098            };
4099        }
4100    }
4101
4102    false
4103}
4104
4105/// Returns the "size" of a DynamoDB attribute value per AWS docs:
4106/// S → character count, N → always 0 (AWS returns size of internal representation, we approximate),
4107/// B → byte count, SS/NS/BS → element count, L → element count, M → element count,
4108/// BOOL/NULL → 1.
4109fn attribute_size(val: &Value) -> Option<usize> {
4110    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
4111        return Some(s.len());
4112    }
4113    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
4114        // B is base64-encoded — return decoded byte count
4115        let decoded_len = base64::engine::general_purpose::STANDARD
4116            .decode(b)
4117            .map(|v| v.len())
4118            .unwrap_or(b.len());
4119        return Some(decoded_len);
4120    }
4121    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
4122        return Some(arr.len());
4123    }
4124    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
4125        return Some(arr.len());
4126    }
4127    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
4128        return Some(arr.len());
4129    }
4130    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
4131        return Some(arr.len());
4132    }
4133    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
4134        return Some(obj.len());
4135    }
4136    if val.get("N").is_some() {
4137        // AWS returns numeric representation size; approximate with string length
4138        return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
4139    }
4140    if val.get("BOOL").is_some() || val.get("NULL").is_some() {
4141        return Some(1);
4142    }
4143    None
4144}
4145
4146/// Evaluate a `size(path) op :val` comparison expression.
4147fn evaluate_size_comparison(
4148    part: &str,
4149    item: &HashMap<String, AttributeValue>,
4150    expr_attr_names: &HashMap<String, String>,
4151    expr_attr_values: &HashMap<String, Value>,
4152) -> Option<bool> {
4153    // Find the closing paren of size(...)
4154    let open = part.find('(')?;
4155    let close = part[open..].find(')')? + open;
4156    let path = part[open + 1..close].trim();
4157    let remainder = part[close + 1..].trim();
4158
4159    // Parse operator and value ref
4160    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
4161        ("<=", rest.trim())
4162    } else if let Some(rest) = remainder.strip_prefix(">=") {
4163        (">=", rest.trim())
4164    } else if let Some(rest) = remainder.strip_prefix("<>") {
4165        ("<>", rest.trim())
4166    } else if let Some(rest) = remainder.strip_prefix('<') {
4167        ("<", rest.trim())
4168    } else if let Some(rest) = remainder.strip_prefix('>') {
4169        (">", rest.trim())
4170    } else if let Some(rest) = remainder.strip_prefix('=') {
4171        ("=", rest.trim())
4172    } else {
4173        return None;
4174    };
4175
4176    let attr_name = resolve_attr_name(path, expr_attr_names);
4177    let actual = item.get(&attr_name)?;
4178    let size = attribute_size(actual)? as f64;
4179
4180    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
4181
4182    Some(match op {
4183        "=" => (size - expected).abs() < f64::EPSILON,
4184        "<>" => (size - expected).abs() >= f64::EPSILON,
4185        "<" => size < expected,
4186        ">" => size > expected,
4187        "<=" => size <= expected,
4188        ">=" => size >= expected,
4189        _ => false,
4190    })
4191}
4192
4193fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4194    match (a, b) {
4195        (None, None) => std::cmp::Ordering::Equal,
4196        (None, Some(_)) => std::cmp::Ordering::Less,
4197        (Some(_), None) => std::cmp::Ordering::Greater,
4198        (Some(a), Some(b)) => {
4199            let a_type = attribute_type_and_value(a);
4200            let b_type = attribute_type_and_value(b);
4201            match (a_type, b_type) {
4202                (Some(("S", a_val)), Some(("S", b_val))) => {
4203                    let a_str = a_val.as_str().unwrap_or("");
4204                    let b_str = b_val.as_str().unwrap_or("");
4205                    a_str.cmp(b_str)
4206                }
4207                (Some(("N", a_val)), Some(("N", b_val))) => {
4208                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4209                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4210                    a_num
4211                        .partial_cmp(&b_num)
4212                        .unwrap_or(std::cmp::Ordering::Equal)
4213                }
4214                (Some(("B", a_val)), Some(("B", b_val))) => {
4215                    let a_str = a_val.as_str().unwrap_or("");
4216                    let b_str = b_val.as_str().unwrap_or("");
4217                    a_str.cmp(b_str)
4218                }
4219                _ => std::cmp::Ordering::Equal,
4220            }
4221        }
4222    }
4223}
4224
4225fn evaluate_filter_expression(
4226    expr: &str,
4227    item: &HashMap<String, AttributeValue>,
4228    expr_attr_names: &HashMap<String, String>,
4229    expr_attr_values: &HashMap<String, Value>,
4230) -> bool {
4231    let trimmed = expr.trim();
4232
4233    // Split on OR first (lower precedence), respecting parentheses
4234    let or_parts = split_on_or(trimmed);
4235    if or_parts.len() > 1 {
4236        return or_parts.iter().any(|part| {
4237            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4238        });
4239    }
4240
4241    // Then split on AND (higher precedence), respecting parentheses
4242    let and_parts = split_on_and(trimmed);
4243    if and_parts.len() > 1 {
4244        return and_parts.iter().all(|part| {
4245            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4246        });
4247    }
4248
4249    // Strip outer parentheses if present
4250    let stripped = strip_outer_parens(trimmed);
4251    if stripped != trimmed {
4252        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4253    }
4254
4255    // Handle NOT prefix (case-insensitive)
4256    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4257        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4258    }
4259
4260    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4261}
4262
/// Remove one pair of enclosing parentheses when they match each other.
///
/// The input is trimmed first. If the trimmed text is wrapped in `(` … `)`
/// and the text between them is balanced on its own, that inner text is
/// returned (untrimmed). In every other case — no wrapping pair, or the
/// leading `(` is closed somewhere before the end, as in `(a) AND (b)` —
/// the trimmed input is returned unchanged.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let body = match candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    {
        Some(body) => body,
        None => return candidate,
    };

    // Walk the body tracking nesting; `None` means a ')' appeared with
    // nothing open, i.e. it closes the leading '(' rather than the last ')'.
    let final_depth = body.bytes().try_fold(0u32, |depth, byte| match byte {
        b'(' => Some(depth + 1),
        b')' => depth.checked_sub(1),
        _ => Some(depth),
    });

    if final_depth == Some(0) {
        body
    } else {
        candidate
    }
}
4290
4291fn evaluate_single_filter_condition(
4292    part: &str,
4293    item: &HashMap<String, AttributeValue>,
4294    expr_attr_names: &HashMap<String, String>,
4295    expr_attr_values: &HashMap<String, Value>,
4296) -> bool {
4297    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4298        let attr = resolve_attr_name(inner, expr_attr_names);
4299        return item.contains_key(&attr);
4300    }
4301
4302    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4303        let attr = resolve_attr_name(inner, expr_attr_names);
4304        return !item.contains_key(&attr);
4305    }
4306
4307    // begins_with only works on S (string) type — not N
4308    if let Some(rest) = part
4309        .strip_prefix("begins_with(")
4310        .or_else(|| part.strip_prefix("begins_with ("))
4311    {
4312        if let Some(inner) = rest.strip_suffix(')') {
4313            let mut split = inner.splitn(2, ',');
4314            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4315                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4316                let expected = expr_attr_values.get(val_ref.trim());
4317                let actual = item.get(&attr_name);
4318                return match (actual, expected) {
4319                    (Some(a), Some(e)) => {
4320                        let a_str = a.get("S").and_then(|v| v.as_str());
4321                        let e_str = e.get("S").and_then(|v| v.as_str());
4322                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4323                    }
4324                    _ => false,
4325                };
4326            }
4327        }
4328    }
4329
4330    // contains: works on S (substring), SS/NS/BS/L (set membership)
4331    if let Some(rest) = part
4332        .strip_prefix("contains(")
4333        .or_else(|| part.strip_prefix("contains ("))
4334    {
4335        if let Some(inner) = rest.strip_suffix(')') {
4336            let mut split = inner.splitn(2, ',');
4337            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4338                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4339                let expected = expr_attr_values.get(val_ref.trim());
4340                let actual = item.get(&attr_name);
4341                return match (actual, expected) {
4342                    (Some(a), Some(e)) => {
4343                        // String substring check (S type only)
4344                        if let (Some(a_s), Some(e_s)) = (
4345                            a.get("S").and_then(|v| v.as_str()),
4346                            e.get("S").and_then(|v| v.as_str()),
4347                        ) {
4348                            return a_s.contains(e_s);
4349                        }
4350                        // Set/list membership
4351                        if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
4352                            if let Some(val) = e.get("S") {
4353                                return set.contains(val);
4354                            }
4355                        }
4356                        if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
4357                            if let Some(val) = e.get("N") {
4358                                return set.contains(val);
4359                            }
4360                        }
4361                        if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
4362                            if let Some(val) = e.get("B") {
4363                                return set.contains(val);
4364                            }
4365                        }
4366                        if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
4367                            return list.contains(e);
4368                        }
4369                        false
4370                    }
4371                    _ => false,
4372                };
4373            }
4374        }
4375    }
4376
4377    // size(path) op :val — attribute size comparison
4378    if part.starts_with("size(") || part.starts_with("size (") {
4379        if let Some(result) =
4380            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
4381        {
4382            return result;
4383        }
4384    }
4385
4386    // attribute_type(path, :type)
4387    if part.starts_with("attribute_type(") || part.starts_with("attribute_type (") {
4388        if let Some(rest) = part
4389            .strip_prefix("attribute_type(")
4390            .or_else(|| part.strip_prefix("attribute_type ("))
4391        {
4392            if let Some(inner) = rest.strip_suffix(')') {
4393                let mut split = inner.splitn(2, ',');
4394                if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4395                    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4396                    let expected_type = expr_attr_values
4397                        .get(val_ref.trim())
4398                        .and_then(|v| v.get("S"))
4399                        .and_then(|v| v.as_str());
4400                    let actual = item.get(&attr_name);
4401                    return match (actual, expected_type) {
4402                        (Some(val), Some(t)) => match t {
4403                            "S" => val.get("S").is_some(),
4404                            "N" => val.get("N").is_some(),
4405                            "B" => val.get("B").is_some(),
4406                            "BOOL" => val.get("BOOL").is_some(),
4407                            "NULL" => val.get("NULL").is_some(),
4408                            "SS" => val.get("SS").is_some(),
4409                            "NS" => val.get("NS").is_some(),
4410                            "BS" => val.get("BS").is_some(),
4411                            "L" => val.get("L").is_some(),
4412                            "M" => val.get("M").is_some(),
4413                            _ => false,
4414                        },
4415                        _ => false,
4416                    };
4417                }
4418            }
4419        }
4420    }
4421
4422    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
4423        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
4424        let actual = item.get(&attr_name);
4425        return evaluate_in_match(actual, &value_refs, expr_attr_values);
4426    }
4427
4428    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4429}
4430
/// Parse an `attr IN (:v1, :v2, ...)` expression.
///
/// The ` IN ` keyword is matched case-insensitively at its first
/// occurrence. The left side (trimmed) is the operand; the right side must
/// be a parenthesised, comma-separated list. Items are trimmed and empty
/// items are dropped, so both `(:a, :b)` and `(:a,:b)` are accepted.
///
/// Returns `None` for anything that is not an IN expression — no keyword,
/// an empty operand, missing parentheses, or an empty list — so callers can
/// fall through to their other grammar branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    // ASCII upper-casing never changes byte offsets, so the position found
    // in the upper-cased copy is valid for `expr` too.
    let keyword_at = expr.to_ascii_uppercase().find(" IN ")?;
    let (lhs, rhs) = expr.split_at(keyword_at);

    let operand = lhs.trim();
    if operand.is_empty() {
        return None;
    }

    // Skip the 4-byte " IN " keyword, then require `( ... )`.
    let list = rhs[4..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;

    let mut refs = Vec::new();
    for raw in list.split(',') {
        let value_ref = raw.trim();
        if !value_ref.is_empty() {
            refs.push(value_ref);
        }
    }

    if refs.is_empty() {
        None
    } else {
        Some((operand, refs))
    }
}
4457
4458/// Return true iff `actual` equals any of the `value_refs` resolved through
4459/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
4460/// evaluates `IN` against undefined attributes as false).
4461fn evaluate_in_match(
4462    actual: Option<&AttributeValue>,
4463    value_refs: &[&str],
4464    expr_attr_values: &HashMap<String, Value>,
4465) -> bool {
4466    value_refs.iter().any(|v_ref| {
4467        let expected = expr_attr_values.get(*v_ref);
4468        matches!((actual, expected), (Some(a), Some(e)) if a == e)
4469    })
4470}
4471
4472fn apply_update_expression(
4473    item: &mut HashMap<String, AttributeValue>,
4474    expr: &str,
4475    expr_attr_names: &HashMap<String, String>,
4476    expr_attr_values: &HashMap<String, Value>,
4477) -> Result<(), AwsServiceError> {
4478    let clauses = parse_update_clauses(expr);
4479    if clauses.is_empty() && !expr.trim().is_empty() {
4480        return Err(AwsServiceError::aws_error(
4481            StatusCode::BAD_REQUEST,
4482            "ValidationException",
4483            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
4484        ));
4485    }
4486    for (action, assignments) in &clauses {
4487        match action.to_ascii_uppercase().as_str() {
4488            "SET" => {
4489                for assignment in assignments {
4490                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4491                }
4492            }
4493            "REMOVE" => {
4494                for attr_ref in assignments {
4495                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4496                    item.remove(&attr);
4497                }
4498            }
4499            "ADD" => {
4500                for assignment in assignments {
4501                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4502                }
4503            }
4504            "DELETE" => {
4505                for assignment in assignments {
4506                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4507                }
4508            }
4509            other => {
4510                return Err(AwsServiceError::aws_error(
4511                    StatusCode::BAD_REQUEST,
4512                    "ValidationException",
4513                    format!("Invalid UpdateExpression: Invalid action: {}", other),
4514                ));
4515            }
4516        }
4517    }
4518    Ok(())
4519}
4520
/// Break an UpdateExpression into `(ACTION, operands)` clauses.
///
/// `ACTION` is one of `SET`, `REMOVE`, `ADD`, `DELETE` (always returned in
/// upper case, matched case-insensitively). Clauses are returned in the
/// order they appear; each clause's text runs up to the next keyword and is
/// split into trimmed operands on commas.
///
/// Two details matter for correctness:
/// * A keyword only counts when it stands alone. Characters that continue
///   a name or placeholder (`_`, `#`, `:`, `.` and alphanumerics) are not
///   boundaries, so `delete_flag`, `#set` or `:add` are not mistaken for
///   clause keywords.
/// * Operands are split only on commas outside parentheses, so function
///   calls such as `if_not_exists(a, :v)` and `list_append(a, :b)` stay in
///   one piece.
///
/// Text before the first keyword is ignored; an expression without any
/// keyword yields an empty vector.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    const KEYWORDS: [&str; 4] = ["SET", "REMOVE", "ADD", "DELETE"];

    /// Split on commas that are not nested inside parentheses, trimming
    /// every piece. Always yields at least one (possibly empty) piece.
    fn split_top_level_commas(content: &str) -> Vec<String> {
        let mut pieces = Vec::new();
        let mut depth = 0usize;
        let mut start = 0;
        for (i, byte) in content.bytes().enumerate() {
            match byte {
                b'(' => depth += 1,
                b')' => depth = depth.saturating_sub(1),
                b',' if depth == 0 => {
                    pieces.push(content[start..i].trim().to_string());
                    start = i + 1;
                }
                _ => {}
            }
        }
        pieces.push(content[start..].trim().to_string());
        pieces
    }

    let bytes = expr.as_bytes();
    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` index `expr` directly.
    let upper = expr.to_ascii_uppercase();
    let mut positions: Vec<(usize, &str)> = Vec::new();

    for kw in KEYWORDS.iter().copied() {
        let mut search_from = 0;
        while let Some(rel) = upper[search_from..].find(kw) {
            let at = search_from + rel;
            let end = at + kw.len();
            let before_ok = at == 0 || {
                let prev = bytes[at - 1];
                !(prev.is_ascii_alphanumeric() || matches!(prev, b'_' | b'#' | b':' | b'.'))
            };
            let after_ok = end >= bytes.len() || {
                let next = bytes[end];
                !(next.is_ascii_alphanumeric() || matches!(next, b'_' | b'.'))
            };
            if before_ok && after_ok {
                positions.push((at, kw));
            }
            search_from = end;
        }
    }

    positions.sort_by_key(|&(pos, _)| pos);

    let mut clauses = Vec::with_capacity(positions.len());
    for (i, &(pos, kw)) in positions.iter().enumerate() {
        let start = pos + kw.len();
        let end = positions.get(i + 1).map_or(expr.len(), |&(next, _)| next);
        let operands = split_top_level_commas(expr[start..end].trim());
        clauses.push((kw.to_string(), operands));
    }

    clauses
}
4558
4559fn apply_set_assignment(
4560    item: &mut HashMap<String, AttributeValue>,
4561    assignment: &str,
4562    expr_attr_names: &HashMap<String, String>,
4563    expr_attr_values: &HashMap<String, Value>,
4564) -> Result<(), AwsServiceError> {
4565    let Some((left, right)) = assignment.split_once('=') else {
4566        return Ok(());
4567    };
4568
4569    let left_trimmed = left.trim();
4570    // Split off a trailing `[N]` list-index suffix so we can resolve the
4571    // attribute name ref on its own. Without this, `resolve_attr_name` sees
4572    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
4573    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
4574        Some((name, idx)) => (name, Some(idx)),
4575        None => (left_trimmed, None),
4576    };
4577    let attr = resolve_attr_name(attr_ref, expr_attr_names);
4578    let right = right.trim();
4579
4580    // if_not_exists(attr, :val)
4581    if let Some(rest) = right
4582        .strip_prefix("if_not_exists(")
4583        .or_else(|| right.strip_prefix("if_not_exists ("))
4584    {
4585        if let Some(inner) = rest.strip_suffix(')') {
4586            let mut split = inner.splitn(2, ',');
4587            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4588                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4589                if !item.contains_key(&check_name) {
4590                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4591                        item.insert(attr, val.clone());
4592                    }
4593                }
4594                return Ok(());
4595            }
4596        }
4597    }
4598
4599    // list_append(a, b)
4600    if let Some(rest) = right
4601        .strip_prefix("list_append(")
4602        .or_else(|| right.strip_prefix("list_append ("))
4603    {
4604        if let Some(inner) = rest.strip_suffix(')') {
4605            let mut split = inner.splitn(2, ',');
4606            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4607                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4608                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4609
4610                let mut merged = Vec::new();
4611                if let Some(Value::Object(obj)) = &a_val {
4612                    if let Some(Value::Array(arr)) = obj.get("L") {
4613                        merged.extend(arr.clone());
4614                    }
4615                }
4616                if let Some(Value::Object(obj)) = &b_val {
4617                    if let Some(Value::Array(arr)) = obj.get("L") {
4618                        merged.extend(arr.clone());
4619                    }
4620                }
4621
4622                item.insert(attr, json!({"L": merged}));
4623                return Ok(());
4624            }
4625        }
4626    }
4627
4628    // Arithmetic: attr + :val or attr - :val
4629    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4630        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4631        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4632
4633        // Both operands must be numeric (N type)
4634        let left_num = match extract_number(&left_val) {
4635            Some(n) => n,
4636            None if left_val.is_some() => {
4637                return Err(AwsServiceError::aws_error(
4638                    StatusCode::BAD_REQUEST,
4639                    "ValidationException",
4640                    "An operand in the update expression has an incorrect data type",
4641                ));
4642            }
4643            None => 0.0, // attribute doesn't exist yet — treat as 0
4644        };
4645        let right_num = match extract_number(&right_val) {
4646            Some(n) => n,
4647            None => {
4648                return Err(AwsServiceError::aws_error(
4649                    StatusCode::BAD_REQUEST,
4650                    "ValidationException",
4651                    "An operand in the update expression has an incorrect data type",
4652                ));
4653            }
4654        };
4655
4656        let result = if is_add {
4657            left_num + right_num
4658        } else {
4659            left_num - right_num
4660        };
4661
4662        let num_str = if result == result.trunc() {
4663            format!("{}", result as i64)
4664        } else {
4665            format!("{result}")
4666        };
4667
4668        item.insert(attr, json!({"N": num_str}));
4669        return Ok(());
4670    }
4671
4672    // Simple assignment
4673    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4674    if let Some(v) = val {
4675        match list_index {
4676            Some(idx) => assign_list_index(item, &attr, idx, v)?,
4677            None => {
4678                item.insert(attr, v);
4679            }
4680        }
4681    }
4682
4683    Ok(())
4684}
4685
/// Parse a trailing `[N]` list-index suffix off the LHS of a SET assignment.
///
/// On success returns the text before the last `[` (not trimmed on its
/// right side) together with the parsed index. Returns `None` when the path
/// does not end in `]`, has no `[`, the bracket content is not a `usize`, or
/// the remaining name is empty or itself contains `[`, `]` or `.` — only the
/// single-index shape `name[N]` is handled, not nested paths like
/// `a.b[0].c`.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    let (name, digits) = without_close.rsplit_once('[')?;
    let index = digits.parse::<usize>().ok()?;

    let has_path_syntax = name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    if name.is_empty() || has_path_syntax {
        None
    } else {
        Some((name, index))
    }
}
4706
4707/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
4708/// within the current list, replaces that slot; if it's at the end, appends.
4709/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
4710/// out-of-range indices and non-list attributes.
4711fn assign_list_index(
4712    item: &mut HashMap<String, AttributeValue>,
4713    attr: &str,
4714    idx: usize,
4715    value: Value,
4716) -> Result<(), AwsServiceError> {
4717    let Some(existing) = item.get_mut(attr) else {
4718        return Err(AwsServiceError::aws_error(
4719            StatusCode::BAD_REQUEST,
4720            "ValidationException",
4721            "The document path provided in the update expression is invalid for update",
4722        ));
4723    };
4724    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
4725        return Err(AwsServiceError::aws_error(
4726            StatusCode::BAD_REQUEST,
4727            "ValidationException",
4728            "The document path provided in the update expression is invalid for update",
4729        ));
4730    };
4731    if idx < list.len() {
4732        list[idx] = value;
4733    } else if idx == list.len() {
4734        list.push(value);
4735    } else {
4736        return Err(AwsServiceError::aws_error(
4737            StatusCode::BAD_REQUEST,
4738            "ValidationException",
4739            "The document path provided in the update expression is invalid for update",
4740        ));
4741    }
4742    Ok(())
4743}
4744
4745fn resolve_value(
4746    reference: &str,
4747    item: &HashMap<String, AttributeValue>,
4748    expr_attr_names: &HashMap<String, String>,
4749    expr_attr_values: &HashMap<String, Value>,
4750) -> Option<Value> {
4751    let reference = reference.trim();
4752    if reference.starts_with(':') {
4753        expr_attr_values.get(reference).cloned()
4754    } else {
4755        let attr_name = resolve_attr_name(reference, expr_attr_names);
4756        item.get(&attr_name).cloned()
4757    }
4758}
4759
4760fn extract_number(val: &Option<Value>) -> Option<f64> {
4761    val.as_ref()
4762        .and_then(|v| v.get("N"))
4763        .and_then(|n| n.as_str())
4764        .and_then(|s| s.parse().ok())
4765}
4766
/// Split `expr` at its first top-level `+` or `-`.
///
/// Returns `(left, right, is_addition)` with both operands untrimmed, or
/// `None` when no such operator exists. Operators nested inside parentheses
/// are skipped, and a sign in the very first position is never treated as a
/// binary operator.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    expr.char_indices().find_map(|(pos, ch)| {
        if ch == '(' {
            nesting += 1;
            return None;
        }
        if ch == ')' {
            nesting -= 1;
            return None;
        }
        let is_operator = ch == '+' || ch == '-';
        if is_operator && nesting == 0 && pos > 0 {
            Some((&expr[..pos], &expr[pos + 1..], ch == '+'))
        } else {
            None
        }
    })
}
4784
4785fn apply_add_assignment(
4786    item: &mut HashMap<String, AttributeValue>,
4787    assignment: &str,
4788    expr_attr_names: &HashMap<String, String>,
4789    expr_attr_values: &HashMap<String, Value>,
4790) -> Result<(), AwsServiceError> {
4791    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4792    if parts.len() != 2 {
4793        return Ok(());
4794    }
4795
4796    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4797    let val_ref = parts[1].trim();
4798    let add_val = expr_attr_values.get(val_ref);
4799
4800    if let Some(add_val) = add_val {
4801        if let Some(existing) = item.get(&attr) {
4802            if let (Some(existing_num), Some(add_num)) = (
4803                extract_number(&Some(existing.clone())),
4804                extract_number(&Some(add_val.clone())),
4805            ) {
4806                let result = existing_num + add_num;
4807                let num_str = if result == result.trunc() {
4808                    format!("{}", result as i64)
4809                } else {
4810                    format!("{result}")
4811                };
4812                item.insert(attr, json!({"N": num_str}));
4813            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4814                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4815                    let mut merged: Vec<Value> = existing_set.clone();
4816                    for v in add_set {
4817                        if !merged.contains(v) {
4818                            merged.push(v.clone());
4819                        }
4820                    }
4821                    item.insert(attr, json!({"SS": merged}));
4822                }
4823            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4824                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4825                    let mut merged: Vec<Value> = existing_set.clone();
4826                    for v in add_set {
4827                        if !merged.contains(v) {
4828                            merged.push(v.clone());
4829                        }
4830                    }
4831                    item.insert(attr, json!({"NS": merged}));
4832                }
4833            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
4834                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
4835                    let mut merged: Vec<Value> = existing_set.clone();
4836                    for v in add_set {
4837                        if !merged.contains(v) {
4838                            merged.push(v.clone());
4839                        }
4840                    }
4841                    item.insert(attr, json!({"BS": merged}));
4842                }
4843            }
4844        } else {
4845            item.insert(attr, add_val.clone());
4846        }
4847    }
4848
4849    Ok(())
4850}
4851
4852fn apply_delete_assignment(
4853    item: &mut HashMap<String, AttributeValue>,
4854    assignment: &str,
4855    expr_attr_names: &HashMap<String, String>,
4856    expr_attr_values: &HashMap<String, Value>,
4857) -> Result<(), AwsServiceError> {
4858    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4859    if parts.len() != 2 {
4860        return Ok(());
4861    }
4862
4863    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4864    let val_ref = parts[1].trim();
4865    let del_val = expr_attr_values.get(val_ref);
4866
4867    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4868        if let (Some(existing_set), Some(del_set)) = (
4869            existing.get("SS").and_then(|v| v.as_array()),
4870            del_val.get("SS").and_then(|v| v.as_array()),
4871        ) {
4872            let filtered: Vec<Value> = existing_set
4873                .iter()
4874                .filter(|v| !del_set.contains(v))
4875                .cloned()
4876                .collect();
4877            if filtered.is_empty() {
4878                item.remove(&attr);
4879            } else {
4880                item.insert(attr, json!({"SS": filtered}));
4881            }
4882        } else if let (Some(existing_set), Some(del_set)) = (
4883            existing.get("NS").and_then(|v| v.as_array()),
4884            del_val.get("NS").and_then(|v| v.as_array()),
4885        ) {
4886            let filtered: Vec<Value> = existing_set
4887                .iter()
4888                .filter(|v| !del_set.contains(v))
4889                .cloned()
4890                .collect();
4891            if filtered.is_empty() {
4892                item.remove(&attr);
4893            } else {
4894                item.insert(attr, json!({"NS": filtered}));
4895            }
4896        } else if let (Some(existing_set), Some(del_set)) = (
4897            existing.get("BS").and_then(|v| v.as_array()),
4898            del_val.get("BS").and_then(|v| v.as_array()),
4899        ) {
4900            let filtered: Vec<Value> = existing_set
4901                .iter()
4902                .filter(|v| !del_set.contains(v))
4903                .cloned()
4904                .collect();
4905            if filtered.is_empty() {
4906                item.remove(&attr);
4907            } else {
4908                item.insert(attr, json!({"BS": filtered}));
4909            }
4910        }
4911    }
4912
4913    Ok(())
4914}
4915
4916#[allow(clippy::too_many_arguments)]
4917fn build_table_description_json(
4918    arn: &str,
4919    key_schema: &[KeySchemaElement],
4920    attribute_definitions: &[AttributeDefinition],
4921    provisioned_throughput: &ProvisionedThroughput,
4922    gsi: &[GlobalSecondaryIndex],
4923    lsi: &[LocalSecondaryIndex],
4924    billing_mode: &str,
4925    created_at: chrono::DateTime<chrono::Utc>,
4926    item_count: i64,
4927    size_bytes: i64,
4928    status: &str,
4929) -> Value {
4930    let table_name = arn.rsplit('/').next().unwrap_or("");
4931    let creation_timestamp =
4932        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4933
4934    let ks: Vec<Value> = key_schema
4935        .iter()
4936        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4937        .collect();
4938
4939    let ad: Vec<Value> = attribute_definitions
4940        .iter()
4941        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4942        .collect();
4943
4944    let mut desc = json!({
4945        "TableName": table_name,
4946        "TableArn": arn,
4947        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4948        "TableStatus": status,
4949        "KeySchema": ks,
4950        "AttributeDefinitions": ad,
4951        "CreationDateTime": creation_timestamp,
4952        "ItemCount": item_count,
4953        "TableSizeBytes": size_bytes,
4954        "BillingModeSummary": { "BillingMode": billing_mode },
4955    });
4956
4957    if billing_mode != "PAY_PER_REQUEST" {
4958        desc["ProvisionedThroughput"] = json!({
4959            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4960            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4961            "NumberOfDecreasesToday": 0,
4962        });
4963    } else {
4964        desc["ProvisionedThroughput"] = json!({
4965            "ReadCapacityUnits": 0,
4966            "WriteCapacityUnits": 0,
4967            "NumberOfDecreasesToday": 0,
4968        });
4969    }
4970
4971    if !gsi.is_empty() {
4972        let gsi_json: Vec<Value> = gsi
4973            .iter()
4974            .map(|g| {
4975                let gks: Vec<Value> = g
4976                    .key_schema
4977                    .iter()
4978                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4979                    .collect();
4980                let mut idx = json!({
4981                    "IndexName": g.index_name,
4982                    "KeySchema": gks,
4983                    "Projection": { "ProjectionType": g.projection.projection_type },
4984                    "IndexStatus": "ACTIVE",
4985                    "IndexArn": format!("{arn}/index/{}", g.index_name),
4986                    "ItemCount": 0,
4987                    "IndexSizeBytes": 0,
4988                });
4989                if !g.projection.non_key_attributes.is_empty() {
4990                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4991                }
4992                if let Some(ref pt) = g.provisioned_throughput {
4993                    idx["ProvisionedThroughput"] = json!({
4994                        "ReadCapacityUnits": pt.read_capacity_units,
4995                        "WriteCapacityUnits": pt.write_capacity_units,
4996                        "NumberOfDecreasesToday": 0,
4997                    });
4998                }
4999                idx
5000            })
5001            .collect();
5002        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
5003    }
5004
5005    if !lsi.is_empty() {
5006        let lsi_json: Vec<Value> = lsi
5007            .iter()
5008            .map(|l| {
5009                let lks: Vec<Value> = l
5010                    .key_schema
5011                    .iter()
5012                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
5013                    .collect();
5014                let mut idx = json!({
5015                    "IndexName": l.index_name,
5016                    "KeySchema": lks,
5017                    "Projection": { "ProjectionType": l.projection.projection_type },
5018                    "IndexArn": format!("{arn}/index/{}", l.index_name),
5019                    "ItemCount": 0,
5020                    "IndexSizeBytes": 0,
5021                });
5022                if !l.projection.non_key_attributes.is_empty() {
5023                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
5024                }
5025                idx
5026            })
5027            .collect();
5028        desc["LocalSecondaryIndexes"] = json!(lsi_json);
5029    }
5030
5031    desc
5032}
5033
5034fn build_table_description(table: &DynamoTable) -> Value {
5035    let mut desc = build_table_description_json(
5036        &table.arn,
5037        &table.key_schema,
5038        &table.attribute_definitions,
5039        &table.provisioned_throughput,
5040        &table.gsi,
5041        &table.lsi,
5042        &table.billing_mode,
5043        table.created_at,
5044        table.item_count,
5045        table.size_bytes,
5046        &table.status,
5047    );
5048
5049    // Add stream specification if streams are enabled
5050    if table.stream_enabled {
5051        if let Some(ref stream_arn) = table.stream_arn {
5052            desc["LatestStreamArn"] = json!(stream_arn);
5053            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
5054        }
5055        if let Some(ref view_type) = table.stream_view_type {
5056            desc["StreamSpecification"] = json!({
5057                "StreamEnabled": true,
5058                "StreamViewType": view_type,
5059            });
5060        }
5061    }
5062
5063    // Add SSE description
5064    if let Some(ref sse_type) = table.sse_type {
5065        let mut sse_desc = json!({
5066            "Status": "ENABLED",
5067            "SSEType": sse_type,
5068        });
5069        if let Some(ref key_arn) = table.sse_kms_key_arn {
5070            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
5071        }
5072        desc["SSEDescription"] = sse_desc;
5073    } else {
5074        // Default: AWS owned key encryption (always enabled in real AWS)
5075        desc["SSEDescription"] = json!({
5076            "Status": "ENABLED",
5077            "SSEType": "AES256",
5078        });
5079    }
5080
5081    desc
5082}
5083
5084fn execute_partiql_statement(
5085    state: &SharedDynamoDbState,
5086    statement: &str,
5087    parameters: &[Value],
5088) -> Result<AwsResponse, AwsServiceError> {
5089    let trimmed = statement.trim();
5090    let upper = trimmed.to_ascii_uppercase();
5091
5092    if upper.starts_with("SELECT") {
5093        execute_partiql_select(state, trimmed, parameters)
5094    } else if upper.starts_with("INSERT") {
5095        execute_partiql_insert(state, trimmed, parameters)
5096    } else if upper.starts_with("UPDATE") {
5097        execute_partiql_update(state, trimmed, parameters)
5098    } else if upper.starts_with("DELETE") {
5099        execute_partiql_delete(state, trimmed, parameters)
5100    } else {
5101        Err(AwsServiceError::aws_error(
5102            StatusCode::BAD_REQUEST,
5103            "ValidationException",
5104            format!("Unsupported PartiQL statement: {trimmed}"),
5105        ))
5106    }
5107}
5108
5109/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
5110fn execute_partiql_select(
5111    state: &SharedDynamoDbState,
5112    statement: &str,
5113    parameters: &[Value],
5114) -> Result<AwsResponse, AwsServiceError> {
5115    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
5116    let upper = statement.to_ascii_uppercase();
5117    let from_pos = upper.find("FROM").ok_or_else(|| {
5118        AwsServiceError::aws_error(
5119            StatusCode::BAD_REQUEST,
5120            "ValidationException",
5121            "Invalid SELECT statement: missing FROM",
5122        )
5123    })?;
5124
5125    let after_from = statement[from_pos + 4..].trim();
5126    let (table_name, rest) = parse_partiql_table_name(after_from);
5127
5128    let state = state.read();
5129    let table = get_table(&state.tables, &table_name)?;
5130
5131    let rest_upper = rest.trim().to_ascii_uppercase();
5132    if rest_upper.starts_with("WHERE") {
5133        let where_clause = rest.trim()[5..].trim();
5134        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
5135        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
5136        DynamoDbService::ok_json(json!({ "Items": items }))
5137    } else {
5138        // No WHERE, return all items
5139        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
5140        DynamoDbService::ok_json(json!({ "Items": items }))
5141    }
5142}
5143
5144fn execute_partiql_insert(
5145    state: &SharedDynamoDbState,
5146    statement: &str,
5147    parameters: &[Value],
5148) -> Result<AwsResponse, AwsServiceError> {
5149    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
5150    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
5151    let upper = statement.to_ascii_uppercase();
5152    let into_pos = upper.find("INTO").ok_or_else(|| {
5153        AwsServiceError::aws_error(
5154            StatusCode::BAD_REQUEST,
5155            "ValidationException",
5156            "Invalid INSERT statement: missing INTO",
5157        )
5158    })?;
5159
5160    let after_into = statement[into_pos + 4..].trim();
5161    let (table_name, rest) = parse_partiql_table_name(after_into);
5162
5163    let rest_upper = rest.trim().to_ascii_uppercase();
5164    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
5165        AwsServiceError::aws_error(
5166            StatusCode::BAD_REQUEST,
5167            "ValidationException",
5168            "Invalid INSERT statement: missing VALUE",
5169        )
5170    })?;
5171
5172    let value_str = rest.trim()[value_pos + 5..].trim();
5173    let item = parse_partiql_value_object(value_str, parameters)?;
5174
5175    let mut state = state.write();
5176    let table = get_table_mut(&mut state.tables, &table_name)?;
5177    let key = extract_key(table, &item);
5178    if table.find_item_index(&key).is_some() {
5179        // DynamoDB PartiQL INSERT fails if item exists
5180        return Err(AwsServiceError::aws_error(
5181            StatusCode::BAD_REQUEST,
5182            "DuplicateItemException",
5183            "Duplicate primary key exists in table",
5184        ));
5185    } else {
5186        table.items.push(item);
5187    }
5188    table.recalculate_stats();
5189
5190    DynamoDbService::ok_json(json!({}))
5191}
5192
5193fn execute_partiql_update(
5194    state: &SharedDynamoDbState,
5195    statement: &str,
5196    parameters: &[Value],
5197) -> Result<AwsResponse, AwsServiceError> {
5198    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
5199    // or: UPDATE "tablename" SET attr=? WHERE pk=?
5200    let after_update = statement[6..].trim(); // skip "UPDATE"
5201    let (table_name, rest) = parse_partiql_table_name(after_update);
5202
5203    let rest_upper = rest.trim().to_ascii_uppercase();
5204    let set_pos = rest_upper.find("SET").ok_or_else(|| {
5205        AwsServiceError::aws_error(
5206            StatusCode::BAD_REQUEST,
5207            "ValidationException",
5208            "Invalid UPDATE statement: missing SET",
5209        )
5210    })?;
5211
5212    let after_set = rest.trim()[set_pos + 3..].trim();
5213
5214    // Split on WHERE
5215    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
5216    let (set_clause, where_clause) = if let Some(wp) = where_pos {
5217        (&after_set[..wp], after_set[wp + 5..].trim())
5218    } else {
5219        (after_set, "")
5220    };
5221
5222    let mut state = state.write();
5223    let table = get_table_mut(&mut state.tables, &table_name)?;
5224
5225    let matched_indices = if !where_clause.is_empty() {
5226        find_partiql_where_indices(table, where_clause, parameters)?
5227    } else {
5228        (0..table.items.len()).collect()
5229    };
5230
5231    // Parse SET assignments: attr=value, attr2=value2
5232    let param_offset = count_params_in_str(where_clause);
5233    let assignments: Vec<&str> = set_clause.split(',').collect();
5234    for idx in &matched_indices {
5235        let mut local_offset = param_offset;
5236        for assignment in &assignments {
5237            let assignment = assignment.trim();
5238            if let Some((attr, val_str)) = assignment.split_once('=') {
5239                let attr = attr.trim().trim_matches('"');
5240                let val_str = val_str.trim();
5241                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
5242                if let Some(v) = value {
5243                    table.items[*idx].insert(attr.to_string(), v);
5244                }
5245            }
5246        }
5247    }
5248    table.recalculate_stats();
5249
5250    DynamoDbService::ok_json(json!({}))
5251}
5252
5253fn execute_partiql_delete(
5254    state: &SharedDynamoDbState,
5255    statement: &str,
5256    parameters: &[Value],
5257) -> Result<AwsResponse, AwsServiceError> {
5258    // Pattern: DELETE FROM "tablename" WHERE pk='val'
5259    let upper = statement.to_ascii_uppercase();
5260    let from_pos = upper.find("FROM").ok_or_else(|| {
5261        AwsServiceError::aws_error(
5262            StatusCode::BAD_REQUEST,
5263            "ValidationException",
5264            "Invalid DELETE statement: missing FROM",
5265        )
5266    })?;
5267
5268    let after_from = statement[from_pos + 4..].trim();
5269    let (table_name, rest) = parse_partiql_table_name(after_from);
5270
5271    let rest_upper = rest.trim().to_ascii_uppercase();
5272    if !rest_upper.starts_with("WHERE") {
5273        return Err(AwsServiceError::aws_error(
5274            StatusCode::BAD_REQUEST,
5275            "ValidationException",
5276            "DELETE requires a WHERE clause",
5277        ));
5278    }
5279    let where_clause = rest.trim()[5..].trim();
5280
5281    let mut state = state.write();
5282    let table = get_table_mut(&mut state.tables, &table_name)?;
5283
5284    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
5285    // Remove from highest index first to avoid invalidating lower indices
5286    indices.sort_unstable();
5287    indices.reverse();
5288    for idx in indices {
5289        table.items.remove(idx);
5290    }
5291    table.recalculate_stats();
5292
5293    DynamoDbService::ok_json(json!({}))
5294}
5295
/// Split a table name off the front of `s`.
///
/// The name may be wrapped in double quotes, in which case it may contain
/// whitespace. Returns `(table_name, remainder)`; the remainder is whatever
/// follows the name (and its closing quote, if any), untrimmed.
///
/// A name with an opening but no closing quote ends at the first space and
/// has its quote characters stripped. An unquoted name ends at the first
/// whitespace character.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let input = s.trim();
    let after_open_quote = input.strip_prefix('"');

    // Well-formed quoted name: everything up to the closing quote.
    if let Some((name, rest)) = after_open_quote.and_then(|body| body.split_once('"')) {
        return (name.to_string(), rest);
    }

    if after_open_quote.is_some() {
        // Unterminated quote: fall back to the first space.
        let cut = input.find(' ').unwrap_or(input.len());
        let (name, rest) = input.split_at(cut);
        (name.trim_matches('"').to_string(), rest)
    } else {
        let cut = input.find(char::is_whitespace).unwrap_or(input.len());
        let (name, rest) = input.split_at(cut);
        (name.to_string(), rest)
    }
}
5315
5316/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
5317/// Returns matching items.
5318fn evaluate_partiql_where<'a>(
5319    table: &'a DynamoTable,
5320    where_clause: &str,
5321    parameters: &[Value],
5322) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5323    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5324    Ok(indices.iter().map(|i| &table.items[*i]).collect())
5325}
5326
5327fn find_partiql_where_indices(
5328    table: &DynamoTable,
5329    where_clause: &str,
5330    parameters: &[Value],
5331) -> Result<Vec<usize>, AwsServiceError> {
5332    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
5333    // Case-insensitive AND splitting
5334    let upper = where_clause.to_uppercase();
5335    let conditions = if upper.contains(" AND ") {
5336        // Find positions of " AND " case-insensitively and split
5337        let mut parts = Vec::new();
5338        let mut last = 0;
5339        for (i, _) in upper.match_indices(" AND ") {
5340            parts.push(where_clause[last..i].trim());
5341            last = i + 5;
5342        }
5343        parts.push(where_clause[last..].trim());
5344        parts
5345    } else {
5346        vec![where_clause.trim()]
5347    };
5348
5349    let mut param_idx = 0usize;
5350    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5351
5352    for cond in &conditions {
5353        let cond = cond.trim();
5354        if let Some((left, right)) = cond.split_once('=') {
5355            let attr = left.trim().trim_matches('"').to_string();
5356            let val_str = right.trim();
5357            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5358            if let Some(v) = value {
5359                parsed_conditions.push((attr, v));
5360            }
5361        }
5362    }
5363
5364    let mut indices = Vec::new();
5365    for (i, item) in table.items.iter().enumerate() {
5366        let all_match = parsed_conditions
5367            .iter()
5368            .all(|(attr, expected)| item.get(attr) == Some(expected));
5369        if all_match {
5370            indices.push(i);
5371        }
5372    }
5373
5374    Ok(indices)
5375}
5376
5377/// Parse a PartiQL literal value. Supports:
5378/// - 'string' -> {"S": "string"}
5379/// - 123 -> {"N": "123"}
5380/// - ? -> parameter from list
5381fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5382    let s = s.trim();
5383    if s == "?" {
5384        let idx = *param_idx;
5385        *param_idx += 1;
5386        parameters.get(idx).cloned()
5387    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5388        let inner = &s[1..s.len() - 1];
5389        Some(json!({"S": inner}))
5390    } else if let Ok(n) = s.parse::<f64>() {
5391        let num_str = if n == n.trunc() {
5392            format!("{}", n as i64)
5393        } else {
5394            format!("{n}")
5395        };
5396        Some(json!({"N": num_str}))
5397    } else {
5398        None
5399    }
5400}
5401
5402/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
5403fn parse_partiql_value_object(
5404    s: &str,
5405    parameters: &[Value],
5406) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5407    let s = s.trim();
5408    let inner = s
5409        .strip_prefix('{')
5410        .and_then(|s| s.strip_suffix('}'))
5411        .ok_or_else(|| {
5412            AwsServiceError::aws_error(
5413                StatusCode::BAD_REQUEST,
5414                "ValidationException",
5415                "Invalid VALUE: expected object literal",
5416            )
5417        })?;
5418
5419    let mut item = HashMap::new();
5420    let mut param_idx = 0usize;
5421
5422    // Simple comma-separated key:value parsing
5423    for pair in split_partiql_pairs(inner) {
5424        let pair = pair.trim();
5425        if pair.is_empty() {
5426            continue;
5427        }
5428        if let Some((key_part, val_part)) = pair.split_once(':') {
5429            let key = key_part
5430                .trim()
5431                .trim_matches('\'')
5432                .trim_matches('"')
5433                .to_string();
5434            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5435                item.insert(key, val);
5436            }
5437        }
5438    }
5439
5440    Ok(item)
5441}
5442
/// Split the inside of a PartiQL object literal into its `key: value` pairs.
///
/// Commas separate pairs only when they sit outside single-quoted strings and
/// outside nested `{ ... }` objects. The fragments are returned untrimmed,
/// and the result always contains at least one (possibly empty) fragment.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut piece_start = 0;
    let mut brace_depth = 0;
    let mut quoted = false;

    for (pos, ch) in s.char_indices() {
        if ch == '\'' {
            quoted = !quoted;
            continue;
        }
        if quoted {
            continue;
        }
        match ch {
            '{' => brace_depth += 1,
            '}' => brace_depth -= 1,
            ',' if brace_depth == 0 => {
                pieces.push(&s[piece_start..pos]);
                piece_start = pos + 1;
            }
            _ => {}
        }
    }

    pieces.push(&s[piece_start..]);
    pieces
}
5466
/// Count the positional `?` parameters in a string.
///
/// A `?` inside a single-quoted string literal is ordinary text, not a
/// placeholder, and is not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    s.chars()
        .filter(|&c| {
            if c == '\'' {
                in_quote = !in_quote;
                false
            } else {
                c == '?' && !in_quote
            }
        })
        .count()
}
5471
5472#[cfg(test)]
5473mod tests {
5474    use super::*;
5475    use serde_json::json;
5476
5477    #[test]
5478    fn test_parse_update_clauses_set() {
5479        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5480        assert_eq!(clauses.len(), 1);
5481        assert_eq!(clauses[0].0, "SET");
5482        assert_eq!(clauses[0].1.len(), 2);
5483    }
5484
5485    #[test]
5486    fn test_parse_update_clauses_set_and_remove() {
5487        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
5488        assert_eq!(clauses.len(), 2);
5489        assert_eq!(clauses[0].0, "SET");
5490        assert_eq!(clauses[1].0, "REMOVE");
5491    }
5492
5493    #[test]
5494    fn test_evaluate_key_condition_simple() {
5495        let mut item = HashMap::new();
5496        item.insert("pk".to_string(), json!({"S": "user1"}));
5497        item.insert("sk".to_string(), json!({"S": "order1"}));
5498
5499        let mut expr_values = HashMap::new();
5500        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
5501
5502        assert!(evaluate_key_condition(
5503            "pk = :pk",
5504            &item,
5505            "pk",
5506            Some("sk"),
5507            &HashMap::new(),
5508            &expr_values,
5509        ));
5510    }
5511
5512    #[test]
5513    fn test_compare_attribute_values_numbers() {
5514        let a = json!({"N": "10"});
5515        let b = json!({"N": "20"});
5516        assert_eq!(
5517            compare_attribute_values(Some(&a), Some(&b)),
5518            std::cmp::Ordering::Less
5519        );
5520    }
5521
5522    #[test]
5523    fn test_compare_attribute_values_strings() {
5524        let a = json!({"S": "apple"});
5525        let b = json!({"S": "banana"});
5526        assert_eq!(
5527            compare_attribute_values(Some(&a), Some(&b)),
5528            std::cmp::Ordering::Less
5529        );
5530    }
5531
5532    #[test]
5533    fn test_split_on_and() {
5534        let parts = split_on_and("pk = :pk AND sk > :sk");
5535        assert_eq!(parts.len(), 2);
5536        assert_eq!(parts[0].trim(), "pk = :pk");
5537        assert_eq!(parts[1].trim(), "sk > :sk");
5538    }
5539
5540    #[test]
5541    fn test_split_on_and_respects_parentheses() {
5542        // Before fix: split_on_and would split inside the parens
5543        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5544        // Should NOT split on the AND inside parentheses
5545        assert_eq!(parts.len(), 1);
5546        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5547    }
5548
5549    #[test]
5550    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
5551        // (a AND b) OR c — should match when c is true but a is false
5552        let mut item = HashMap::new();
5553        item.insert("x".to_string(), json!({"S": "no"}));
5554        item.insert("y".to_string(), json!({"S": "no"}));
5555        item.insert("z".to_string(), json!({"S": "yes"}));
5556
5557        let mut expr_values = HashMap::new();
5558        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
5559
5560        // x=yes AND y=yes => false, but z=yes => true => overall true
5561        let result = evaluate_filter_expression(
5562            "(x = :yes AND y = :yes) OR z = :yes",
5563            &item,
5564            &HashMap::new(),
5565            &expr_values,
5566        );
5567        assert!(result, "should match because z = :yes is true");
5568
5569        // x=yes AND y=yes => false, z=yes => false => overall false
5570        let mut item2 = HashMap::new();
5571        item2.insert("x".to_string(), json!({"S": "no"}));
5572        item2.insert("y".to_string(), json!({"S": "no"}));
5573        item2.insert("z".to_string(), json!({"S": "no"}));
5574
5575        let result2 = evaluate_filter_expression(
5576            "(x = :yes AND y = :yes) OR z = :yes",
5577            &item2,
5578            &HashMap::new(),
5579            &expr_values,
5580        );
5581        assert!(!result2, "should not match because nothing is true");
5582    }
5583
5584    #[test]
5585    fn test_project_item_nested_path() {
5586        // Item with a list attribute containing maps
5587        let mut item = HashMap::new();
5588        item.insert("pk".to_string(), json!({"S": "key1"}));
5589        item.insert(
5590            "data".to_string(),
5591            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
5592        );
5593
5594        let body = json!({
5595            "ProjectionExpression": "data[0].name"
5596        });
5597
5598        let projected = project_item(&item, &body);
5599        // Should contain data[0].name = "Alice", not the entire data[0] element
5600        let name = projected
5601            .get("data")
5602            .and_then(|v| v.get("L"))
5603            .and_then(|v| v.get(0))
5604            .and_then(|v| v.get("M"))
5605            .and_then(|v| v.get("name"))
5606            .and_then(|v| v.get("S"))
5607            .and_then(|v| v.as_str());
5608        assert_eq!(name, Some("Alice"));
5609
5610        // Should NOT contain the "age" field
5611        let age = projected
5612            .get("data")
5613            .and_then(|v| v.get("L"))
5614            .and_then(|v| v.get(0))
5615            .and_then(|v| v.get("M"))
5616            .and_then(|v| v.get("age"));
5617        assert!(age.is_none(), "age should not be present in projection");
5618    }
5619
5620    #[test]
5621    fn test_resolve_nested_path_map() {
5622        let mut item = HashMap::new();
5623        item.insert(
5624            "info".to_string(),
5625            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
5626        );
5627
5628        let result = resolve_nested_path(&item, "info.address.city");
5629        assert_eq!(result, Some(json!({"S": "NYC"})));
5630    }
5631
5632    #[test]
5633    fn test_resolve_nested_path_list_then_map() {
5634        let mut item = HashMap::new();
5635        item.insert(
5636            "items".to_string(),
5637            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
5638        );
5639
5640        let result = resolve_nested_path(&item, "items[0].sku");
5641        assert_eq!(result, Some(json!({"S": "ABC"})));
5642    }
5643
5644    // -- Integration-style tests using DynamoDbService --
5645
5646    use crate::state::SharedDynamoDbState;
5647    use parking_lot::RwLock;
5648    use std::sync::Arc;
5649
5650    fn make_service() -> DynamoDbService {
5651        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5652            "123456789012",
5653            "us-east-1",
5654        )));
5655        DynamoDbService::new(state)
5656    }
5657
5658    fn make_request(action: &str, body: Value) -> AwsRequest {
5659        AwsRequest {
5660            service: "dynamodb".to_string(),
5661            action: action.to_string(),
5662            region: "us-east-1".to_string(),
5663            account_id: "123456789012".to_string(),
5664            request_id: "test-id".to_string(),
5665            headers: http::HeaderMap::new(),
5666            query_params: HashMap::new(),
5667            body: serde_json::to_vec(&body).unwrap().into(),
5668            path_segments: vec![],
5669            raw_path: "/".to_string(),
5670            raw_query: String::new(),
5671            method: http::Method::POST,
5672            is_query_protocol: false,
5673            access_key_id: None,
5674        }
5675    }
5676
5677    fn create_test_table(svc: &DynamoDbService) {
5678        let req = make_request(
5679            "CreateTable",
5680            json!({
5681                "TableName": "test-table",
5682                "KeySchema": [
5683                    { "AttributeName": "pk", "KeyType": "HASH" }
5684                ],
5685                "AttributeDefinitions": [
5686                    { "AttributeName": "pk", "AttributeType": "S" }
5687                ],
5688                "BillingMode": "PAY_PER_REQUEST"
5689            }),
5690        );
5691        svc.create_table(&req).unwrap();
5692    }
5693
5694    #[test]
5695    fn delete_item_return_values_all_old() {
5696        let svc = make_service();
5697        create_test_table(&svc);
5698
5699        // Put an item
5700        let req = make_request(
5701            "PutItem",
5702            json!({
5703                "TableName": "test-table",
5704                "Item": {
5705                    "pk": { "S": "key1" },
5706                    "name": { "S": "Alice" },
5707                    "age": { "N": "30" }
5708                }
5709            }),
5710        );
5711        svc.put_item(&req).unwrap();
5712
5713        // Delete with ReturnValues=ALL_OLD
5714        let req = make_request(
5715            "DeleteItem",
5716            json!({
5717                "TableName": "test-table",
5718                "Key": { "pk": { "S": "key1" } },
5719                "ReturnValues": "ALL_OLD"
5720            }),
5721        );
5722        let resp = svc.delete_item(&req).unwrap();
5723        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5724
5725        // Verify the old item is returned
5726        let attrs = &body["Attributes"];
5727        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5728        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5729        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5730
5731        // Verify the item is actually deleted
5732        let req = make_request(
5733            "GetItem",
5734            json!({
5735                "TableName": "test-table",
5736                "Key": { "pk": { "S": "key1" } }
5737            }),
5738        );
5739        let resp = svc.get_item(&req).unwrap();
5740        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5741        assert!(body.get("Item").is_none(), "item should be deleted");
5742    }
5743
5744    #[test]
5745    fn transact_get_items_returns_existing_and_missing() {
5746        let svc = make_service();
5747        create_test_table(&svc);
5748
5749        // Put one item
5750        let req = make_request(
5751            "PutItem",
5752            json!({
5753                "TableName": "test-table",
5754                "Item": {
5755                    "pk": { "S": "exists" },
5756                    "val": { "S": "hello" }
5757                }
5758            }),
5759        );
5760        svc.put_item(&req).unwrap();
5761
5762        let req = make_request(
5763            "TransactGetItems",
5764            json!({
5765                "TransactItems": [
5766                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5767                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5768                ]
5769            }),
5770        );
5771        let resp = svc.transact_get_items(&req).unwrap();
5772        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5773        let responses = body["Responses"].as_array().unwrap();
5774        assert_eq!(responses.len(), 2);
5775        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5776        assert!(responses[1].get("Item").is_none());
5777    }
5778
5779    #[test]
5780    fn transact_write_items_put_and_delete() {
5781        let svc = make_service();
5782        create_test_table(&svc);
5783
5784        // Put initial item
5785        let req = make_request(
5786            "PutItem",
5787            json!({
5788                "TableName": "test-table",
5789                "Item": {
5790                    "pk": { "S": "to-delete" },
5791                    "val": { "S": "bye" }
5792                }
5793            }),
5794        );
5795        svc.put_item(&req).unwrap();
5796
5797        // TransactWrite: put new + delete existing
5798        let req = make_request(
5799            "TransactWriteItems",
5800            json!({
5801                "TransactItems": [
5802                    {
5803                        "Put": {
5804                            "TableName": "test-table",
5805                            "Item": {
5806                                "pk": { "S": "new-item" },
5807                                "val": { "S": "hi" }
5808                            }
5809                        }
5810                    },
5811                    {
5812                        "Delete": {
5813                            "TableName": "test-table",
5814                            "Key": { "pk": { "S": "to-delete" } }
5815                        }
5816                    }
5817                ]
5818            }),
5819        );
5820        let resp = svc.transact_write_items(&req).unwrap();
5821        assert_eq!(resp.status, StatusCode::OK);
5822
5823        // Verify new item exists
5824        let req = make_request(
5825            "GetItem",
5826            json!({
5827                "TableName": "test-table",
5828                "Key": { "pk": { "S": "new-item" } }
5829            }),
5830        );
5831        let resp = svc.get_item(&req).unwrap();
5832        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5833        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5834
5835        // Verify deleted item is gone
5836        let req = make_request(
5837            "GetItem",
5838            json!({
5839                "TableName": "test-table",
5840                "Key": { "pk": { "S": "to-delete" } }
5841            }),
5842        );
5843        let resp = svc.get_item(&req).unwrap();
5844        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5845        assert!(body.get("Item").is_none());
5846    }
5847
5848    #[test]
5849    fn transact_write_items_condition_check_failure() {
5850        let svc = make_service();
5851        create_test_table(&svc);
5852
5853        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
5854        let req = make_request(
5855            "TransactWriteItems",
5856            json!({
5857                "TransactItems": [
5858                    {
5859                        "ConditionCheck": {
5860                            "TableName": "test-table",
5861                            "Key": { "pk": { "S": "nonexistent" } },
5862                            "ConditionExpression": "attribute_exists(pk)"
5863                        }
5864                    }
5865                ]
5866            }),
5867        );
5868        let resp = svc.transact_write_items(&req).unwrap();
5869        // Should be a 400 error response
5870        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5871        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5872        assert_eq!(
5873            body["__type"].as_str().unwrap(),
5874            "TransactionCanceledException"
5875        );
5876        assert!(body["CancellationReasons"].as_array().is_some());
5877    }
5878
5879    #[test]
5880    fn update_and_describe_time_to_live() {
5881        let svc = make_service();
5882        create_test_table(&svc);
5883
5884        // Enable TTL
5885        let req = make_request(
5886            "UpdateTimeToLive",
5887            json!({
5888                "TableName": "test-table",
5889                "TimeToLiveSpecification": {
5890                    "AttributeName": "ttl",
5891                    "Enabled": true
5892                }
5893            }),
5894        );
5895        let resp = svc.update_time_to_live(&req).unwrap();
5896        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5897        assert_eq!(
5898            body["TimeToLiveSpecification"]["AttributeName"]
5899                .as_str()
5900                .unwrap(),
5901            "ttl"
5902        );
5903        assert!(body["TimeToLiveSpecification"]["Enabled"]
5904            .as_bool()
5905            .unwrap());
5906
5907        // Describe TTL
5908        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5909        let resp = svc.describe_time_to_live(&req).unwrap();
5910        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5911        assert_eq!(
5912            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5913                .as_str()
5914                .unwrap(),
5915            "ENABLED"
5916        );
5917        assert_eq!(
5918            body["TimeToLiveDescription"]["AttributeName"]
5919                .as_str()
5920                .unwrap(),
5921            "ttl"
5922        );
5923
5924        // Disable TTL
5925        let req = make_request(
5926            "UpdateTimeToLive",
5927            json!({
5928                "TableName": "test-table",
5929                "TimeToLiveSpecification": {
5930                    "AttributeName": "ttl",
5931                    "Enabled": false
5932                }
5933            }),
5934        );
5935        svc.update_time_to_live(&req).unwrap();
5936
5937        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5938        let resp = svc.describe_time_to_live(&req).unwrap();
5939        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5940        assert_eq!(
5941            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5942                .as_str()
5943                .unwrap(),
5944            "DISABLED"
5945        );
5946    }
5947
5948    #[test]
5949    fn resource_policy_lifecycle() {
5950        let svc = make_service();
5951        create_test_table(&svc);
5952
5953        let table_arn = {
5954            let state = svc.state.read();
5955            state.tables.get("test-table").unwrap().arn.clone()
5956        };
5957
5958        // Put policy
5959        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5960        let req = make_request(
5961            "PutResourcePolicy",
5962            json!({
5963                "ResourceArn": table_arn,
5964                "Policy": policy_doc
5965            }),
5966        );
5967        let resp = svc.put_resource_policy(&req).unwrap();
5968        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5969        assert!(body["RevisionId"].as_str().is_some());
5970
5971        // Get policy
5972        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5973        let resp = svc.get_resource_policy(&req).unwrap();
5974        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5975        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5976
5977        // Delete policy
5978        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5979        svc.delete_resource_policy(&req).unwrap();
5980
5981        // Get should return null now
5982        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5983        let resp = svc.get_resource_policy(&req).unwrap();
5984        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5985        assert!(body["Policy"].is_null());
5986    }
5987
5988    #[test]
5989    fn describe_endpoints() {
5990        let svc = make_service();
5991        let req = make_request("DescribeEndpoints", json!({}));
5992        let resp = svc.describe_endpoints(&req).unwrap();
5993        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5994        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5995    }
5996
5997    #[test]
5998    fn describe_limits() {
5999        let svc = make_service();
6000        let req = make_request("DescribeLimits", json!({}));
6001        let resp = svc.describe_limits(&req).unwrap();
6002        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6003        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
6004    }
6005
6006    #[test]
6007    fn backup_lifecycle() {
6008        let svc = make_service();
6009        create_test_table(&svc);
6010
6011        // Create backup
6012        let req = make_request(
6013            "CreateBackup",
6014            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
6015        );
6016        let resp = svc.create_backup(&req).unwrap();
6017        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6018        let backup_arn = body["BackupDetails"]["BackupArn"]
6019            .as_str()
6020            .unwrap()
6021            .to_string();
6022        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
6023
6024        // Describe backup
6025        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
6026        let resp = svc.describe_backup(&req).unwrap();
6027        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6028        assert_eq!(
6029            body["BackupDescription"]["BackupDetails"]["BackupName"],
6030            "my-backup"
6031        );
6032
6033        // List backups
6034        let req = make_request("ListBackups", json!({}));
6035        let resp = svc.list_backups(&req).unwrap();
6036        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6037        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
6038
6039        // Restore from backup
6040        let req = make_request(
6041            "RestoreTableFromBackup",
6042            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
6043        );
6044        svc.restore_table_from_backup(&req).unwrap();
6045
6046        // Verify restored table exists
6047        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
6048        let resp = svc.describe_table(&req).unwrap();
6049        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6050        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6051
6052        // Delete backup
6053        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
6054        svc.delete_backup(&req).unwrap();
6055
6056        // List should be empty
6057        let req = make_request("ListBackups", json!({}));
6058        let resp = svc.list_backups(&req).unwrap();
6059        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6060        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
6061    }
6062
6063    #[test]
6064    fn continuous_backups() {
6065        let svc = make_service();
6066        create_test_table(&svc);
6067
6068        // Initially disabled
6069        let req = make_request(
6070            "DescribeContinuousBackups",
6071            json!({ "TableName": "test-table" }),
6072        );
6073        let resp = svc.describe_continuous_backups(&req).unwrap();
6074        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6075        assert_eq!(
6076            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
6077                ["PointInTimeRecoveryStatus"],
6078            "DISABLED"
6079        );
6080
6081        // Enable
6082        let req = make_request(
6083            "UpdateContinuousBackups",
6084            json!({
6085                "TableName": "test-table",
6086                "PointInTimeRecoverySpecification": {
6087                    "PointInTimeRecoveryEnabled": true
6088                }
6089            }),
6090        );
6091        svc.update_continuous_backups(&req).unwrap();
6092
6093        // Verify
6094        let req = make_request(
6095            "DescribeContinuousBackups",
6096            json!({ "TableName": "test-table" }),
6097        );
6098        let resp = svc.describe_continuous_backups(&req).unwrap();
6099        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6100        assert_eq!(
6101            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
6102                ["PointInTimeRecoveryStatus"],
6103            "ENABLED"
6104        );
6105    }
6106
6107    #[test]
6108    fn restore_table_to_point_in_time() {
6109        let svc = make_service();
6110        create_test_table(&svc);
6111
6112        let req = make_request(
6113            "RestoreTableToPointInTime",
6114            json!({
6115                "SourceTableName": "test-table",
6116                "TargetTableName": "pitr-restored"
6117            }),
6118        );
6119        svc.restore_table_to_point_in_time(&req).unwrap();
6120
6121        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
6122        let resp = svc.describe_table(&req).unwrap();
6123        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6124        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6125    }
6126
6127    #[test]
6128    fn global_table_lifecycle() {
6129        let svc = make_service();
6130
6131        // Create global table
6132        let req = make_request(
6133            "CreateGlobalTable",
6134            json!({
6135                "GlobalTableName": "my-global",
6136                "ReplicationGroup": [
6137                    { "RegionName": "us-east-1" },
6138                    { "RegionName": "eu-west-1" }
6139                ]
6140            }),
6141        );
6142        let resp = svc.create_global_table(&req).unwrap();
6143        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6144        assert_eq!(
6145            body["GlobalTableDescription"]["GlobalTableStatus"],
6146            "ACTIVE"
6147        );
6148
6149        // Describe
6150        let req = make_request(
6151            "DescribeGlobalTable",
6152            json!({ "GlobalTableName": "my-global" }),
6153        );
6154        let resp = svc.describe_global_table(&req).unwrap();
6155        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6156        assert_eq!(
6157            body["GlobalTableDescription"]["ReplicationGroup"]
6158                .as_array()
6159                .unwrap()
6160                .len(),
6161            2
6162        );
6163
6164        // List
6165        let req = make_request("ListGlobalTables", json!({}));
6166        let resp = svc.list_global_tables(&req).unwrap();
6167        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6168        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
6169
6170        // Update - add a region
6171        let req = make_request(
6172            "UpdateGlobalTable",
6173            json!({
6174                "GlobalTableName": "my-global",
6175                "ReplicaUpdates": [
6176                    { "Create": { "RegionName": "ap-southeast-1" } }
6177                ]
6178            }),
6179        );
6180        let resp = svc.update_global_table(&req).unwrap();
6181        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6182        assert_eq!(
6183            body["GlobalTableDescription"]["ReplicationGroup"]
6184                .as_array()
6185                .unwrap()
6186                .len(),
6187            3
6188        );
6189
6190        // Describe settings
6191        let req = make_request(
6192            "DescribeGlobalTableSettings",
6193            json!({ "GlobalTableName": "my-global" }),
6194        );
6195        let resp = svc.describe_global_table_settings(&req).unwrap();
6196        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6197        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
6198
6199        // Update settings (no-op, just verify no error)
6200        let req = make_request(
6201            "UpdateGlobalTableSettings",
6202            json!({ "GlobalTableName": "my-global" }),
6203        );
6204        svc.update_global_table_settings(&req).unwrap();
6205    }
6206
6207    #[test]
6208    fn table_replica_auto_scaling() {
6209        let svc = make_service();
6210        create_test_table(&svc);
6211
6212        let req = make_request(
6213            "DescribeTableReplicaAutoScaling",
6214            json!({ "TableName": "test-table" }),
6215        );
6216        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
6217        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6218        assert_eq!(
6219            body["TableAutoScalingDescription"]["TableName"],
6220            "test-table"
6221        );
6222
6223        let req = make_request(
6224            "UpdateTableReplicaAutoScaling",
6225            json!({ "TableName": "test-table" }),
6226        );
6227        svc.update_table_replica_auto_scaling(&req).unwrap();
6228    }
6229
6230    #[test]
6231    fn kinesis_streaming_lifecycle() {
6232        let svc = make_service();
6233        create_test_table(&svc);
6234
6235        // Enable
6236        let req = make_request(
6237            "EnableKinesisStreamingDestination",
6238            json!({
6239                "TableName": "test-table",
6240                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
6241            }),
6242        );
6243        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
6244        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6245        assert_eq!(body["DestinationStatus"], "ACTIVE");
6246
6247        // Describe
6248        let req = make_request(
6249            "DescribeKinesisStreamingDestination",
6250            json!({ "TableName": "test-table" }),
6251        );
6252        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
6253        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6254        assert_eq!(
6255            body["KinesisDataStreamDestinations"]
6256                .as_array()
6257                .unwrap()
6258                .len(),
6259            1
6260        );
6261
6262        // Update
6263        let req = make_request(
6264            "UpdateKinesisStreamingDestination",
6265            json!({
6266                "TableName": "test-table",
6267                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
6268                "UpdateKinesisStreamingConfiguration": {
6269                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
6270                }
6271            }),
6272        );
6273        svc.update_kinesis_streaming_destination(&req).unwrap();
6274
6275        // Disable
6276        let req = make_request(
6277            "DisableKinesisStreamingDestination",
6278            json!({
6279                "TableName": "test-table",
6280                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
6281            }),
6282        );
6283        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
6284        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6285        assert_eq!(body["DestinationStatus"], "DISABLED");
6286    }
6287
6288    #[test]
6289    fn contributor_insights_lifecycle() {
6290        let svc = make_service();
6291        create_test_table(&svc);
6292
6293        // Initially disabled
6294        let req = make_request(
6295            "DescribeContributorInsights",
6296            json!({ "TableName": "test-table" }),
6297        );
6298        let resp = svc.describe_contributor_insights(&req).unwrap();
6299        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6300        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6301
6302        // Enable
6303        let req = make_request(
6304            "UpdateContributorInsights",
6305            json!({
6306                "TableName": "test-table",
6307                "ContributorInsightsAction": "ENABLE"
6308            }),
6309        );
6310        let resp = svc.update_contributor_insights(&req).unwrap();
6311        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6312        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6313
6314        // List
6315        let req = make_request("ListContributorInsights", json!({}));
6316        let resp = svc.list_contributor_insights(&req).unwrap();
6317        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6318        assert_eq!(
6319            body["ContributorInsightsSummaries"]
6320                .as_array()
6321                .unwrap()
6322                .len(),
6323            1
6324        );
6325    }
6326
6327    #[test]
6328    fn export_lifecycle() {
6329        let svc = make_service();
6330        create_test_table(&svc);
6331
6332        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
6333
6334        // Export
6335        let req = make_request(
6336            "ExportTableToPointInTime",
6337            json!({
6338                "TableArn": table_arn,
6339                "S3Bucket": "my-bucket"
6340            }),
6341        );
6342        let resp = svc.export_table_to_point_in_time(&req).unwrap();
6343        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6344        let export_arn = body["ExportDescription"]["ExportArn"]
6345            .as_str()
6346            .unwrap()
6347            .to_string();
6348        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6349
6350        // Describe
6351        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6352        let resp = svc.describe_export(&req).unwrap();
6353        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6354        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6355
6356        // List
6357        let req = make_request("ListExports", json!({}));
6358        let resp = svc.list_exports(&req).unwrap();
6359        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6360        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6361    }
6362
6363    #[test]
6364    fn import_lifecycle() {
6365        let svc = make_service();
6366
6367        let req = make_request(
6368            "ImportTable",
6369            json!({
6370                "InputFormat": "DYNAMODB_JSON",
6371                "S3BucketSource": { "S3Bucket": "import-bucket" },
6372                "TableCreationParameters": {
6373                    "TableName": "imported-table",
6374                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6375                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6376                }
6377            }),
6378        );
6379        let resp = svc.import_table(&req).unwrap();
6380        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6381        let import_arn = body["ImportTableDescription"]["ImportArn"]
6382            .as_str()
6383            .unwrap()
6384            .to_string();
6385        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6386
6387        // Describe import
6388        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6389        let resp = svc.describe_import(&req).unwrap();
6390        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6391        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6392
6393        // List imports
6394        let req = make_request("ListImports", json!({}));
6395        let resp = svc.list_imports(&req).unwrap();
6396        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6397        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6398
6399        // Verify the table was created
6400        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6401        let resp = svc.describe_table(&req).unwrap();
6402        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6403        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6404    }
6405
6406    #[test]
6407    fn backup_restore_preserves_items() {
6408        let svc = make_service();
6409        create_test_table(&svc);
6410
6411        // Put 3 items
6412        for i in 1..=3 {
6413            let req = make_request(
6414                "PutItem",
6415                json!({
6416                    "TableName": "test-table",
6417                    "Item": {
6418                        "pk": { "S": format!("key{i}") },
6419                        "data": { "S": format!("value{i}") }
6420                    }
6421                }),
6422            );
6423            svc.put_item(&req).unwrap();
6424        }
6425
6426        // Create backup
6427        let req = make_request(
6428            "CreateBackup",
6429            json!({
6430                "TableName": "test-table",
6431                "BackupName": "my-backup"
6432            }),
6433        );
6434        let resp = svc.create_backup(&req).unwrap();
6435        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6436        let backup_arn = body["BackupDetails"]["BackupArn"]
6437            .as_str()
6438            .unwrap()
6439            .to_string();
6440
6441        // Delete all items from the original table
6442        for i in 1..=3 {
6443            let req = make_request(
6444                "DeleteItem",
6445                json!({
6446                    "TableName": "test-table",
6447                    "Key": { "pk": { "S": format!("key{i}") } }
6448                }),
6449            );
6450            svc.delete_item(&req).unwrap();
6451        }
6452
6453        // Verify original table is empty
6454        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6455        let resp = svc.scan(&req).unwrap();
6456        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6457        assert_eq!(body["Count"], 0);
6458
6459        // Restore from backup
6460        let req = make_request(
6461            "RestoreTableFromBackup",
6462            json!({
6463                "BackupArn": backup_arn,
6464                "TargetTableName": "restored-table"
6465            }),
6466        );
6467        svc.restore_table_from_backup(&req).unwrap();
6468
6469        // Scan restored table — should have 3 items
6470        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6471        let resp = svc.scan(&req).unwrap();
6472        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6473        assert_eq!(body["Count"], 3);
6474        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6475    }
6476
6477    #[test]
6478    fn global_table_replicates_writes() {
6479        let svc = make_service();
6480        create_test_table(&svc);
6481
6482        // Create global table with replicas
6483        let req = make_request(
6484            "CreateGlobalTable",
6485            json!({
6486                "GlobalTableName": "test-table",
6487                "ReplicationGroup": [
6488                    { "RegionName": "us-east-1" },
6489                    { "RegionName": "eu-west-1" }
6490                ]
6491            }),
6492        );
6493        let resp = svc.create_global_table(&req).unwrap();
6494        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6495        assert_eq!(
6496            body["GlobalTableDescription"]["GlobalTableStatus"],
6497            "ACTIVE"
6498        );
6499
6500        // Put an item
6501        let req = make_request(
6502            "PutItem",
6503            json!({
6504                "TableName": "test-table",
6505                "Item": {
6506                    "pk": { "S": "replicated-key" },
6507                    "data": { "S": "replicated-value" }
6508                }
6509            }),
6510        );
6511        svc.put_item(&req).unwrap();
6512
6513        // Verify the item is readable (since all replicas share the same table)
6514        let req = make_request(
6515            "GetItem",
6516            json!({
6517                "TableName": "test-table",
6518                "Key": { "pk": { "S": "replicated-key" } }
6519            }),
6520        );
6521        let resp = svc.get_item(&req).unwrap();
6522        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6523        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6524        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6525    }
6526
6527    #[test]
6528    fn contributor_insights_tracks_access() {
6529        let svc = make_service();
6530        create_test_table(&svc);
6531
6532        // Enable contributor insights
6533        let req = make_request(
6534            "UpdateContributorInsights",
6535            json!({
6536                "TableName": "test-table",
6537                "ContributorInsightsAction": "ENABLE"
6538            }),
6539        );
6540        svc.update_contributor_insights(&req).unwrap();
6541
6542        // Put items with different partition keys
6543        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6544            let req = make_request(
6545                "PutItem",
6546                json!({
6547                    "TableName": "test-table",
6548                    "Item": {
6549                        "pk": { "S": key },
6550                        "data": { "S": "value" }
6551                    }
6552                }),
6553            );
6554            svc.put_item(&req).unwrap();
6555        }
6556
6557        // Get items (to also track read access)
6558        for _ in 0..3 {
6559            let req = make_request(
6560                "GetItem",
6561                json!({
6562                    "TableName": "test-table",
6563                    "Key": { "pk": { "S": "alpha" } }
6564                }),
6565            );
6566            svc.get_item(&req).unwrap();
6567        }
6568
6569        // Describe contributor insights — should show top contributors
6570        let req = make_request(
6571            "DescribeContributorInsights",
6572            json!({ "TableName": "test-table" }),
6573        );
6574        let resp = svc.describe_contributor_insights(&req).unwrap();
6575        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6576        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6577
6578        let contributors = body["TopContributors"].as_array().unwrap();
6579        assert!(
6580            !contributors.is_empty(),
6581            "TopContributors should not be empty"
6582        );
6583
6584        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
6585        // alpha should be the top contributor
6586        let top = &contributors[0];
6587        assert!(top["Count"].as_u64().unwrap() > 0);
6588
6589        // Verify the rule list is populated
6590        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6591        assert!(!rules.is_empty());
6592    }
6593
6594    #[test]
6595    fn contributor_insights_not_tracked_when_disabled() {
6596        let svc = make_service();
6597        create_test_table(&svc);
6598
6599        // Put items without enabling insights
6600        let req = make_request(
6601            "PutItem",
6602            json!({
6603                "TableName": "test-table",
6604                "Item": {
6605                    "pk": { "S": "key1" },
6606                    "data": { "S": "value" }
6607                }
6608            }),
6609        );
6610        svc.put_item(&req).unwrap();
6611
6612        // Describe — should show empty contributors
6613        let req = make_request(
6614            "DescribeContributorInsights",
6615            json!({ "TableName": "test-table" }),
6616        );
6617        let resp = svc.describe_contributor_insights(&req).unwrap();
6618        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6619        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6620
6621        let contributors = body["TopContributors"].as_array().unwrap();
6622        assert!(contributors.is_empty());
6623    }
6624
6625    #[test]
6626    fn contributor_insights_disabled_table_no_counters_after_scan() {
6627        let svc = make_service();
6628        create_test_table(&svc);
6629
6630        // Put items
6631        for key in &["alpha", "beta"] {
6632            let req = make_request(
6633                "PutItem",
6634                json!({
6635                    "TableName": "test-table",
6636                    "Item": { "pk": { "S": key } }
6637                }),
6638            );
6639            svc.put_item(&req).unwrap();
6640        }
6641
6642        // Enable insights, then scan, then disable, then check counters are cleared
6643        let req = make_request(
6644            "UpdateContributorInsights",
6645            json!({
6646                "TableName": "test-table",
6647                "ContributorInsightsAction": "ENABLE"
6648            }),
6649        );
6650        svc.update_contributor_insights(&req).unwrap();
6651
6652        // Scan to trigger counter collection
6653        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6654        svc.scan(&req).unwrap();
6655
6656        // Verify counters were collected
6657        let req = make_request(
6658            "DescribeContributorInsights",
6659            json!({ "TableName": "test-table" }),
6660        );
6661        let resp = svc.describe_contributor_insights(&req).unwrap();
6662        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6663        let contributors = body["TopContributors"].as_array().unwrap();
6664        assert!(
6665            !contributors.is_empty(),
6666            "counters should be non-empty while enabled"
6667        );
6668
6669        // Disable insights (this clears counters)
6670        let req = make_request(
6671            "UpdateContributorInsights",
6672            json!({
6673                "TableName": "test-table",
6674                "ContributorInsightsAction": "DISABLE"
6675            }),
6676        );
6677        svc.update_contributor_insights(&req).unwrap();
6678
6679        // Scan again -- should NOT accumulate counters since insights is disabled
6680        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6681        svc.scan(&req).unwrap();
6682
6683        // Verify counters are still empty
6684        let req = make_request(
6685            "DescribeContributorInsights",
6686            json!({ "TableName": "test-table" }),
6687        );
6688        let resp = svc.describe_contributor_insights(&req).unwrap();
6689        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6690        let contributors = body["TopContributors"].as_array().unwrap();
6691        assert!(
6692            contributors.is_empty(),
6693            "counters should be empty after disabling insights"
6694        );
6695    }
6696
6697    #[test]
6698    fn scan_pagination_with_limit() {
6699        let svc = make_service();
6700        create_test_table(&svc);
6701
6702        // Insert 5 items
6703        for i in 0..5 {
6704            let req = make_request(
6705                "PutItem",
6706                json!({
6707                    "TableName": "test-table",
6708                    "Item": {
6709                        "pk": { "S": format!("item{i}") },
6710                        "data": { "S": format!("value{i}") }
6711                    }
6712                }),
6713            );
6714            svc.put_item(&req).unwrap();
6715        }
6716
6717        // Scan with limit=2
6718        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6719        let resp = svc.scan(&req).unwrap();
6720        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6721        assert_eq!(body["Count"], 2);
6722        assert!(
6723            body["LastEvaluatedKey"].is_object(),
6724            "should have LastEvaluatedKey when limit < total items"
6725        );
6726        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6727
6728        // Page through all items
6729        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6730        let mut lek = body["LastEvaluatedKey"].clone();
6731
6732        while lek.is_object() {
6733            let req = make_request(
6734                "Scan",
6735                json!({
6736                    "TableName": "test-table",
6737                    "Limit": 2,
6738                    "ExclusiveStartKey": lek
6739                }),
6740            );
6741            let resp = svc.scan(&req).unwrap();
6742            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6743            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6744            lek = body["LastEvaluatedKey"].clone();
6745        }
6746
6747        assert_eq!(
6748            all_items.len(),
6749            5,
6750            "should retrieve all 5 items via pagination"
6751        );
6752    }
6753
6754    #[test]
6755    fn scan_no_pagination_when_all_fit() {
6756        let svc = make_service();
6757        create_test_table(&svc);
6758
6759        for i in 0..3 {
6760            let req = make_request(
6761                "PutItem",
6762                json!({
6763                    "TableName": "test-table",
6764                    "Item": {
6765                        "pk": { "S": format!("item{i}") }
6766                    }
6767                }),
6768            );
6769            svc.put_item(&req).unwrap();
6770        }
6771
6772        // Scan with limit > item count
6773        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6774        let resp = svc.scan(&req).unwrap();
6775        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6776        assert_eq!(body["Count"], 3);
6777        assert!(
6778            body["LastEvaluatedKey"].is_null(),
6779            "should not have LastEvaluatedKey when all items fit"
6780        );
6781
6782        // Scan without limit
6783        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6784        let resp = svc.scan(&req).unwrap();
6785        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6786        assert_eq!(body["Count"], 3);
6787        assert!(body["LastEvaluatedKey"].is_null());
6788    }
6789
6790    fn create_composite_table(svc: &DynamoDbService) {
6791        let req = make_request(
6792            "CreateTable",
6793            json!({
6794                "TableName": "composite-table",
6795                "KeySchema": [
6796                    { "AttributeName": "pk", "KeyType": "HASH" },
6797                    { "AttributeName": "sk", "KeyType": "RANGE" }
6798                ],
6799                "AttributeDefinitions": [
6800                    { "AttributeName": "pk", "AttributeType": "S" },
6801                    { "AttributeName": "sk", "AttributeType": "S" }
6802                ],
6803                "BillingMode": "PAY_PER_REQUEST"
6804            }),
6805        );
6806        svc.create_table(&req).unwrap();
6807    }
6808
6809    #[test]
6810    fn query_pagination_with_composite_key() {
6811        let svc = make_service();
6812        create_composite_table(&svc);
6813
6814        // Insert 5 items under the same partition key
6815        for i in 0..5 {
6816            let req = make_request(
6817                "PutItem",
6818                json!({
6819                    "TableName": "composite-table",
6820                    "Item": {
6821                        "pk": { "S": "user1" },
6822                        "sk": { "S": format!("item{i:03}") },
6823                        "data": { "S": format!("value{i}") }
6824                    }
6825                }),
6826            );
6827            svc.put_item(&req).unwrap();
6828        }
6829
6830        // Query with limit=2
6831        let req = make_request(
6832            "Query",
6833            json!({
6834                "TableName": "composite-table",
6835                "KeyConditionExpression": "pk = :pk",
6836                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6837                "Limit": 2
6838            }),
6839        );
6840        let resp = svc.query(&req).unwrap();
6841        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6842        assert_eq!(body["Count"], 2);
6843        assert!(body["LastEvaluatedKey"].is_object());
6844        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6845        assert!(body["LastEvaluatedKey"]["sk"].is_object());
6846
6847        // Page through all items
6848        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6849        let mut lek = body["LastEvaluatedKey"].clone();
6850
6851        while lek.is_object() {
6852            let req = make_request(
6853                "Query",
6854                json!({
6855                    "TableName": "composite-table",
6856                    "KeyConditionExpression": "pk = :pk",
6857                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6858                    "Limit": 2,
6859                    "ExclusiveStartKey": lek
6860                }),
6861            );
6862            let resp = svc.query(&req).unwrap();
6863            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6864            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6865            lek = body["LastEvaluatedKey"].clone();
6866        }
6867
6868        assert_eq!(
6869            all_items.len(),
6870            5,
6871            "should retrieve all 5 items via pagination"
6872        );
6873
6874        // Verify items came back sorted by sort key
6875        let sks: Vec<String> = all_items
6876            .iter()
6877            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6878            .collect();
6879        let mut sorted = sks.clone();
6880        sorted.sort();
6881        assert_eq!(sks, sorted, "items should be sorted by sort key");
6882    }
6883
6884    #[test]
6885    fn query_no_pagination_when_all_fit() {
6886        let svc = make_service();
6887        create_composite_table(&svc);
6888
6889        for i in 0..2 {
6890            let req = make_request(
6891                "PutItem",
6892                json!({
6893                    "TableName": "composite-table",
6894                    "Item": {
6895                        "pk": { "S": "user1" },
6896                        "sk": { "S": format!("item{i}") }
6897                    }
6898                }),
6899            );
6900            svc.put_item(&req).unwrap();
6901        }
6902
6903        let req = make_request(
6904            "Query",
6905            json!({
6906                "TableName": "composite-table",
6907                "KeyConditionExpression": "pk = :pk",
6908                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6909                "Limit": 10
6910            }),
6911        );
6912        let resp = svc.query(&req).unwrap();
6913        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6914        assert_eq!(body["Count"], 2);
6915        assert!(
6916            body["LastEvaluatedKey"].is_null(),
6917            "should not have LastEvaluatedKey when all items fit"
6918        );
6919    }
6920
6921    fn create_gsi_table(svc: &DynamoDbService) {
6922        let req = make_request(
6923            "CreateTable",
6924            json!({
6925                "TableName": "gsi-table",
6926                "KeySchema": [
6927                    { "AttributeName": "pk", "KeyType": "HASH" }
6928                ],
6929                "AttributeDefinitions": [
6930                    { "AttributeName": "pk", "AttributeType": "S" },
6931                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
6932                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
6933                ],
6934                "BillingMode": "PAY_PER_REQUEST",
6935                "GlobalSecondaryIndexes": [
6936                    {
6937                        "IndexName": "gsi-index",
6938                        "KeySchema": [
6939                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6940                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6941                        ],
6942                        "Projection": { "ProjectionType": "ALL" }
6943                    }
6944                ]
6945            }),
6946        );
6947        svc.create_table(&req).unwrap();
6948    }
6949
6950    #[test]
6951    fn gsi_query_last_evaluated_key_includes_table_pk() {
6952        let svc = make_service();
6953        create_gsi_table(&svc);
6954
6955        // Insert 3 items with the SAME GSI key but different table PKs
6956        for i in 0..3 {
6957            let req = make_request(
6958                "PutItem",
6959                json!({
6960                    "TableName": "gsi-table",
6961                    "Item": {
6962                        "pk": { "S": format!("item{i}") },
6963                        "gsi_pk": { "S": "shared" },
6964                        "gsi_sk": { "S": "sort" }
6965                    }
6966                }),
6967            );
6968            svc.put_item(&req).unwrap();
6969        }
6970
6971        // Query GSI with Limit=1 to trigger pagination
6972        let req = make_request(
6973            "Query",
6974            json!({
6975                "TableName": "gsi-table",
6976                "IndexName": "gsi-index",
6977                "KeyConditionExpression": "gsi_pk = :v",
6978                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6979                "Limit": 1
6980            }),
6981        );
6982        let resp = svc.query(&req).unwrap();
6983        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6984        assert_eq!(body["Count"], 1);
6985        let lek = &body["LastEvaluatedKey"];
6986        assert!(lek.is_object(), "should have LastEvaluatedKey");
6987        // Must contain the index keys
6988        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6989        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6990        // Must also contain the table PK
6991        assert!(
6992            lek["pk"].is_object(),
6993            "LEK must contain table PK for GSI queries"
6994        );
6995    }
6996
6997    #[test]
6998    fn gsi_query_pagination_returns_all_items() {
6999        let svc = make_service();
7000        create_gsi_table(&svc);
7001
7002        // Insert 4 items with the SAME GSI key but different table PKs
7003        for i in 0..4 {
7004            let req = make_request(
7005                "PutItem",
7006                json!({
7007                    "TableName": "gsi-table",
7008                    "Item": {
7009                        "pk": { "S": format!("item{i:03}") },
7010                        "gsi_pk": { "S": "shared" },
7011                        "gsi_sk": { "S": "sort" }
7012                    }
7013                }),
7014            );
7015            svc.put_item(&req).unwrap();
7016        }
7017
7018        // Paginate through all items with Limit=2
7019        let mut all_pks = Vec::new();
7020        let mut lek: Option<Value> = None;
7021
7022        loop {
7023            let mut query = json!({
7024                "TableName": "gsi-table",
7025                "IndexName": "gsi-index",
7026                "KeyConditionExpression": "gsi_pk = :v",
7027                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
7028                "Limit": 2
7029            });
7030            if let Some(ref start_key) = lek {
7031                query["ExclusiveStartKey"] = start_key.clone();
7032            }
7033
7034            let req = make_request("Query", query);
7035            let resp = svc.query(&req).unwrap();
7036            let body: Value = serde_json::from_slice(&resp.body).unwrap();
7037
7038            for item in body["Items"].as_array().unwrap() {
7039                let pk = item["pk"]["S"].as_str().unwrap().to_string();
7040                all_pks.push(pk);
7041            }
7042
7043            if body["LastEvaluatedKey"].is_object() {
7044                lek = Some(body["LastEvaluatedKey"].clone());
7045            } else {
7046                break;
7047            }
7048        }
7049
7050        all_pks.sort();
7051        assert_eq!(
7052            all_pks,
7053            vec!["item000", "item001", "item002", "item003"],
7054            "pagination should return all items without duplicates"
7055        );
7056    }
7057
7058    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
7059        pairs
7060            .iter()
7061            .map(|(k, v)| (k.to_string(), json!({"S": v})))
7062            .collect()
7063    }
7064
7065    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
7066        pairs
7067            .iter()
7068            .map(|(k, v)| (k.to_string(), v.to_string()))
7069            .collect()
7070    }
7071
7072    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
7073        pairs
7074            .iter()
7075            .map(|(k, v)| (k.to_string(), json!({"S": v})))
7076            .collect()
7077    }
7078
7079    #[test]
7080    fn test_evaluate_condition_bare_not_equal() {
7081        let item = cond_item(&[("state", "active")]);
7082        let names = cond_names(&[("#s", "state")]);
7083        let values = cond_values(&[(":c", "complete")]);
7084
7085        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
7086
7087        let item2 = cond_item(&[("state", "complete")]);
7088        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
7089    }
7090
7091    #[test]
7092    fn test_evaluate_condition_parenthesized_not_equal() {
7093        let item = cond_item(&[("state", "active")]);
7094        let names = cond_names(&[("#s", "state")]);
7095        let values = cond_values(&[(":c", "complete")]);
7096
7097        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
7098    }
7099
7100    #[test]
7101    fn test_evaluate_condition_parenthesized_equal_mismatch() {
7102        let item = cond_item(&[("state", "active")]);
7103        let names = cond_names(&[("#s", "state")]);
7104        let values = cond_values(&[(":c", "complete")]);
7105
7106        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
7107    }
7108
7109    #[test]
7110    fn test_evaluate_condition_compound_and() {
7111        let item = cond_item(&[("state", "active")]);
7112        let names = cond_names(&[("#s", "state")]);
7113        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
7114
7115        // active <> complete AND active <> failed => true
7116        assert!(
7117            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
7118        );
7119    }
7120
7121    #[test]
7122    fn test_evaluate_condition_compound_and_mismatch() {
7123        let item = cond_item(&[("state", "inactive")]);
7124        let names = cond_names(&[("#s", "state")]);
7125        let values = cond_values(&[(":a", "active"), (":b", "active")]);
7126
7127        // inactive = active AND inactive = active => false
7128        assert!(
7129            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
7130        );
7131    }
7132
7133    #[test]
7134    fn test_evaluate_condition_compound_or() {
7135        let item = cond_item(&[("state", "running")]);
7136        let names = cond_names(&[("#s", "state")]);
7137        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
7138
7139        // running = active OR running = idle => false
7140        assert!(
7141            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
7142        );
7143
7144        // running = active OR running = running => true
7145        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
7146        assert!(
7147            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
7148        );
7149    }
7150
7151    #[test]
7152    fn test_evaluate_condition_not_operator() {
7153        let item = cond_item(&[("state", "active")]);
7154        let names = cond_names(&[("#s", "state")]);
7155        let values = cond_values(&[(":c", "complete")]);
7156
7157        // NOT (active = complete) => NOT false => true
7158        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
7159
7160        // NOT (active <> complete) => NOT true => false
7161        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
7162
7163        // NOT attribute_exists(#s) on existing item => NOT true => false
7164        assert!(
7165            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
7166        );
7167
7168        // NOT attribute_exists(#s) on missing item => NOT false => true
7169        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
7170    }
7171
7172    #[test]
7173    fn test_evaluate_condition_begins_with() {
7174        // After unification, conditions support begins_with via
7175        // evaluate_single_filter_condition (previously only filters had it).
7176        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
7177        let names = cond_names(&[("#n", "name")]);
7178        let values = cond_values(&[(":p", "fakecloud")]);
7179
7180        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
7181
7182        let values2 = cond_values(&[(":p", "realcloud")]);
7183        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
7184    }
7185
7186    #[test]
7187    fn test_evaluate_condition_contains() {
7188        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
7189        let names = cond_names(&[("#t", "tags")]);
7190        let values = cond_values(&[(":v", "beta")]);
7191
7192        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
7193
7194        let values2 = cond_values(&[(":v", "delta")]);
7195        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
7196    }
7197
7198    #[test]
7199    fn test_evaluate_condition_no_existing_item() {
7200        // When no item exists (PutItem with condition), attribute_not_exists
7201        // should succeed and attribute_exists should fail.
7202        let names = cond_names(&[("#s", "state")]);
7203        let values = cond_values(&[(":v", "active")]);
7204
7205        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
7206        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
7207        // Comparison against missing item: None != Some(val) => true for <>
7208        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
7209        // None == Some(val) => false for =
7210        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
7211    }
7212
7213    #[test]
7214    fn test_evaluate_filter_not_operator() {
7215        let item = cond_item(&[("status", "pending")]);
7216        let names = cond_names(&[("#s", "status")]);
7217        let values = cond_values(&[(":v", "pending")]);
7218
7219        assert!(!evaluate_filter_expression(
7220            "NOT (#s = :v)",
7221            &item,
7222            &names,
7223            &values
7224        ));
7225        assert!(evaluate_filter_expression(
7226            "NOT (#s <> :v)",
7227            &item,
7228            &names,
7229            &values
7230        ));
7231    }
7232
#[test]
fn test_evaluate_filter_expression_in_match() {
    // Regression test. aws-sdk-go v2's
    //     expression.Name("state").In(Value("active"), Value("pending"))
    // renders as "#0 IN (:0, :1)". Before the fix neither
    // evaluate_single_filter_condition nor evaluate_single_key_condition
    // understood IN: the leaf fell through to the simple-comparison loop,
    // found no operator and returned `true`, so every item matched every
    // IN filter regardless of its value.
    let row = cond_item(&[("state", "active")]);
    let aliases = cond_names(&[("#s", "state")]);
    let bindings = cond_values(&[(":a", "active"), (":p", "pending")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &row, &aliases, &bindings);
    assert!(matched, "state=active should match IN (active, pending)");
}
7249
#[test]
fn test_evaluate_filter_expression_in_no_match() {
    // "complete" is not one of the listed values, so the filter must reject it.
    let row = cond_item(&[("state", "complete")]);
    let aliases = cond_names(&[("#s", "state")]);
    let bindings = cond_values(&[(":a", "active"), (":p", "pending")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &row, &aliases, &bindings);
    assert!(
        !matched,
        "state=complete should not match IN (active, pending)"
    );
}
7261
#[test]
fn test_evaluate_filter_expression_in_no_spaces() {
    // orderbot builds the expression by hand:
    //     "#status IN (" + strings.Join(keys, ",") + ")"
    // giving "IN (:v0,:v1,:v2)" with no space after each comma. That
    // compact form has to parse as well.
    let row = cond_item(&[("status", "shipped")]);
    let aliases = cond_names(&[("#s", "status")]);
    let bindings = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);

    let matched = evaluate_filter_expression("#s IN (:a,:b,:c)", &row, &aliases, &bindings);
    assert!(matched, "no-space IN list should still parse");
}
7276
#[test]
fn test_evaluate_filter_expression_in_missing_attribute() {
    // An item lacking the attribute can never satisfy an IN list; the old
    // silent-true fallthrough would have accepted it.
    let empty_item: HashMap<String, AttributeValue> = HashMap::new();
    let aliases = cond_names(&[("#s", "state")]);
    let bindings = cond_values(&[(":a", "active")]);

    let matched = evaluate_filter_expression("#s IN (:a)", &empty_item, &aliases, &bindings);
    assert!(!matched, "missing attribute should not match any IN list");
}
7290
#[test]
fn test_evaluate_filter_expression_compound_in_and_eq() {
    // `Name("state").In(...).And(Name("priority").Equal(...))` produces
    //     "(#0 IN (:0, :1)) AND (#1 = :2)"
    // split_on_and copes with the outer parentheses, but the IN leaf used to
    // hit the silent-true fallthrough, so any item with priority=high
    // matched whatever its state was.
    const EXPR: &str = "(#s IN (:a, :pe)) AND (#p = :h)";
    let aliases = cond_names(&[("#s", "state"), ("#p", "priority")]);
    let bindings = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);

    // Both clauses hold.
    let active_high = cond_item(&[("state", "active"), ("priority", "high")]);
    let first = evaluate_filter_expression(EXPR, &active_high, &aliases, &bindings);
    assert!(
        first,
        "(active IN (active, pending)) AND (high = high) should match"
    );

    // The IN clause fails, so the conjunction fails.
    let complete_high = cond_item(&[("state", "complete"), ("priority", "high")]);
    let second = evaluate_filter_expression(EXPR, &complete_high, &aliases, &bindings);
    assert!(
        !second,
        "(complete IN (active, pending)) AND (high = high) should not match"
    );
}
7313
#[test]
fn test_evaluate_condition_attribute_exists_with_space() {
    // aws-sdk-go v2's expression.NewBuilder puts a space between a function
    // name and its opening parenthesis:
    //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
    // Before the fix extract_function_arg used
    // strip_prefix("attribute_exists(") with no space, so these calls
    // skipped the filter leaf and reached evaluate_single_key_condition's
    // silent-true fallthrough: every conditional write was accepted.
    const CLAIM_LEASE: &str =
        "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))";
    let aliases = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
    let bindings = cond_values(&[(":0", "tab-A")]);

    // Existing item with no active_viewer_tab_id: exists(store_id) holds and
    // not_exists(active_viewer_tab_id) holds → Ok.
    let free = cond_item(&[("store_id", "s-1")]);
    let on_free = evaluate_condition(CLAIM_LEASE, Some(&free), &aliases, &bindings);
    assert!(
        on_free.is_ok(),
        "claim-lease compound on free item should succeed"
    );

    // No item at all: exists(store_id) fails → the AND fails → Err.
    let on_missing = evaluate_condition(CLAIM_LEASE, None, &aliases, &bindings);
    assert!(
        on_missing.is_err(),
        "claim-lease compound on missing item must fail attribute_exists branch"
    );

    // Held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
    // → (✓) AND ((✗) OR (✗)) → Err.
    let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
    let on_held = evaluate_condition(CLAIM_LEASE, Some(&held), &aliases, &bindings);
    assert!(
        on_held.is_err(),
        "claim-lease compound on item held by another tab must fail"
    );

    // Held by the same tab: exists ✓, not_exists ✗, #1 = :0 ✓
    // → (✓) AND ((✗) OR (✓)) → Ok.
    let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
    let on_self = evaluate_condition(CLAIM_LEASE, Some(&self_held), &aliases, &bindings);
    assert!(on_self.is_ok(), "same-tab re-claim must succeed");
}
7380
#[test]
fn test_evaluate_condition_in_match() {
    // evaluate_condition delegates to evaluate_filter_expression, so this
    // covers the ConditionExpression path too. Before the fix the result
    // was silently Ok.
    let row = cond_item(&[("state", "active")]);
    let aliases = cond_names(&[("#s", "state")]);
    let bindings = cond_values(&[(":a", "active"), (":p", "pending")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&row), &aliases, &bindings);
    assert!(
        outcome.is_ok(),
        "IN should succeed when actual value is in the list"
    );
}
7394
#[test]
fn test_evaluate_condition_in_no_match() {
    // Before the fix evaluate_condition returned Ok(()) for any IN
    // expression, so conditional writes were accepted whatever the stored
    // state was — the opposite of what the caller asked for.
    let row = cond_item(&[("state", "complete")]);
    let aliases = cond_names(&[("#s", "state")]);
    let bindings = cond_values(&[(":a", "active"), (":p", "pending")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&row), &aliases, &bindings);
    assert!(
        outcome.is_err(),
        "IN should fail when actual value is not in the list"
    );
}
7409
#[test]
fn test_apply_update_set_list_index_replaces_existing() {
    // orderbot's order-item update retry loop sends
    //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
    // Before the fix apply_set_assignment passed the whole "#items[0]"
    // token to resolve_attr_name, which missed the name map, and then ran
    // item.insert("#items[0]", :item): a new top-level key literally named
    // "#items[0]" appeared and the list itself was left untouched.
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "OLD-A"}}},
            {"M": {"sku": {"S": "OLD-B"}}},
        ]}),
    )]);
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}))]);

    apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr.pointer("/L"))
        .and_then(Value::as_array)
        .expect("items should still be a list");
    assert_eq!(items_list.len(), 2, "list length should be unchanged");

    // Reads the string stored under M.sku.S of the list element at `idx`.
    let sku_at = |idx: usize| items_list[idx].pointer("/M/sku/S").and_then(Value::as_str);
    assert_eq!(sku_at(0), Some("NEW-A"), "index 0 should be replaced");
    assert_eq!(sku_at(1), Some("OLD-B"), "index 1 should be untouched");

    // The indexed path must not have leaked in as a top-level key.
    for bogus_key in ["items[0]", "#items[0]"] {
        assert!(!item.contains_key(bogus_key));
    }
}
7455
#[test]
fn test_apply_update_set_list_index_second_slot() {
    // Replacing the middle element must leave both neighbours untouched.
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "A"}}},
            {"M": {"sku": {"S": "B"}}},
            {"M": {"sku": {"S": "C"}}},
        ]}),
    )]);
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(
        ":item".to_string(),
        json!({"M": {"sku": {"S": "B-PRIME"}}}),
    )]);

    apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr.pointer("/L"))
        .and_then(Value::as_array)
        .unwrap();
    let mut skus: Vec<&str> = Vec::with_capacity(items_list.len());
    for entry in items_list {
        let sku = entry.pointer("/M/sku/S").and_then(Value::as_str).unwrap();
        skus.push(sku);
    }
    assert_eq!(skus, ["A", "B-PRIME", "C"]);
}
7491
#[test]
fn test_apply_update_set_list_index_without_name_ref() {
    // The indexed SET must also work when the left-hand side is a literal
    // attribute name rather than an expression attribute name reference.
    let mut item = HashMap::from([(
        "tags".to_string(),
        json!({"L": [{"S": "red"}, {"S": "blue"}]}),
    )]);
    let no_names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "green"}))]);

    apply_update_expression(&mut item, "SET tags[1] = :t", &no_names, &values).unwrap();

    let tags = item
        .get("tags")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .unwrap();
    let colour_at = |idx: usize| tags[idx].get("S").and_then(Value::as_str);
    assert_eq!(colour_at(0), Some("red"));
    assert_eq!(colour_at(1), Some("green"));
}
7516
#[test]
fn test_unrecognized_expression_returns_false() {
    // evaluate_single_key_condition has to fail closed: an expression shape
    // it cannot parse is rejected (false) rather than accepted (true).
    let row = cond_item(&[("x", "1")]);
    let no_names: HashMap<String, String> = HashMap::new();
    let no_values: HashMap<String, Value> = HashMap::new();

    let accepted =
        evaluate_single_key_condition("GARBAGE NONSENSE", &row, "", &no_names, &no_values);
    assert!(!accepted, "unrecognized expression must return false");
}
7530
#[test]
fn test_set_list_index_out_of_range_returns_error() {
    // SET list[N] with N past the end of the list is expected to surface an
    // error (a ValidationException) instead of silently doing nothing.
    // NOTE(review): the AWS docs describe SET on a non-existent list element
    // as appending to the end of the list — confirm that this stricter
    // behaviour is intentional.
    let mut item = HashMap::from([("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}))]);
    let no_names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);

    let outcome = apply_update_expression(&mut item, "SET items[5] = :v", &no_names, &values);
    assert!(
        outcome.is_err(),
        "out-of-range list index must return an error"
    );

    // The failed update must not have altered the list.
    let remaining = item
        .get("items")
        .and_then(|attr| attr.get("L"))
        .and_then(Value::as_array)
        .unwrap();
    assert_eq!(remaining.len(), 2);
}
7556
#[test]
fn test_set_list_index_on_non_list_returns_error() {
    // Indexing into an attribute that holds a string, not a list, is
    // expected to produce an error (a ValidationException).
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let no_names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);

    let outcome = apply_update_expression(&mut item, "SET name[0] = :v", &no_names, &values);
    assert!(
        outcome.is_err(),
        "list index on non-list attribute must return an error"
    );
}
7574
#[test]
fn test_unrecognized_update_action_returns_error() {
    // An UpdateExpression starting with an unknown action keyword ("INVALID")
    // must be rejected, with an error whose text identifies it as an invalid
    // or syntactically wrong expression.
    let mut item = HashMap::new();
    item.insert("name".to_string(), json!({"S": "hello"}));

    let names: HashMap<String, String> = HashMap::new();
    let mut values = HashMap::new();
    values.insert(":bar".to_string(), json!({"S": "baz"}));

    let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
    assert!(
        result.is_err(),
        "unrecognized UpdateExpression action must return an error"
    );
    let err_msg = result.unwrap_err().to_string();
    // Either wording is accepted, so the failure message names both.
    assert!(
        err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
        "error should mention Invalid UpdateExpression or Syntax error, got: {err_msg}"
    );
}
7595
7596    // ── size() function tests ──────────────────────────────────────────
7597
#[test]
fn test_size_string() {
    // The stored string "hello" is five characters long.
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let no_names = HashMap::new();

    // size(name) = 5
    let limit_five = HashMap::from([(":limit".to_string(), json!({"N": "5"}))]);
    let equals_five =
        evaluate_single_filter_condition("size(name) = :limit", &item, &no_names, &limit_five);
    assert!(equals_five);

    // size(name) > 4
    let limit_four = HashMap::from([(":limit".to_string(), json!({"N": "4"}))]);
    let above_four =
        evaluate_single_filter_condition("size(name) > :limit", &item, &no_names, &limit_four);
    assert!(above_four);
}
7620
#[test]
fn test_size_list() {
    // The list holds three elements, so size(items) = 3 must hold.
    let item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
    )]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let holds = evaluate_single_filter_condition("size(items) = :limit", &item, &no_names, &values);
    assert!(holds);
}
7639
#[test]
fn test_size_map() {
    // The map has two entries, so size(data) = 2 must hold.
    let item = HashMap::from([(
        "data".to_string(),
        json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
    )]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "2"}))]);

    let holds = evaluate_single_filter_condition("size(data) = :limit", &item, &no_names, &values);
    assert!(holds);
}
7658
#[test]
fn test_size_set() {
    // The string set has four members, so size(tags) > 3 must hold.
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let holds = evaluate_single_filter_condition("size(tags) > :limit", &item, &no_names, &values);
    assert!(holds);
}
7674
7675    // ── attribute_type() function tests ────────────────────────────────
7676
#[test]
fn test_attribute_type_string() {
    // `name` is stored with the S tag: asking for "S" matches, "N" does not.
    const EXPR: &str = "attribute_type(name, :t)";
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let no_names = HashMap::new();

    let wants_string = HashMap::from([(":t".to_string(), json!({"S": "S"}))]);
    assert!(evaluate_single_filter_condition(
        EXPR,
        &item,
        &no_names,
        &wants_string
    ));

    let wants_number = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);
    assert!(!evaluate_single_filter_condition(
        EXPR,
        &item,
        &no_names,
        &wants_number
    ));
}
7700
#[test]
fn test_attribute_type_number() {
    // An attribute stored with the N tag must report type "N".
    let item = HashMap::from([("age".to_string(), json!({"N": "42"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);

    let holds = evaluate_single_filter_condition("attribute_type(age, :t)", &item, &no_names, &values);
    assert!(holds);
}
7716
#[test]
fn test_attribute_type_list() {
    // An attribute stored with the L tag must report type "L".
    let item = HashMap::from([("items".to_string(), json!({"L": [{"S": "a"}]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "L"}))]);

    let holds =
        evaluate_single_filter_condition("attribute_type(items, :t)", &item, &no_names, &values);
    assert!(holds);
}
7732
#[test]
fn test_attribute_type_map() {
    // An attribute stored with the M tag must report type "M".
    let item = HashMap::from([("data".to_string(), json!({"M": {"key": {"S": "val"}}}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "M"}))]);

    let holds =
        evaluate_single_filter_condition("attribute_type(data, :t)", &item, &no_names, &values);
    assert!(holds);
}
7748
#[test]
fn test_attribute_type_bool() {
    // An attribute stored with the BOOL tag must report type "BOOL".
    let item = HashMap::from([("active".to_string(), json!({"BOOL": true}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "BOOL"}))]);

    let holds =
        evaluate_single_filter_condition("attribute_type(active, :t)", &item, &no_names, &values);
    assert!(holds);
}
7764
7765    // ── begins_with rejects non-string types ───────────────────────────
7766
#[test]
fn test_begins_with_rejects_number_type() {
    // `code` is an N attribute; although its digits start with "123",
    // begins_with must not match a non-string attribute.
    let item = HashMap::from([("code".to_string(), json!({"N": "12345"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "123"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &no_names, &values);
    assert!(
        !matched,
        "begins_with must return false for N-type attributes"
    );
}
7780
#[test]
fn test_begins_with_works_on_string_type() {
    // "abc123" starts with "abc", so the S attribute must match.
    let item = HashMap::from([("code".to_string(), json!({"S": "abc123"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "abc"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &no_names, &values);
    assert!(matched);
}
7796
7797    // ── contains on sets ───────────────────────────────────────────────
7798
#[test]
fn test_contains_string_set() {
    // contains() on an SS attribute tests set membership.
    const EXPR: &str = "contains(tags, :val)";
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["red", "blue", "green"]}))]);
    let no_names = HashMap::new();

    // "blue" is a member of the set.
    let member = HashMap::from([(":val".to_string(), json!({"S": "blue"}))]);
    assert!(evaluate_single_filter_condition(
        EXPR, &item, &no_names, &member
    ));

    // "yellow" is not.
    let stranger = HashMap::from([(":val".to_string(), json!({"S": "yellow"}))]);
    assert!(!evaluate_single_filter_condition(
        EXPR, &item, &no_names, &stranger
    ));
}
7822
#[test]
fn test_contains_number_set() {
    // contains() on an NS attribute must find the N value "2".
    let item = HashMap::from([("scores".to_string(), json!({"NS": ["1", "2", "3"]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "2"}))]);

    let matched =
        evaluate_single_filter_condition("contains(scores, :val)", &item, &no_names, &values);
    assert!(matched);
}
7838
7839    // ── SET arithmetic type validation ─────────────────────────────────
7840
#[test]
fn test_set_arithmetic_rejects_string_operand() {
    // `name` holds an S value, so `name + :val` is not valid arithmetic.
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "1"}))]);

    let outcome = apply_update_expression(&mut item, "SET name = name + :val", &no_names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic on S-type attribute must return a ValidationException"
    );
}
7855
#[test]
fn test_set_arithmetic_rejects_string_value() {
    // The attribute is numeric but the right-hand operand is an S value.
    let mut item = HashMap::from([("count".to_string(), json!({"N": "5"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"S": "notanumber"}))]);

    let outcome =
        apply_update_expression(&mut item, "SET count = count + :val", &no_names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic with S-type value must return a ValidationException"
    );
}
7871
#[test]
fn test_set_arithmetic_valid_numbers() {
    // 10 + 3 with two N operands succeeds and stores N "13".
    let mut item = HashMap::from([("count".to_string(), json!({"N": "10"}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "3"}))]);

    let outcome =
        apply_update_expression(&mut item, "SET count = count + :val", &no_names, &values);
    assert!(outcome.is_ok());

    let expected = json!({"N": "13"});
    assert_eq!(item.get("count"), Some(&expected));
}
7885
7886    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
7887
#[test]
fn test_add_binary_set() {
    // ADD on a BS attribute is a set union: "YQ==" appears on both sides and
    // must end up in the result only once.
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}))]);

    let outcome = apply_update_expression(&mut item, "ADD data :val", &no_names, &values);
    assert!(outcome.is_ok());

    let members = item
        .get("data")
        .and_then(|attr| attr.get("BS"))
        .and_then(Value::as_array)
        .unwrap();
    assert_eq!(members.len(), 3, "should merge sets without duplicates");
    for expected in ["YQ==", "Yg==", "Yw=="] {
        assert!(members.contains(&json!(expected)));
    }
}
7904
#[test]
fn test_delete_binary_set() {
    // DELETE on a BS attribute removes the listed members and keeps the rest.
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yg=="]}))]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &no_names, &values);
    assert!(outcome.is_ok());

    let members = item
        .get("data")
        .and_then(|attr| attr.get("BS"))
        .and_then(Value::as_array)
        .unwrap();
    assert_eq!(members.len(), 2);
    let removed = json!("Yg==");
    assert!(!members.contains(&removed));
}
7919
#[test]
fn test_delete_binary_set_removes_attr_when_empty() {
    // Deleting the only member must drop the attribute itself rather than
    // leave an empty set behind.
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ=="]}))]);
    let no_names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["YQ=="]}))]);

    let outcome = apply_update_expression(&mut item, "DELETE data :val", &no_names, &values);
    assert!(outcome.is_ok());

    let still_present = item.contains_key("data");
    assert!(
        !still_present,
        "attribute should be removed when set becomes empty"
    );
}
7935}