Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21    ReplicaDescription, SharedDynamoDbState,
22};
23
24pub struct DynamoDbService {
25    state: SharedDynamoDbState,
26    s3_state: Option<SharedS3State>,
27    delivery: Option<Arc<DeliveryBus>>,
28}
29
30impl DynamoDbService {
31    pub fn new(state: SharedDynamoDbState) -> Self {
32        Self {
33            state,
34            s3_state: None,
35            delivery: None,
36        }
37    }
38
39    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40        self.s3_state = Some(s3_state);
41        self
42    }
43
44    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45        self.delivery = Some(delivery);
46        self
47    }
48
49    /// Deliver a change record to all active Kinesis streaming destinations for a table.
50    fn deliver_to_kinesis_destinations(
51        &self,
52        table: &DynamoTable,
53        event_name: &str,
54        keys: &HashMap<String, AttributeValue>,
55        old_image: Option<&HashMap<String, AttributeValue>>,
56        new_image: Option<&HashMap<String, AttributeValue>>,
57    ) {
58        let delivery = match &self.delivery {
59            Some(d) => d,
60            None => return,
61        };
62
63        let active_destinations: Vec<_> = table
64            .kinesis_destinations
65            .iter()
66            .filter(|d| d.destination_status == "ACTIVE")
67            .collect();
68
69        if active_destinations.is_empty() {
70            return;
71        }
72
73        let mut record = json!({
74            "eventID": uuid::Uuid::new_v4().to_string(),
75            "eventName": event_name,
76            "eventVersion": "1.1",
77            "eventSource": "aws:dynamodb",
78            "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79            "dynamodb": {
80                "Keys": keys,
81                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83                "StreamViewType": "NEW_AND_OLD_IMAGES",
84            },
85            "eventSourceARN": &table.arn,
86            "tableName": &table.name,
87        });
88
89        if let Some(old) = old_image {
90            record["dynamodb"]["OldImage"] = json!(old);
91        }
92        if let Some(new) = new_image {
93            record["dynamodb"]["NewImage"] = json!(new);
94        }
95
96        let record_str = serde_json::to_string(&record).unwrap_or_default();
97        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98        let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100        for dest in active_destinations {
101            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102        }
103    }
104
105    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106        serde_json::from_slice(&req.body).map_err(|e| {
107            AwsServiceError::aws_error(
108                StatusCode::BAD_REQUEST,
109                "SerializationException",
110                format!("Invalid JSON: {e}"),
111            )
112        })
113    }
114
115    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116        Ok(AwsResponse::ok_json(body))
117    }
118
119    // ── Table Operations ────────────────────────────────────────────────
120
121    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122        let body = Self::parse_body(req)?;
123
124        let table_name = body["TableName"]
125            .as_str()
126            .ok_or_else(|| {
127                AwsServiceError::aws_error(
128                    StatusCode::BAD_REQUEST,
129                    "ValidationException",
130                    "TableName is required",
131                )
132            })?
133            .to_string();
134
135        let key_schema = parse_key_schema(&body["KeySchema"])?;
136        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138        // Validate that key schema attributes are defined
139        for ks in &key_schema {
140            if !attribute_definitions
141                .iter()
142                .any(|ad| ad.attribute_name == ks.attribute_name)
143            {
144                return Err(AwsServiceError::aws_error(
145                    StatusCode::BAD_REQUEST,
146                    "ValidationException",
147                    format!(
148                        "One or more parameter values were invalid: \
149                         Some index key attributes are not defined in AttributeDefinitions. \
150                         Keys: [{}], AttributeDefinitions: [{}]",
151                        ks.attribute_name,
152                        attribute_definitions
153                            .iter()
154                            .map(|ad| ad.attribute_name.as_str())
155                            .collect::<Vec<_>>()
156                            .join(", ")
157                    ),
158                ));
159            }
160        }
161
162        let billing_mode = body["BillingMode"]
163            .as_str()
164            .unwrap_or("PROVISIONED")
165            .to_string();
166
167        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168            ProvisionedThroughput {
169                read_capacity_units: 0,
170                write_capacity_units: 0,
171            }
172        } else {
173            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174        };
175
176        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178        let tags = parse_tags(&body["Tags"]);
179
180        // Parse StreamSpecification
181        let (stream_enabled, stream_view_type) =
182            if let Some(stream_spec) = body.get("StreamSpecification") {
183                let enabled = stream_spec
184                    .get("StreamEnabled")
185                    .and_then(|v| v.as_bool())
186                    .unwrap_or(false);
187                let view_type = if enabled {
188                    stream_spec
189                        .get("StreamViewType")
190                        .and_then(|v| v.as_str())
191                        .map(|s| s.to_string())
192                } else {
193                    None
194                };
195                (enabled, view_type)
196            } else {
197                (false, None)
198            };
199
200        // Parse SSESpecification
201        let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202            let enabled = sse_spec
203                .get("Enabled")
204                .and_then(|v| v.as_bool())
205                .unwrap_or(false);
206            if enabled {
207                let sse_type = sse_spec
208                    .get("SSEType")
209                    .and_then(|v| v.as_str())
210                    .unwrap_or("KMS")
211                    .to_string();
212                let kms_key = sse_spec
213                    .get("KMSMasterKeyId")
214                    .and_then(|v| v.as_str())
215                    .map(|s| s.to_string());
216                (Some(sse_type), kms_key)
217            } else {
218                (None, None)
219            }
220        } else {
221            (None, None)
222        };
223
224        let mut state = self.state.write();
225
226        if state.tables.contains_key(&table_name) {
227            return Err(AwsServiceError::aws_error(
228                StatusCode::BAD_REQUEST,
229                "ResourceInUseException",
230                format!("Table already exists: {table_name}"),
231            ));
232        }
233
234        let now = Utc::now();
235        let arn = format!(
236            "arn:aws:dynamodb:{}:{}:table/{}",
237            state.region, state.account_id, table_name
238        );
239        let stream_arn = if stream_enabled {
240            Some(format!(
241                "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242                state.region,
243                state.account_id,
244                table_name,
245                now.format("%Y-%m-%dT%H:%M:%S.%3f")
246            ))
247        } else {
248            None
249        };
250
251        let table = DynamoTable {
252            name: table_name.clone(),
253            arn: arn.clone(),
254            key_schema: key_schema.clone(),
255            attribute_definitions: attribute_definitions.clone(),
256            provisioned_throughput: provisioned_throughput.clone(),
257            items: Vec::new(),
258            gsi: gsi.clone(),
259            lsi: lsi.clone(),
260            tags,
261            created_at: now,
262            status: "ACTIVE".to_string(),
263            item_count: 0,
264            size_bytes: 0,
265            billing_mode: billing_mode.clone(),
266            ttl_attribute: None,
267            ttl_enabled: false,
268            resource_policy: None,
269            pitr_enabled: false,
270            kinesis_destinations: Vec::new(),
271            contributor_insights_status: "DISABLED".to_string(),
272            contributor_insights_counters: HashMap::new(),
273            stream_enabled,
274            stream_view_type,
275            stream_arn,
276            stream_records: Arc::new(RwLock::new(Vec::new())),
277            sse_type,
278            sse_kms_key_arn,
279        };
280
281        state.tables.insert(table_name, table);
282
283        let table_desc = build_table_description_json(
284            &arn,
285            &key_schema,
286            &attribute_definitions,
287            &provisioned_throughput,
288            &gsi,
289            &lsi,
290            &billing_mode,
291            now,
292            0,
293            0,
294            "ACTIVE",
295        );
296
297        Self::ok_json(json!({ "TableDescription": table_desc }))
298    }
299
300    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = Self::parse_body(req)?;
302        let table_name = require_str(&body, "TableName")?;
303
304        let mut state = self.state.write();
305        let table = state.tables.remove(table_name).ok_or_else(|| {
306            AwsServiceError::aws_error(
307                StatusCode::BAD_REQUEST,
308                "ResourceNotFoundException",
309                format!("Requested resource not found: Table: {table_name} not found"),
310            )
311        })?;
312
313        let table_desc = build_table_description_json(
314            &table.arn,
315            &table.key_schema,
316            &table.attribute_definitions,
317            &table.provisioned_throughput,
318            &table.gsi,
319            &table.lsi,
320            &table.billing_mode,
321            table.created_at,
322            table.item_count,
323            table.size_bytes,
324            "DELETING",
325        );
326
327        Self::ok_json(json!({ "TableDescription": table_desc }))
328    }
329
330    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331        let body = Self::parse_body(req)?;
332        let table_name = require_str(&body, "TableName")?;
333
334        let state = self.state.read();
335        let table = get_table(&state.tables, table_name)?;
336
337        let table_desc = build_table_description(table);
338
339        Self::ok_json(json!({ "Table": table_desc }))
340    }
341
342    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343        let body = Self::parse_body(req)?;
344
345        validate_optional_string_length(
346            "exclusiveStartTableName",
347            body["ExclusiveStartTableName"].as_str(),
348            3,
349            255,
350        )?;
351        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354        let exclusive_start = body["ExclusiveStartTableName"]
355            .as_str()
356            .map(|s| s.to_string());
357
358        let state = self.state.read();
359        let mut names: Vec<&String> = state.tables.keys().collect();
360        names.sort();
361
362        let start_idx = match &exclusive_start {
363            Some(start) => names
364                .iter()
365                .position(|n| n.as_str() > start.as_str())
366                .unwrap_or(names.len()),
367            None => 0,
368        };
369
370        let page: Vec<&str> = names
371            .iter()
372            .skip(start_idx)
373            .take(limit)
374            .map(|n| n.as_str())
375            .collect();
376
377        let mut result = json!({ "TableNames": page });
378
379        if start_idx + limit < names.len() {
380            if let Some(last) = page.last() {
381                result["LastEvaluatedTableName"] = json!(last);
382            }
383        }
384
385        Self::ok_json(result)
386    }
387
388    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        let body = Self::parse_body(req)?;
390        let table_name = require_str(&body, "TableName")?;
391
392        let mut state = self.state.write();
393        let table = state.tables.get_mut(table_name).ok_or_else(|| {
394            AwsServiceError::aws_error(
395                StatusCode::BAD_REQUEST,
396                "ResourceNotFoundException",
397                format!("Requested resource not found: Table: {table_name} not found"),
398            )
399        })?;
400
401        if let Some(pt) = body.get("ProvisionedThroughput") {
402            if let Ok(throughput) = parse_provisioned_throughput(pt) {
403                table.provisioned_throughput = throughput;
404            }
405        }
406
407        if let Some(bm) = body["BillingMode"].as_str() {
408            table.billing_mode = bm.to_string();
409        }
410
411        // Handle SSESpecification update
412        if let Some(sse_spec) = body.get("SSESpecification") {
413            let enabled = sse_spec
414                .get("Enabled")
415                .and_then(|v| v.as_bool())
416                .unwrap_or(false);
417            if enabled {
418                table.sse_type = Some(
419                    sse_spec
420                        .get("SSEType")
421                        .and_then(|v| v.as_str())
422                        .unwrap_or("KMS")
423                        .to_string(),
424                );
425                table.sse_kms_key_arn = sse_spec
426                    .get("KMSMasterKeyId")
427                    .and_then(|v| v.as_str())
428                    .map(|s| s.to_string());
429            } else {
430                table.sse_type = None;
431                table.sse_kms_key_arn = None;
432            }
433        }
434
435        let table_desc = build_table_description_json(
436            &table.arn,
437            &table.key_schema,
438            &table.attribute_definitions,
439            &table.provisioned_throughput,
440            &table.gsi,
441            &table.lsi,
442            &table.billing_mode,
443            table.created_at,
444            table.item_count,
445            table.size_bytes,
446            &table.status,
447        );
448
449        Self::ok_json(json!({ "TableDescription": table_desc }))
450    }
451
452    // ── Item Operations ─────────────────────────────────────────────────
453
454    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455        // --- Parse request body and expression attributes WITHOUT holding any lock ---
456        let body = Self::parse_body(req)?;
457        let table_name = require_str(&body, "TableName")?;
458        let item = require_object(&body, "Item")?;
459        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460        let expr_attr_names = parse_expression_attribute_names(&body);
461        let expr_attr_values = parse_expression_attribute_values(&body);
462        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464        // --- Acquire write lock ONLY for validation + mutation ---
465        // Capture kinesis delivery info alongside the return value
466        let (old_item, kinesis_info) = {
467            let mut state = self.state.write();
468            let region = state.region.clone();
469            let table = get_table_mut(&mut state.tables, table_name)?;
470
471            validate_key_in_item(table, &item)?;
472
473            let key = extract_key(table, &item);
474            let existing_idx = table.find_item_index(&key);
475
476            if let Some(ref cond) = condition {
477                let existing = existing_idx.map(|i| &table.items[i]);
478                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
479            }
480
481            let old_item_for_return = if return_values == "ALL_OLD" {
482                existing_idx.map(|i| table.items[i].clone())
483            } else {
484                None
485            };
486
487            // Capture old item for stream/kinesis if needed
488            let needs_change_capture = table.stream_enabled
489                || table
490                    .kinesis_destinations
491                    .iter()
492                    .any(|d| d.destination_status == "ACTIVE");
493            let old_item_for_stream = if needs_change_capture {
494                existing_idx.map(|i| table.items[i].clone())
495            } else {
496                None
497            };
498
499            let is_modify = existing_idx.is_some();
500
501            if let Some(idx) = existing_idx {
502                table.items[idx] = item.clone();
503            } else {
504                table.items.push(item.clone());
505            }
506
507            table.record_item_access(&item);
508            table.recalculate_stats();
509
510            let event_name = if is_modify { "MODIFY" } else { "INSERT" };
511            let key = extract_key(table, &item);
512
513            // Generate stream record
514            if table.stream_enabled {
515                if let Some(record) = crate::streams::generate_stream_record(
516                    table,
517                    event_name,
518                    key.clone(),
519                    old_item_for_stream.clone(),
520                    Some(item.clone()),
521                    &region,
522                ) {
523                    crate::streams::add_stream_record(table, record);
524                }
525            }
526
527            // Capture kinesis delivery info (delivered after lock release)
528            let kinesis_info = if table
529                .kinesis_destinations
530                .iter()
531                .any(|d| d.destination_status == "ACTIVE")
532            {
533                Some((
534                    table.clone(),
535                    event_name.to_string(),
536                    key,
537                    old_item_for_stream,
538                    Some(item.clone()),
539                ))
540            } else {
541                None
542            };
543
544            (old_item_for_return, kinesis_info)
545        };
546        // --- Write lock released, build response ---
547
548        // Deliver to Kinesis destinations outside the lock
549        if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
550            self.deliver_to_kinesis_destinations(
551                &table,
552                &event_name,
553                &keys,
554                old_image.as_ref(),
555                new_image.as_ref(),
556            );
557        }
558
559        let mut result = json!({});
560        if let Some(old) = old_item {
561            result["Attributes"] = json!(old);
562        }
563
564        Self::ok_json(result)
565    }
566
567    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568        // --- Parse request body WITHOUT holding any lock ---
569        let body = Self::parse_body(req)?;
570        let table_name = require_str(&body, "TableName")?;
571        let key = require_object(&body, "Key")?;
572
573        // --- Use a read lock for the lookup (allows concurrent GetItem calls) ---
574        let (result, needs_insights) = {
575            let state = self.state.read();
576            let table = get_table(&state.tables, table_name)?;
577            let needs_insights = table.contributor_insights_status == "ENABLED";
578
579            let result = match table.find_item_index(&key) {
580                Some(idx) => {
581                    let item = &table.items[idx];
582                    let projected = project_item(item, &body);
583                    json!({ "Item": projected })
584                }
585                None => json!({}),
586            };
587            (result, needs_insights)
588        };
589        // --- Read lock released ---
590
591        // Only acquire write lock if contributor insights tracking is enabled
592        if needs_insights {
593            let mut state = self.state.write();
594            if let Some(table) = state.tables.get_mut(table_name) {
595                table.record_key_access(&key);
596            }
597        }
598
599        Self::ok_json(result)
600    }
601
602    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603        let body = Self::parse_body(req)?;
604
605        validate_optional_enum_value(
606            "conditionalOperator",
607            &body["ConditionalOperator"],
608            &["AND", "OR"],
609        )?;
610        validate_optional_enum_value(
611            "returnConsumedCapacity",
612            &body["ReturnConsumedCapacity"],
613            &["INDEXES", "TOTAL", "NONE"],
614        )?;
615        validate_optional_enum_value(
616            "returnValues",
617            &body["ReturnValues"],
618            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
619        )?;
620        validate_optional_enum_value(
621            "returnItemCollectionMetrics",
622            &body["ReturnItemCollectionMetrics"],
623            &["SIZE", "NONE"],
624        )?;
625        validate_optional_enum_value(
626            "returnValuesOnConditionCheckFailure",
627            &body["ReturnValuesOnConditionCheckFailure"],
628            &["ALL_OLD", "NONE"],
629        )?;
630
631        let table_name = require_str(&body, "TableName")?;
632        let key = require_object(&body, "Key")?;
633
634        let (result, kinesis_info) = {
635            let mut state = self.state.write();
636            let region = state.region.clone();
637            let table = get_table_mut(&mut state.tables, table_name)?;
638
639            let condition = body["ConditionExpression"].as_str();
640            let expr_attr_names = parse_expression_attribute_names(&body);
641            let expr_attr_values = parse_expression_attribute_values(&body);
642
643            let existing_idx = table.find_item_index(&key);
644
645            if let Some(cond) = condition {
646                let existing = existing_idx.map(|i| &table.items[i]);
647                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
648            }
649
650            let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
651
652            let mut result = json!({});
653            let mut kinesis_info = None;
654
655            if let Some(idx) = existing_idx {
656                let old_item = table.items[idx].clone();
657                if return_values == "ALL_OLD" {
658                    result["Attributes"] = json!(old_item.clone());
659                }
660
661                // Generate stream record before removing
662                if table.stream_enabled {
663                    if let Some(record) = crate::streams::generate_stream_record(
664                        table,
665                        "REMOVE",
666                        key.clone(),
667                        Some(old_item.clone()),
668                        None,
669                        &region,
670                    ) {
671                        crate::streams::add_stream_record(table, record);
672                    }
673                }
674
675                // Capture kinesis delivery info
676                if table
677                    .kinesis_destinations
678                    .iter()
679                    .any(|d| d.destination_status == "ACTIVE")
680                {
681                    kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
682                }
683
684                table.items.remove(idx);
685                table.recalculate_stats();
686            }
687
688            let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
689            let return_icm = body["ReturnItemCollectionMetrics"]
690                .as_str()
691                .unwrap_or("NONE");
692
693            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
694                result["ConsumedCapacity"] = json!({
695                    "TableName": table_name,
696                    "CapacityUnits": 1.0,
697                });
698            }
699
700            if return_icm == "SIZE" {
701                result["ItemCollectionMetrics"] = json!({});
702            }
703
704            (result, kinesis_info)
705        };
706
707        // Deliver to Kinesis destinations outside the lock
708        if let Some((table, keys, old_image)) = kinesis_info {
709            self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
710        }
711
712        Self::ok_json(result)
713    }
714
715    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
716        let body = Self::parse_body(req)?;
717        let table_name = require_str(&body, "TableName")?;
718        let key = require_object(&body, "Key")?;
719
720        let mut state = self.state.write();
721        let region = state.region.clone();
722        let table = get_table_mut(&mut state.tables, table_name)?;
723
724        validate_key_attributes_in_key(table, &key)?;
725
726        let condition = body["ConditionExpression"].as_str();
727        let expr_attr_names = parse_expression_attribute_names(&body);
728        let expr_attr_values = parse_expression_attribute_values(&body);
729        let update_expression = body["UpdateExpression"].as_str();
730
731        let existing_idx = table.find_item_index(&key);
732
733        if let Some(cond) = condition {
734            let existing = existing_idx.map(|i| &table.items[i]);
735            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
736        }
737
738        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
739
740        let is_insert = existing_idx.is_none();
741        let idx = match existing_idx {
742            Some(i) => i,
743            None => {
744                let mut new_item = HashMap::new();
745                for (k, v) in &key {
746                    new_item.insert(k.clone(), v.clone());
747                }
748                table.items.push(new_item);
749                table.items.len() - 1
750            }
751        };
752
753        // Capture old item for stream/kinesis (before update)
754        let needs_change_capture = table.stream_enabled
755            || table
756                .kinesis_destinations
757                .iter()
758                .any(|d| d.destination_status == "ACTIVE");
759        let old_item_for_stream = if needs_change_capture {
760            Some(table.items[idx].clone())
761        } else {
762            None
763        };
764
765        let old_item = if return_values == "ALL_OLD" {
766            Some(table.items[idx].clone())
767        } else {
768            None
769        };
770
771        if let Some(expr) = update_expression {
772            apply_update_expression(
773                &mut table.items[idx],
774                expr,
775                &expr_attr_names,
776                &expr_attr_values,
777            )?;
778        }
779
780        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
781            Some(table.items[idx].clone())
782        } else {
783            None
784        };
785
786        let event_name = if is_insert { "INSERT" } else { "MODIFY" };
787        let new_item_for_stream = table.items[idx].clone();
788
789        // Generate stream record after update
790        if table.stream_enabled {
791            if let Some(record) = crate::streams::generate_stream_record(
792                table,
793                event_name,
794                key.clone(),
795                old_item_for_stream.clone(),
796                Some(new_item_for_stream.clone()),
797                &region,
798            ) {
799                crate::streams::add_stream_record(table, record);
800            }
801        }
802
803        // Capture kinesis delivery info
804        let kinesis_info = if table
805            .kinesis_destinations
806            .iter()
807            .any(|d| d.destination_status == "ACTIVE")
808        {
809            Some((
810                table.clone(),
811                event_name.to_string(),
812                key.clone(),
813                old_item_for_stream,
814                Some(new_item_for_stream),
815            ))
816        } else {
817            None
818        };
819
820        table.recalculate_stats();
821
822        // Release the write lock (drop `state`)
823        drop(state);
824
825        // Deliver to Kinesis destinations outside the lock
826        if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
827            self.deliver_to_kinesis_destinations(
828                &tbl,
829                &ev,
830                &keys,
831                old_image.as_ref(),
832                new_image.as_ref(),
833            );
834        }
835
836        let mut result = json!({});
837        if let Some(old) = old_item {
838            result["Attributes"] = json!(old);
839        } else if let Some(new) = new_item {
840            result["Attributes"] = json!(new);
841        }
842
843        Self::ok_json(result)
844    }
845
846    // ── Query & Scan ────────────────────────────────────────────────────
847
848    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849        let body = Self::parse_body(req)?;
850        let table_name = require_str(&body, "TableName")?;
851
852        let state = self.state.read();
853        let table = get_table(&state.tables, table_name)?;
854
855        let expr_attr_names = parse_expression_attribute_names(&body);
856        let expr_attr_values = parse_expression_attribute_values(&body);
857
858        let key_condition = body["KeyConditionExpression"].as_str();
859        let filter_expression = body["FilterExpression"].as_str();
860        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
861        let limit = body["Limit"].as_i64().map(|l| l as usize);
862        let index_name = body["IndexName"].as_str();
863        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
864            parse_key_map(&body["ExclusiveStartKey"]);
865
866        let (items_to_scan, hash_key_name, range_key_name): (
867            &[HashMap<String, AttributeValue>],
868            String,
869            Option<String>,
870        ) = if let Some(idx_name) = index_name {
871            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
872                let hk = gsi
873                    .key_schema
874                    .iter()
875                    .find(|k| k.key_type == "HASH")
876                    .map(|k| k.attribute_name.clone())
877                    .unwrap_or_default();
878                let rk = gsi
879                    .key_schema
880                    .iter()
881                    .find(|k| k.key_type == "RANGE")
882                    .map(|k| k.attribute_name.clone());
883                (&table.items, hk, rk)
884            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
885                let hk = lsi
886                    .key_schema
887                    .iter()
888                    .find(|k| k.key_type == "HASH")
889                    .map(|k| k.attribute_name.clone())
890                    .unwrap_or_default();
891                let rk = lsi
892                    .key_schema
893                    .iter()
894                    .find(|k| k.key_type == "RANGE")
895                    .map(|k| k.attribute_name.clone());
896                (&table.items, hk, rk)
897            } else {
898                return Err(AwsServiceError::aws_error(
899                    StatusCode::BAD_REQUEST,
900                    "ValidationException",
901                    format!("The table does not have the specified index: {idx_name}"),
902                ));
903            }
904        } else {
905            (
906                &table.items[..],
907                table.hash_key_name().to_string(),
908                table.range_key_name().map(|s| s.to_string()),
909            )
910        };
911
912        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
913            .iter()
914            .filter(|item| {
915                if let Some(kc) = key_condition {
916                    evaluate_key_condition(
917                        kc,
918                        item,
919                        &hash_key_name,
920                        range_key_name.as_deref(),
921                        &expr_attr_names,
922                        &expr_attr_values,
923                    )
924                } else {
925                    true
926                }
927            })
928            .collect();
929
930        if let Some(ref rk) = range_key_name {
931            matched.sort_by(|a, b| {
932                let av = a.get(rk.as_str());
933                let bv = b.get(rk.as_str());
934                compare_attribute_values(av, bv)
935            });
936            if !scan_forward {
937                matched.reverse();
938            }
939        }
940
941        // For GSI queries, we need the table's primary key attributes to uniquely
942        // identify items (GSI keys are not unique).
943        let table_pk_hash = table.hash_key_name().to_string();
944        let table_pk_range = table.range_key_name().map(|s| s.to_string());
945        let is_gsi_query = index_name.is_some()
946            && (hash_key_name != table_pk_hash
947                || range_key_name.as_deref() != table_pk_range.as_deref());
948
949        // Apply ExclusiveStartKey: skip items up to and including the start key.
950        // For GSI queries the start key contains both index keys and table PK, so
951        // we must match on ALL of them to find the exact item.
952        if let Some(ref start_key) = exclusive_start_key {
953            if let Some(pos) = matched.iter().position(|item| {
954                let index_match =
955                    item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
956                if is_gsi_query {
957                    index_match
958                        && item_matches_key(
959                            item,
960                            start_key,
961                            &table_pk_hash,
962                            table_pk_range.as_deref(),
963                        )
964                } else {
965                    index_match
966                }
967            }) {
968                matched = matched.split_off(pos + 1);
969            }
970        }
971
972        if let Some(filter) = filter_expression {
973            matched.retain(|item| {
974                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
975            });
976        }
977
978        let scanned_count = matched.len();
979
980        let has_more = if let Some(lim) = limit {
981            let more = matched.len() > lim;
982            matched.truncate(lim);
983            more
984        } else {
985            false
986        };
987
988        // Build LastEvaluatedKey from the last returned item if there are more results.
989        // For GSI queries, include both the index keys and the table's primary key
990        // so the item can be uniquely identified on resume.
991        let last_evaluated_key = if has_more {
992            matched.last().map(|item| {
993                let mut key =
994                    extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
995                if is_gsi_query {
996                    let table_key =
997                        extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
998                    key.extend(table_key);
999                }
1000                key
1001            })
1002        } else {
1003            None
1004        };
1005
1006        // Collect partition key values for contributor insights
1007        let insights_enabled = table.contributor_insights_status == "ENABLED";
1008        let pk_name = table.hash_key_name().to_string();
1009        let accessed_keys: Vec<String> = if insights_enabled {
1010            matched
1011                .iter()
1012                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1013                .collect()
1014        } else {
1015            Vec::new()
1016        };
1017
1018        let items: Vec<Value> = matched
1019            .iter()
1020            .map(|item| {
1021                let projected = project_item(item, &body);
1022                json!(projected)
1023            })
1024            .collect();
1025
1026        let mut result = json!({
1027            "Items": items,
1028            "Count": items.len(),
1029            "ScannedCount": scanned_count,
1030        });
1031
1032        if let Some(lek) = last_evaluated_key {
1033            result["LastEvaluatedKey"] = json!(lek);
1034        }
1035
1036        drop(state);
1037
1038        if !accessed_keys.is_empty() {
1039            let mut state = self.state.write();
1040            if let Some(table) = state.tables.get_mut(table_name) {
1041                // Re-check insights status after acquiring write lock in case it
1042                // was disabled between the read and write lock acquisitions.
1043                if table.contributor_insights_status == "ENABLED" {
1044                    for key_str in accessed_keys {
1045                        *table
1046                            .contributor_insights_counters
1047                            .entry(key_str)
1048                            .or_insert(0) += 1;
1049                    }
1050                }
1051            }
1052        }
1053
1054        Self::ok_json(result)
1055    }
1056
1057    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1058        let body = Self::parse_body(req)?;
1059        let table_name = require_str(&body, "TableName")?;
1060
1061        let state = self.state.read();
1062        let table = get_table(&state.tables, table_name)?;
1063
1064        let expr_attr_names = parse_expression_attribute_names(&body);
1065        let expr_attr_values = parse_expression_attribute_values(&body);
1066        let filter_expression = body["FilterExpression"].as_str();
1067        let limit = body["Limit"].as_i64().map(|l| l as usize);
1068        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1069            parse_key_map(&body["ExclusiveStartKey"]);
1070
1071        let hash_key_name = table.hash_key_name().to_string();
1072        let range_key_name = table.range_key_name().map(|s| s.to_string());
1073
1074        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1075
1076        // Apply ExclusiveStartKey: skip items up to and including the start key
1077        if let Some(ref start_key) = exclusive_start_key {
1078            if let Some(pos) = matched.iter().position(|item| {
1079                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1080            }) {
1081                matched = matched.split_off(pos + 1);
1082            }
1083        }
1084
1085        let scanned_count = matched.len();
1086
1087        if let Some(filter) = filter_expression {
1088            matched.retain(|item| {
1089                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1090            });
1091        }
1092
1093        let has_more = if let Some(lim) = limit {
1094            let more = matched.len() > lim;
1095            matched.truncate(lim);
1096            more
1097        } else {
1098            false
1099        };
1100
1101        // Build LastEvaluatedKey from the last returned item if there are more results
1102        let last_evaluated_key = if has_more {
1103            matched
1104                .last()
1105                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1106        } else {
1107            None
1108        };
1109
1110        // Collect partition key values for contributor insights
1111        let insights_enabled = table.contributor_insights_status == "ENABLED";
1112        let pk_name = table.hash_key_name().to_string();
1113        let accessed_keys: Vec<String> = if insights_enabled {
1114            matched
1115                .iter()
1116                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1117                .collect()
1118        } else {
1119            Vec::new()
1120        };
1121
1122        let items: Vec<Value> = matched
1123            .iter()
1124            .map(|item| {
1125                let projected = project_item(item, &body);
1126                json!(projected)
1127            })
1128            .collect();
1129
1130        let mut result = json!({
1131            "Items": items,
1132            "Count": items.len(),
1133            "ScannedCount": scanned_count,
1134        });
1135
1136        if let Some(lek) = last_evaluated_key {
1137            result["LastEvaluatedKey"] = json!(lek);
1138        }
1139
1140        drop(state);
1141
1142        if !accessed_keys.is_empty() {
1143            let mut state = self.state.write();
1144            if let Some(table) = state.tables.get_mut(table_name) {
1145                // Re-check insights status after acquiring write lock in case it
1146                // was disabled between the read and write lock acquisitions.
1147                if table.contributor_insights_status == "ENABLED" {
1148                    for key_str in accessed_keys {
1149                        *table
1150                            .contributor_insights_counters
1151                            .entry(key_str)
1152                            .or_insert(0) += 1;
1153                    }
1154                }
1155            }
1156        }
1157
1158        Self::ok_json(result)
1159    }
1160
1161    // ── Batch Operations ────────────────────────────────────────────────
1162
1163    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1164        let body = Self::parse_body(req)?;
1165
1166        validate_optional_enum_value(
1167            "returnConsumedCapacity",
1168            &body["ReturnConsumedCapacity"],
1169            &["INDEXES", "TOTAL", "NONE"],
1170        )?;
1171
1172        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1173
1174        let request_items = body["RequestItems"]
1175            .as_object()
1176            .ok_or_else(|| {
1177                AwsServiceError::aws_error(
1178                    StatusCode::BAD_REQUEST,
1179                    "ValidationException",
1180                    "RequestItems is required",
1181                )
1182            })?
1183            .clone();
1184
1185        let state = self.state.read();
1186        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1187        let mut consumed_capacity: Vec<Value> = Vec::new();
1188
1189        for (table_name, params) in &request_items {
1190            let table = get_table(&state.tables, table_name)?;
1191            let keys = params["Keys"].as_array().ok_or_else(|| {
1192                AwsServiceError::aws_error(
1193                    StatusCode::BAD_REQUEST,
1194                    "ValidationException",
1195                    "Keys is required",
1196                )
1197            })?;
1198
1199            let mut items = Vec::new();
1200            for key_val in keys {
1201                let key: HashMap<String, AttributeValue> =
1202                    serde_json::from_value(key_val.clone()).unwrap_or_default();
1203                if let Some(idx) = table.find_item_index(&key) {
1204                    items.push(json!(table.items[idx]));
1205                }
1206            }
1207            responses.insert(table_name.clone(), items);
1208
1209            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1210                consumed_capacity.push(json!({
1211                    "TableName": table_name,
1212                    "CapacityUnits": 1.0,
1213                }));
1214            }
1215        }
1216
1217        let mut result = json!({
1218            "Responses": responses,
1219            "UnprocessedKeys": {},
1220        });
1221
1222        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1223            result["ConsumedCapacity"] = json!(consumed_capacity);
1224        }
1225
1226        Self::ok_json(result)
1227    }
1228
1229    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230        let body = Self::parse_body(req)?;
1231
1232        validate_optional_enum_value(
1233            "returnConsumedCapacity",
1234            &body["ReturnConsumedCapacity"],
1235            &["INDEXES", "TOTAL", "NONE"],
1236        )?;
1237        validate_optional_enum_value(
1238            "returnItemCollectionMetrics",
1239            &body["ReturnItemCollectionMetrics"],
1240            &["SIZE", "NONE"],
1241        )?;
1242
1243        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1244        let return_icm = body["ReturnItemCollectionMetrics"]
1245            .as_str()
1246            .unwrap_or("NONE");
1247
1248        let request_items = body["RequestItems"]
1249            .as_object()
1250            .ok_or_else(|| {
1251                AwsServiceError::aws_error(
1252                    StatusCode::BAD_REQUEST,
1253                    "ValidationException",
1254                    "RequestItems is required",
1255                )
1256            })?
1257            .clone();
1258
1259        let mut state = self.state.write();
1260        let mut consumed_capacity: Vec<Value> = Vec::new();
1261        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1262
1263        for (table_name, requests) in &request_items {
1264            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1265                AwsServiceError::aws_error(
1266                    StatusCode::BAD_REQUEST,
1267                    "ResourceNotFoundException",
1268                    format!("Requested resource not found: Table: {table_name} not found"),
1269                )
1270            })?;
1271
1272            let reqs = requests.as_array().ok_or_else(|| {
1273                AwsServiceError::aws_error(
1274                    StatusCode::BAD_REQUEST,
1275                    "ValidationException",
1276                    "Request list must be an array",
1277                )
1278            })?;
1279
1280            for request in reqs {
1281                if let Some(put_req) = request.get("PutRequest") {
1282                    let item: HashMap<String, AttributeValue> =
1283                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1284                    let key = extract_key(table, &item);
1285                    if let Some(idx) = table.find_item_index(&key) {
1286                        table.items[idx] = item;
1287                    } else {
1288                        table.items.push(item);
1289                    }
1290                } else if let Some(del_req) = request.get("DeleteRequest") {
1291                    let key: HashMap<String, AttributeValue> =
1292                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1293                    if let Some(idx) = table.find_item_index(&key) {
1294                        table.items.remove(idx);
1295                    }
1296                }
1297            }
1298
1299            table.recalculate_stats();
1300
1301            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1302                consumed_capacity.push(json!({
1303                    "TableName": table_name,
1304                    "CapacityUnits": 1.0,
1305                }));
1306            }
1307
1308            if return_icm == "SIZE" {
1309                item_collection_metrics.insert(table_name.clone(), vec![]);
1310            }
1311        }
1312
1313        let mut result = json!({
1314            "UnprocessedItems": {},
1315        });
1316
1317        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1318            result["ConsumedCapacity"] = json!(consumed_capacity);
1319        }
1320
1321        if return_icm == "SIZE" {
1322            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1323        }
1324
1325        Self::ok_json(result)
1326    }
1327
1328    // ── Tags ────────────────────────────────────────────────────────────
1329
1330    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331        let body = Self::parse_body(req)?;
1332        let resource_arn = require_str(&body, "ResourceArn")?;
1333        validate_required("Tags", &body["Tags"])?;
1334
1335        let mut state = self.state.write();
1336        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1337
1338        fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1339            |f| {
1340                AwsServiceError::aws_error(
1341                    StatusCode::BAD_REQUEST,
1342                    "ValidationException",
1343                    format!("{f} must be a list"),
1344                )
1345            },
1346        )?;
1347
1348        Self::ok_json(json!({}))
1349    }
1350
1351    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352        let body = Self::parse_body(req)?;
1353        let resource_arn = require_str(&body, "ResourceArn")?;
1354        validate_required("TagKeys", &body["TagKeys"])?;
1355
1356        let mut state = self.state.write();
1357        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1358
1359        fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1360            AwsServiceError::aws_error(
1361                StatusCode::BAD_REQUEST,
1362                "ValidationException",
1363                format!("{f} must be a list"),
1364            )
1365        })?;
1366
1367        Self::ok_json(json!({}))
1368    }
1369
1370    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371        let body = Self::parse_body(req)?;
1372        let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374        let state = self.state.read();
1375        let table = find_table_by_arn(&state.tables, resource_arn)?;
1376
1377        let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1378
1379        Self::ok_json(json!({ "Tags": tags }))
1380    }
1381
1382    // ── Transactions ────────────────────────────────────────────────────
1383
1384    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385        let body = Self::parse_body(req)?;
1386        validate_optional_enum_value(
1387            "returnConsumedCapacity",
1388            &body["ReturnConsumedCapacity"],
1389            &["INDEXES", "TOTAL", "NONE"],
1390        )?;
1391        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1392            AwsServiceError::aws_error(
1393                StatusCode::BAD_REQUEST,
1394                "ValidationException",
1395                "TransactItems is required",
1396            )
1397        })?;
1398
1399        let state = self.state.read();
1400        let mut responses: Vec<Value> = Vec::new();
1401
1402        for ti in transact_items {
1403            let get = &ti["Get"];
1404            let table_name = get["TableName"].as_str().ok_or_else(|| {
1405                AwsServiceError::aws_error(
1406                    StatusCode::BAD_REQUEST,
1407                    "ValidationException",
1408                    "TableName is required in Get",
1409                )
1410            })?;
1411            let key: HashMap<String, AttributeValue> =
1412                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1413
1414            let table = get_table(&state.tables, table_name)?;
1415            match table.find_item_index(&key) {
1416                Some(idx) => {
1417                    responses.push(json!({ "Item": table.items[idx] }));
1418                }
1419                None => {
1420                    responses.push(json!({}));
1421                }
1422            }
1423        }
1424
1425        Self::ok_json(json!({ "Responses": responses }))
1426    }
1427
1428    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1429        let body = Self::parse_body(req)?;
1430        validate_optional_string_length(
1431            "clientRequestToken",
1432            body["ClientRequestToken"].as_str(),
1433            1,
1434            36,
1435        )?;
1436        validate_optional_enum_value(
1437            "returnConsumedCapacity",
1438            &body["ReturnConsumedCapacity"],
1439            &["INDEXES", "TOTAL", "NONE"],
1440        )?;
1441        validate_optional_enum_value(
1442            "returnItemCollectionMetrics",
1443            &body["ReturnItemCollectionMetrics"],
1444            &["SIZE", "NONE"],
1445        )?;
1446        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1447            AwsServiceError::aws_error(
1448                StatusCode::BAD_REQUEST,
1449                "ValidationException",
1450                "TransactItems is required",
1451            )
1452        })?;
1453
1454        let mut state = self.state.write();
1455
1456        // First pass: validate all conditions
1457        let mut cancellation_reasons: Vec<Value> = Vec::new();
1458        let mut any_failed = false;
1459
1460        for ti in transact_items {
1461            if let Some(put) = ti.get("Put") {
1462                let table_name = put["TableName"].as_str().unwrap_or_default();
1463                let item: HashMap<String, AttributeValue> =
1464                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1465                let condition = put["ConditionExpression"].as_str();
1466
1467                if let Some(cond) = condition {
1468                    let table = get_table(&state.tables, table_name)?;
1469                    let expr_attr_names = parse_expression_attribute_names(put);
1470                    let expr_attr_values = parse_expression_attribute_values(put);
1471                    let key = extract_key(table, &item);
1472                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1473                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1474                        .is_err()
1475                    {
1476                        cancellation_reasons.push(json!({
1477                            "Code": "ConditionalCheckFailed",
1478                            "Message": "The conditional request failed"
1479                        }));
1480                        any_failed = true;
1481                        continue;
1482                    }
1483                }
1484                cancellation_reasons.push(json!({ "Code": "None" }));
1485            } else if let Some(delete) = ti.get("Delete") {
1486                let table_name = delete["TableName"].as_str().unwrap_or_default();
1487                let key: HashMap<String, AttributeValue> =
1488                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1489                let condition = delete["ConditionExpression"].as_str();
1490
1491                if let Some(cond) = condition {
1492                    let table = get_table(&state.tables, table_name)?;
1493                    let expr_attr_names = parse_expression_attribute_names(delete);
1494                    let expr_attr_values = parse_expression_attribute_values(delete);
1495                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1496                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1497                        .is_err()
1498                    {
1499                        cancellation_reasons.push(json!({
1500                            "Code": "ConditionalCheckFailed",
1501                            "Message": "The conditional request failed"
1502                        }));
1503                        any_failed = true;
1504                        continue;
1505                    }
1506                }
1507                cancellation_reasons.push(json!({ "Code": "None" }));
1508            } else if let Some(update) = ti.get("Update") {
1509                let table_name = update["TableName"].as_str().unwrap_or_default();
1510                let key: HashMap<String, AttributeValue> =
1511                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1512                let condition = update["ConditionExpression"].as_str();
1513
1514                if let Some(cond) = condition {
1515                    let table = get_table(&state.tables, table_name)?;
1516                    let expr_attr_names = parse_expression_attribute_names(update);
1517                    let expr_attr_values = parse_expression_attribute_values(update);
1518                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1519                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1520                        .is_err()
1521                    {
1522                        cancellation_reasons.push(json!({
1523                            "Code": "ConditionalCheckFailed",
1524                            "Message": "The conditional request failed"
1525                        }));
1526                        any_failed = true;
1527                        continue;
1528                    }
1529                }
1530                cancellation_reasons.push(json!({ "Code": "None" }));
1531            } else if let Some(check) = ti.get("ConditionCheck") {
1532                let table_name = check["TableName"].as_str().unwrap_or_default();
1533                let key: HashMap<String, AttributeValue> =
1534                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1535                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1536
1537                let table = get_table(&state.tables, table_name)?;
1538                let expr_attr_names = parse_expression_attribute_names(check);
1539                let expr_attr_values = parse_expression_attribute_values(check);
1540                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1541                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1542                {
1543                    cancellation_reasons.push(json!({
1544                        "Code": "ConditionalCheckFailed",
1545                        "Message": "The conditional request failed"
1546                    }));
1547                    any_failed = true;
1548                    continue;
1549                }
1550                cancellation_reasons.push(json!({ "Code": "None" }));
1551            } else {
1552                cancellation_reasons.push(json!({ "Code": "None" }));
1553            }
1554        }
1555
1556        if any_failed {
1557            let error_body = json!({
1558                "__type": "TransactionCanceledException",
1559                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1560                "CancellationReasons": cancellation_reasons
1561            });
1562            return Ok(AwsResponse::json(
1563                StatusCode::BAD_REQUEST,
1564                serde_json::to_vec(&error_body).unwrap(),
1565            ));
1566        }
1567
1568        // Second pass: apply all writes
1569        for ti in transact_items {
1570            if let Some(put) = ti.get("Put") {
1571                let table_name = put["TableName"].as_str().unwrap_or_default();
1572                let item: HashMap<String, AttributeValue> =
1573                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1574                let table = get_table_mut(&mut state.tables, table_name)?;
1575                let key = extract_key(table, &item);
1576                if let Some(idx) = table.find_item_index(&key) {
1577                    table.items[idx] = item;
1578                } else {
1579                    table.items.push(item);
1580                }
1581                table.recalculate_stats();
1582            } else if let Some(delete) = ti.get("Delete") {
1583                let table_name = delete["TableName"].as_str().unwrap_or_default();
1584                let key: HashMap<String, AttributeValue> =
1585                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1586                let table = get_table_mut(&mut state.tables, table_name)?;
1587                if let Some(idx) = table.find_item_index(&key) {
1588                    table.items.remove(idx);
1589                }
1590                table.recalculate_stats();
1591            } else if let Some(update) = ti.get("Update") {
1592                let table_name = update["TableName"].as_str().unwrap_or_default();
1593                let key: HashMap<String, AttributeValue> =
1594                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1595                let update_expression = update["UpdateExpression"].as_str();
1596                let expr_attr_names = parse_expression_attribute_names(update);
1597                let expr_attr_values = parse_expression_attribute_values(update);
1598
1599                let table = get_table_mut(&mut state.tables, table_name)?;
1600                let idx = match table.find_item_index(&key) {
1601                    Some(i) => i,
1602                    None => {
1603                        let mut new_item = HashMap::new();
1604                        for (k, v) in &key {
1605                            new_item.insert(k.clone(), v.clone());
1606                        }
1607                        table.items.push(new_item);
1608                        table.items.len() - 1
1609                    }
1610                };
1611
1612                if let Some(expr) = update_expression {
1613                    apply_update_expression(
1614                        &mut table.items[idx],
1615                        expr,
1616                        &expr_attr_names,
1617                        &expr_attr_values,
1618                    )?;
1619                }
1620                table.recalculate_stats();
1621            }
1622            // ConditionCheck: no write needed
1623        }
1624
1625        Self::ok_json(json!({}))
1626    }
1627
1628    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1629        let body = Self::parse_body(req)?;
1630        let statement = require_str(&body, "Statement")?;
1631        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1632
1633        execute_partiql_statement(&self.state, statement, &parameters)
1634    }
1635
1636    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1637        let body = Self::parse_body(req)?;
1638        validate_optional_enum_value(
1639            "returnConsumedCapacity",
1640            &body["ReturnConsumedCapacity"],
1641            &["INDEXES", "TOTAL", "NONE"],
1642        )?;
1643        let statements = body["Statements"].as_array().ok_or_else(|| {
1644            AwsServiceError::aws_error(
1645                StatusCode::BAD_REQUEST,
1646                "ValidationException",
1647                "Statements is required",
1648            )
1649        })?;
1650
1651        let mut responses: Vec<Value> = Vec::new();
1652        for stmt_obj in statements {
1653            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1654            let parameters = stmt_obj["Parameters"]
1655                .as_array()
1656                .cloned()
1657                .unwrap_or_default();
1658
1659            match execute_partiql_statement(&self.state, statement, &parameters) {
1660                Ok(resp) => {
1661                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1662                    responses.push(resp_body);
1663                }
1664                Err(e) => {
1665                    responses.push(json!({
1666                        "Error": {
1667                            "Code": "ValidationException",
1668                            "Message": e.to_string()
1669                        }
1670                    }));
1671                }
1672            }
1673        }
1674
1675        Self::ok_json(json!({ "Responses": responses }))
1676    }
1677
1678    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1679        let body = Self::parse_body(req)?;
1680        validate_optional_string_length(
1681            "clientRequestToken",
1682            body["ClientRequestToken"].as_str(),
1683            1,
1684            36,
1685        )?;
1686        validate_optional_enum_value(
1687            "returnConsumedCapacity",
1688            &body["ReturnConsumedCapacity"],
1689            &["INDEXES", "TOTAL", "NONE"],
1690        )?;
1691        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1692            AwsServiceError::aws_error(
1693                StatusCode::BAD_REQUEST,
1694                "ValidationException",
1695                "TransactStatements is required",
1696            )
1697        })?;
1698
1699        // Collect all results; if any fail, return TransactionCanceledException
1700        let mut results: Vec<Result<Value, String>> = Vec::new();
1701        for stmt_obj in transact_statements {
1702            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1703            let parameters = stmt_obj["Parameters"]
1704                .as_array()
1705                .cloned()
1706                .unwrap_or_default();
1707
1708            match execute_partiql_statement(&self.state, statement, &parameters) {
1709                Ok(resp) => {
1710                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1711                    results.push(Ok(resp_body));
1712                }
1713                Err(e) => {
1714                    results.push(Err(e.to_string()));
1715                }
1716            }
1717        }
1718
1719        let any_failed = results.iter().any(|r| r.is_err());
1720        if any_failed {
1721            let reasons: Vec<Value> = results
1722                .iter()
1723                .map(|r| match r {
1724                    Ok(_) => json!({ "Code": "None" }),
1725                    Err(msg) => json!({
1726                        "Code": "ValidationException",
1727                        "Message": msg
1728                    }),
1729                })
1730                .collect();
1731            let error_body = json!({
1732                "__type": "TransactionCanceledException",
1733                "message": "Transaction cancelled due to validation errors",
1734                "CancellationReasons": reasons
1735            });
1736            return Ok(AwsResponse::json(
1737                StatusCode::BAD_REQUEST,
1738                serde_json::to_vec(&error_body).unwrap(),
1739            ));
1740        }
1741
1742        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1743        Self::ok_json(json!({ "Responses": responses }))
1744    }
1745
1746    // ── TTL ─────────────────────────────────────────────────────────────
1747
1748    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749        let body = Self::parse_body(req)?;
1750        let table_name = require_str(&body, "TableName")?;
1751        let spec = &body["TimeToLiveSpecification"];
1752        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1753            AwsServiceError::aws_error(
1754                StatusCode::BAD_REQUEST,
1755                "ValidationException",
1756                "TimeToLiveSpecification.AttributeName is required",
1757            )
1758        })?;
1759        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1760            AwsServiceError::aws_error(
1761                StatusCode::BAD_REQUEST,
1762                "ValidationException",
1763                "TimeToLiveSpecification.Enabled is required",
1764            )
1765        })?;
1766
1767        let mut state = self.state.write();
1768        let table = get_table_mut(&mut state.tables, table_name)?;
1769
1770        if enabled {
1771            table.ttl_attribute = Some(attr_name.to_string());
1772            table.ttl_enabled = true;
1773        } else {
1774            table.ttl_enabled = false;
1775        }
1776
1777        Self::ok_json(json!({
1778            "TimeToLiveSpecification": {
1779                "AttributeName": attr_name,
1780                "Enabled": enabled
1781            }
1782        }))
1783    }
1784
1785    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1786        let body = Self::parse_body(req)?;
1787        let table_name = require_str(&body, "TableName")?;
1788
1789        let state = self.state.read();
1790        let table = get_table(&state.tables, table_name)?;
1791
1792        let status = if table.ttl_enabled {
1793            "ENABLED"
1794        } else {
1795            "DISABLED"
1796        };
1797
1798        let mut desc = json!({
1799            "TimeToLiveDescription": {
1800                "TimeToLiveStatus": status
1801            }
1802        });
1803
1804        if let Some(ref attr) = table.ttl_attribute {
1805            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1806        }
1807
1808        Self::ok_json(desc)
1809    }
1810
1811    // ── Resource Policies ───────────────────────────────────────────────
1812
1813    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1814        let body = Self::parse_body(req)?;
1815        let resource_arn = require_str(&body, "ResourceArn")?;
1816        let policy = require_str(&body, "Policy")?;
1817
1818        let mut state = self.state.write();
1819        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1820        table.resource_policy = Some(policy.to_string());
1821
1822        let revision_id = uuid::Uuid::new_v4().to_string();
1823        Self::ok_json(json!({ "RevisionId": revision_id }))
1824    }
1825
1826    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1827        let body = Self::parse_body(req)?;
1828        let resource_arn = require_str(&body, "ResourceArn")?;
1829
1830        let state = self.state.read();
1831        let table = find_table_by_arn(&state.tables, resource_arn)?;
1832
1833        match &table.resource_policy {
1834            Some(policy) => {
1835                let revision_id = uuid::Uuid::new_v4().to_string();
1836                Self::ok_json(json!({
1837                    "Policy": policy,
1838                    "RevisionId": revision_id
1839                }))
1840            }
1841            None => Self::ok_json(json!({ "Policy": null })),
1842        }
1843    }
1844
1845    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1846        let body = Self::parse_body(req)?;
1847        let resource_arn = require_str(&body, "ResourceArn")?;
1848
1849        let mut state = self.state.write();
1850        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1851        table.resource_policy = None;
1852
1853        Self::ok_json(json!({}))
1854    }
1855
1856    // ── Stubs ──────────────────────────────────────────────────────────
1857
1858    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1859        Self::ok_json(json!({
1860            "Endpoints": [{
1861                "Address": "dynamodb.us-east-1.amazonaws.com",
1862                "CachePeriodInMinutes": 1440
1863            }]
1864        }))
1865    }
1866
1867    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868        Self::ok_json(json!({
1869            "AccountMaxReadCapacityUnits": 80000,
1870            "AccountMaxWriteCapacityUnits": 80000,
1871            "TableMaxReadCapacityUnits": 40000,
1872            "TableMaxWriteCapacityUnits": 40000
1873        }))
1874    }
1875
1876    // ── Backups ────────────────────────────────────────────────────────
1877
1878    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1879        let body = Self::parse_body(req)?;
1880        let table_name = require_str(&body, "TableName")?;
1881        let backup_name = require_str(&body, "BackupName")?;
1882
1883        let mut state = self.state.write();
1884        let table = get_table(&state.tables, table_name)?;
1885
1886        let backup_arn = format!(
1887            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1888            state.region,
1889            state.account_id,
1890            table_name,
1891            Utc::now().format("%Y%m%d%H%M%S")
1892        );
1893        let now = Utc::now();
1894
1895        let backup = BackupDescription {
1896            backup_arn: backup_arn.clone(),
1897            backup_name: backup_name.to_string(),
1898            table_name: table_name.to_string(),
1899            table_arn: table.arn.clone(),
1900            backup_status: "AVAILABLE".to_string(),
1901            backup_type: "USER".to_string(),
1902            backup_creation_date: now,
1903            key_schema: table.key_schema.clone(),
1904            attribute_definitions: table.attribute_definitions.clone(),
1905            provisioned_throughput: table.provisioned_throughput.clone(),
1906            billing_mode: table.billing_mode.clone(),
1907            item_count: table.item_count,
1908            size_bytes: table.size_bytes,
1909            items: table.items.clone(),
1910        };
1911
1912        state.backups.insert(backup_arn.clone(), backup);
1913
1914        Self::ok_json(json!({
1915            "BackupDetails": {
1916                "BackupArn": backup_arn,
1917                "BackupName": backup_name,
1918                "BackupStatus": "AVAILABLE",
1919                "BackupType": "USER",
1920                "BackupCreationDateTime": now.timestamp() as f64,
1921                "BackupSizeBytes": 0
1922            }
1923        }))
1924    }
1925
1926    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1927        let body = Self::parse_body(req)?;
1928        let backup_arn = require_str(&body, "BackupArn")?;
1929
1930        let mut state = self.state.write();
1931        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1932            AwsServiceError::aws_error(
1933                StatusCode::BAD_REQUEST,
1934                "BackupNotFoundException",
1935                format!("Backup not found: {backup_arn}"),
1936            )
1937        })?;
1938
1939        Self::ok_json(json!({
1940            "BackupDescription": {
1941                "BackupDetails": {
1942                    "BackupArn": backup.backup_arn,
1943                    "BackupName": backup.backup_name,
1944                    "BackupStatus": "DELETED",
1945                    "BackupType": backup.backup_type,
1946                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1947                    "BackupSizeBytes": backup.size_bytes
1948                },
1949                "SourceTableDetails": {
1950                    "TableName": backup.table_name,
1951                    "TableArn": backup.table_arn,
1952                    "TableId": uuid::Uuid::new_v4().to_string(),
1953                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1954                        "AttributeName": ks.attribute_name,
1955                        "KeyType": ks.key_type
1956                    })).collect::<Vec<_>>(),
1957                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1958                    "ProvisionedThroughput": {
1959                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1960                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1961                    },
1962                    "ItemCount": backup.item_count,
1963                    "BillingMode": backup.billing_mode,
1964                    "TableSizeBytes": backup.size_bytes
1965                },
1966                "SourceTableFeatureDetails": {}
1967            }
1968        }))
1969    }
1970
1971    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1972        let body = Self::parse_body(req)?;
1973        let backup_arn = require_str(&body, "BackupArn")?;
1974
1975        let state = self.state.read();
1976        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1977            AwsServiceError::aws_error(
1978                StatusCode::BAD_REQUEST,
1979                "BackupNotFoundException",
1980                format!("Backup not found: {backup_arn}"),
1981            )
1982        })?;
1983
1984        Self::ok_json(json!({
1985            "BackupDescription": {
1986                "BackupDetails": {
1987                    "BackupArn": backup.backup_arn,
1988                    "BackupName": backup.backup_name,
1989                    "BackupStatus": backup.backup_status,
1990                    "BackupType": backup.backup_type,
1991                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1992                    "BackupSizeBytes": backup.size_bytes
1993                },
1994                "SourceTableDetails": {
1995                    "TableName": backup.table_name,
1996                    "TableArn": backup.table_arn,
1997                    "TableId": uuid::Uuid::new_v4().to_string(),
1998                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1999                        "AttributeName": ks.attribute_name,
2000                        "KeyType": ks.key_type
2001                    })).collect::<Vec<_>>(),
2002                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
2003                    "ProvisionedThroughput": {
2004                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
2005                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2006                    },
2007                    "ItemCount": backup.item_count,
2008                    "BillingMode": backup.billing_mode,
2009                    "TableSizeBytes": backup.size_bytes
2010                },
2011                "SourceTableFeatureDetails": {}
2012            }
2013        }))
2014    }
2015
2016    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2017        let body = Self::parse_body(req)?;
2018        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2019        validate_optional_string_length(
2020            "exclusiveStartBackupArn",
2021            body["ExclusiveStartBackupArn"].as_str(),
2022            37,
2023            1024,
2024        )?;
2025        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2026        validate_optional_enum_value(
2027            "backupType",
2028            &body["BackupType"],
2029            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2030        )?;
2031        let table_name = body["TableName"].as_str();
2032
2033        let state = self.state.read();
2034        let summaries: Vec<Value> = state
2035            .backups
2036            .values()
2037            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2038            .map(|b| {
2039                json!({
2040                    "TableName": b.table_name,
2041                    "TableArn": b.table_arn,
2042                    "BackupArn": b.backup_arn,
2043                    "BackupName": b.backup_name,
2044                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2045                    "BackupStatus": b.backup_status,
2046                    "BackupType": b.backup_type,
2047                    "BackupSizeBytes": b.size_bytes
2048                })
2049            })
2050            .collect();
2051
2052        Self::ok_json(json!({
2053            "BackupSummaries": summaries
2054        }))
2055    }
2056
2057    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058        let body = Self::parse_body(req)?;
2059        let backup_arn = require_str(&body, "BackupArn")?;
2060        let target_table_name = require_str(&body, "TargetTableName")?;
2061
2062        let mut state = self.state.write();
2063        let backup = state.backups.get(backup_arn).ok_or_else(|| {
2064            AwsServiceError::aws_error(
2065                StatusCode::BAD_REQUEST,
2066                "BackupNotFoundException",
2067                format!("Backup not found: {backup_arn}"),
2068            )
2069        })?;
2070
2071        if state.tables.contains_key(target_table_name) {
2072            return Err(AwsServiceError::aws_error(
2073                StatusCode::BAD_REQUEST,
2074                "TableAlreadyExistsException",
2075                format!("Table already exists: {target_table_name}"),
2076            ));
2077        }
2078
2079        let now = Utc::now();
2080        let arn = format!(
2081            "arn:aws:dynamodb:{}:{}:table/{}",
2082            state.region, state.account_id, target_table_name
2083        );
2084
2085        let restored_items = backup.items.clone();
2086        let mut table = DynamoTable {
2087            name: target_table_name.to_string(),
2088            arn: arn.clone(),
2089            key_schema: backup.key_schema.clone(),
2090            attribute_definitions: backup.attribute_definitions.clone(),
2091            provisioned_throughput: backup.provisioned_throughput.clone(),
2092            items: restored_items,
2093            gsi: Vec::new(),
2094            lsi: Vec::new(),
2095            tags: HashMap::new(),
2096            created_at: now,
2097            status: "ACTIVE".to_string(),
2098            item_count: 0,
2099            size_bytes: 0,
2100            billing_mode: backup.billing_mode.clone(),
2101            ttl_attribute: None,
2102            ttl_enabled: false,
2103            resource_policy: None,
2104            pitr_enabled: false,
2105            kinesis_destinations: Vec::new(),
2106            contributor_insights_status: "DISABLED".to_string(),
2107            contributor_insights_counters: HashMap::new(),
2108            stream_enabled: false,
2109            stream_view_type: None,
2110            stream_arn: None,
2111            stream_records: Arc::new(RwLock::new(Vec::new())),
2112            sse_type: None,
2113            sse_kms_key_arn: None,
2114        };
2115        table.recalculate_stats();
2116
2117        let desc = build_table_description(&table);
2118        state.tables.insert(target_table_name.to_string(), table);
2119
2120        Self::ok_json(json!({
2121            "TableDescription": desc
2122        }))
2123    }
2124
2125    fn restore_table_to_point_in_time(
2126        &self,
2127        req: &AwsRequest,
2128    ) -> Result<AwsResponse, AwsServiceError> {
2129        let body = Self::parse_body(req)?;
2130        let target_table_name = require_str(&body, "TargetTableName")?;
2131        let source_table_name = body["SourceTableName"].as_str();
2132        let source_table_arn = body["SourceTableArn"].as_str();
2133
2134        let mut state = self.state.write();
2135
2136        // Resolve source table
2137        let source = if let Some(name) = source_table_name {
2138            get_table(&state.tables, name)?.clone()
2139        } else if let Some(arn) = source_table_arn {
2140            find_table_by_arn(&state.tables, arn)?.clone()
2141        } else {
2142            return Err(AwsServiceError::aws_error(
2143                StatusCode::BAD_REQUEST,
2144                "ValidationException",
2145                "SourceTableName or SourceTableArn is required",
2146            ));
2147        };
2148
2149        if state.tables.contains_key(target_table_name) {
2150            return Err(AwsServiceError::aws_error(
2151                StatusCode::BAD_REQUEST,
2152                "TableAlreadyExistsException",
2153                format!("Table already exists: {target_table_name}"),
2154            ));
2155        }
2156
2157        let now = Utc::now();
2158        let arn = format!(
2159            "arn:aws:dynamodb:{}:{}:table/{}",
2160            state.region, state.account_id, target_table_name
2161        );
2162
2163        let mut table = DynamoTable {
2164            name: target_table_name.to_string(),
2165            arn: arn.clone(),
2166            key_schema: source.key_schema.clone(),
2167            attribute_definitions: source.attribute_definitions.clone(),
2168            provisioned_throughput: source.provisioned_throughput.clone(),
2169            items: source.items.clone(),
2170            gsi: Vec::new(),
2171            lsi: Vec::new(),
2172            tags: HashMap::new(),
2173            created_at: now,
2174            status: "ACTIVE".to_string(),
2175            item_count: 0,
2176            size_bytes: 0,
2177            billing_mode: source.billing_mode.clone(),
2178            ttl_attribute: None,
2179            ttl_enabled: false,
2180            resource_policy: None,
2181            pitr_enabled: false,
2182            kinesis_destinations: Vec::new(),
2183            contributor_insights_status: "DISABLED".to_string(),
2184            contributor_insights_counters: HashMap::new(),
2185            stream_enabled: false,
2186            stream_view_type: None,
2187            stream_arn: None,
2188            stream_records: Arc::new(RwLock::new(Vec::new())),
2189            sse_type: None,
2190            sse_kms_key_arn: None,
2191        };
2192        table.recalculate_stats();
2193
2194        let desc = build_table_description(&table);
2195        state.tables.insert(target_table_name.to_string(), table);
2196
2197        Self::ok_json(json!({
2198            "TableDescription": desc
2199        }))
2200    }
2201
2202    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2203        let body = Self::parse_body(req)?;
2204        let table_name = require_str(&body, "TableName")?;
2205
2206        let pitr_spec = body["PointInTimeRecoverySpecification"]
2207            .as_object()
2208            .ok_or_else(|| {
2209                AwsServiceError::aws_error(
2210                    StatusCode::BAD_REQUEST,
2211                    "ValidationException",
2212                    "PointInTimeRecoverySpecification is required",
2213                )
2214            })?;
2215        let enabled = pitr_spec
2216            .get("PointInTimeRecoveryEnabled")
2217            .and_then(|v| v.as_bool())
2218            .unwrap_or(false);
2219
2220        let mut state = self.state.write();
2221        let table = get_table_mut(&mut state.tables, table_name)?;
2222        table.pitr_enabled = enabled;
2223
2224        let status = if enabled { "ENABLED" } else { "DISABLED" };
2225        Self::ok_json(json!({
2226            "ContinuousBackupsDescription": {
2227                "ContinuousBackupsStatus": status,
2228                "PointInTimeRecoveryDescription": {
2229                    "PointInTimeRecoveryStatus": status,
2230                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2231                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2232                }
2233            }
2234        }))
2235    }
2236
2237    fn describe_continuous_backups(
2238        &self,
2239        req: &AwsRequest,
2240    ) -> Result<AwsResponse, AwsServiceError> {
2241        let body = Self::parse_body(req)?;
2242        let table_name = require_str(&body, "TableName")?;
2243
2244        let state = self.state.read();
2245        let table = get_table(&state.tables, table_name)?;
2246
2247        let status = if table.pitr_enabled {
2248            "ENABLED"
2249        } else {
2250            "DISABLED"
2251        };
2252        Self::ok_json(json!({
2253            "ContinuousBackupsDescription": {
2254                "ContinuousBackupsStatus": status,
2255                "PointInTimeRecoveryDescription": {
2256                    "PointInTimeRecoveryStatus": status,
2257                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2258                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2259                }
2260            }
2261        }))
2262    }
2263
2264    // ── Global Tables ──────────────────────────────────────────────────
2265
2266    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2267        let body = Self::parse_body(req)?;
2268        let global_table_name = require_str(&body, "GlobalTableName")?;
2269        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2270
2271        let replication_group = body["ReplicationGroup"]
2272            .as_array()
2273            .ok_or_else(|| {
2274                AwsServiceError::aws_error(
2275                    StatusCode::BAD_REQUEST,
2276                    "ValidationException",
2277                    "ReplicationGroup is required",
2278                )
2279            })?
2280            .iter()
2281            .filter_map(|r| {
2282                r["RegionName"].as_str().map(|rn| ReplicaDescription {
2283                    region_name: rn.to_string(),
2284                    replica_status: "ACTIVE".to_string(),
2285                })
2286            })
2287            .collect::<Vec<_>>();
2288
2289        let mut state = self.state.write();
2290
2291        if state.global_tables.contains_key(global_table_name) {
2292            return Err(AwsServiceError::aws_error(
2293                StatusCode::BAD_REQUEST,
2294                "GlobalTableAlreadyExistsException",
2295                format!("Global table already exists: {global_table_name}"),
2296            ));
2297        }
2298
2299        let arn = format!(
2300            "arn:aws:dynamodb::{}:global-table/{}",
2301            state.account_id, global_table_name
2302        );
2303        let now = Utc::now();
2304
2305        let gt = GlobalTableDescription {
2306            global_table_name: global_table_name.to_string(),
2307            global_table_arn: arn.clone(),
2308            global_table_status: "ACTIVE".to_string(),
2309            creation_date: now,
2310            replication_group: replication_group.clone(),
2311        };
2312
2313        state
2314            .global_tables
2315            .insert(global_table_name.to_string(), gt);
2316
2317        Self::ok_json(json!({
2318            "GlobalTableDescription": {
2319                "GlobalTableName": global_table_name,
2320                "GlobalTableArn": arn,
2321                "GlobalTableStatus": "ACTIVE",
2322                "CreationDateTime": now.timestamp() as f64,
2323                "ReplicationGroup": replication_group.iter().map(|r| json!({
2324                    "RegionName": r.region_name,
2325                    "ReplicaStatus": r.replica_status
2326                })).collect::<Vec<_>>()
2327            }
2328        }))
2329    }
2330
2331    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2332        let body = Self::parse_body(req)?;
2333        let global_table_name = require_str(&body, "GlobalTableName")?;
2334        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2335
2336        let state = self.state.read();
2337        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2338            AwsServiceError::aws_error(
2339                StatusCode::BAD_REQUEST,
2340                "GlobalTableNotFoundException",
2341                format!("Global table not found: {global_table_name}"),
2342            )
2343        })?;
2344
2345        Self::ok_json(json!({
2346            "GlobalTableDescription": {
2347                "GlobalTableName": gt.global_table_name,
2348                "GlobalTableArn": gt.global_table_arn,
2349                "GlobalTableStatus": gt.global_table_status,
2350                "CreationDateTime": gt.creation_date.timestamp() as f64,
2351                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2352                    "RegionName": r.region_name,
2353                    "ReplicaStatus": r.replica_status
2354                })).collect::<Vec<_>>()
2355            }
2356        }))
2357    }
2358
2359    fn describe_global_table_settings(
2360        &self,
2361        req: &AwsRequest,
2362    ) -> Result<AwsResponse, AwsServiceError> {
2363        let body = Self::parse_body(req)?;
2364        let global_table_name = require_str(&body, "GlobalTableName")?;
2365        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2366
2367        let state = self.state.read();
2368        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2369            AwsServiceError::aws_error(
2370                StatusCode::BAD_REQUEST,
2371                "GlobalTableNotFoundException",
2372                format!("Global table not found: {global_table_name}"),
2373            )
2374        })?;
2375
2376        let replica_settings: Vec<Value> = gt
2377            .replication_group
2378            .iter()
2379            .map(|r| {
2380                json!({
2381                    "RegionName": r.region_name,
2382                    "ReplicaStatus": r.replica_status,
2383                    "ReplicaProvisionedReadCapacityUnits": 0,
2384                    "ReplicaProvisionedWriteCapacityUnits": 0
2385                })
2386            })
2387            .collect();
2388
2389        Self::ok_json(json!({
2390            "GlobalTableName": gt.global_table_name,
2391            "ReplicaSettings": replica_settings
2392        }))
2393    }
2394
2395    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396        let body = Self::parse_body(req)?;
2397        validate_optional_string_length(
2398            "exclusiveStartGlobalTableName",
2399            body["ExclusiveStartGlobalTableName"].as_str(),
2400            3,
2401            255,
2402        )?;
2403        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2404        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2405
2406        let state = self.state.read();
2407        let tables: Vec<Value> = state
2408            .global_tables
2409            .values()
2410            .take(limit)
2411            .map(|gt| {
2412                json!({
2413                    "GlobalTableName": gt.global_table_name,
2414                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2415                        "RegionName": r.region_name
2416                    })).collect::<Vec<_>>()
2417                })
2418            })
2419            .collect();
2420
2421        Self::ok_json(json!({
2422            "GlobalTables": tables
2423        }))
2424    }
2425
2426    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2427        let body = Self::parse_body(req)?;
2428        let global_table_name = require_str(&body, "GlobalTableName")?;
2429        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2430        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2431
2432        let mut state = self.state.write();
2433        let gt = state
2434            .global_tables
2435            .get_mut(global_table_name)
2436            .ok_or_else(|| {
2437                AwsServiceError::aws_error(
2438                    StatusCode::BAD_REQUEST,
2439                    "GlobalTableNotFoundException",
2440                    format!("Global table not found: {global_table_name}"),
2441                )
2442            })?;
2443
2444        if let Some(updates) = body["ReplicaUpdates"].as_array() {
2445            for update in updates {
2446                if let Some(create) = update["Create"].as_object() {
2447                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2448                        gt.replication_group.push(ReplicaDescription {
2449                            region_name: region.to_string(),
2450                            replica_status: "ACTIVE".to_string(),
2451                        });
2452                    }
2453                }
2454                if let Some(delete) = update["Delete"].as_object() {
2455                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2456                        gt.replication_group.retain(|r| r.region_name != region);
2457                    }
2458                }
2459            }
2460        }
2461
2462        Self::ok_json(json!({
2463            "GlobalTableDescription": {
2464                "GlobalTableName": gt.global_table_name,
2465                "GlobalTableArn": gt.global_table_arn,
2466                "GlobalTableStatus": gt.global_table_status,
2467                "CreationDateTime": gt.creation_date.timestamp() as f64,
2468                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2469                    "RegionName": r.region_name,
2470                    "ReplicaStatus": r.replica_status
2471                })).collect::<Vec<_>>()
2472            }
2473        }))
2474    }
2475
2476    fn update_global_table_settings(
2477        &self,
2478        req: &AwsRequest,
2479    ) -> Result<AwsResponse, AwsServiceError> {
2480        let body = Self::parse_body(req)?;
2481        let global_table_name = require_str(&body, "GlobalTableName")?;
2482        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2483        validate_optional_enum_value(
2484            "globalTableBillingMode",
2485            &body["GlobalTableBillingMode"],
2486            &["PROVISIONED", "PAY_PER_REQUEST"],
2487        )?;
2488        validate_optional_range_i64(
2489            "globalTableProvisionedWriteCapacityUnits",
2490            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2491            1,
2492            i64::MAX,
2493        )?;
2494
2495        let state = self.state.read();
2496        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2497            AwsServiceError::aws_error(
2498                StatusCode::BAD_REQUEST,
2499                "GlobalTableNotFoundException",
2500                format!("Global table not found: {global_table_name}"),
2501            )
2502        })?;
2503
2504        let replica_settings: Vec<Value> = gt
2505            .replication_group
2506            .iter()
2507            .map(|r| {
2508                json!({
2509                    "RegionName": r.region_name,
2510                    "ReplicaStatus": r.replica_status,
2511                    "ReplicaProvisionedReadCapacityUnits": 0,
2512                    "ReplicaProvisionedWriteCapacityUnits": 0
2513                })
2514            })
2515            .collect();
2516
2517        Self::ok_json(json!({
2518            "GlobalTableName": gt.global_table_name,
2519            "ReplicaSettings": replica_settings
2520        }))
2521    }
2522
2523    fn describe_table_replica_auto_scaling(
2524        &self,
2525        req: &AwsRequest,
2526    ) -> Result<AwsResponse, AwsServiceError> {
2527        let body = Self::parse_body(req)?;
2528        let table_name = require_str(&body, "TableName")?;
2529
2530        let state = self.state.read();
2531        let table = get_table(&state.tables, table_name)?;
2532
2533        Self::ok_json(json!({
2534            "TableAutoScalingDescription": {
2535                "TableName": table.name,
2536                "TableStatus": table.status,
2537                "Replicas": []
2538            }
2539        }))
2540    }
2541
2542    fn update_table_replica_auto_scaling(
2543        &self,
2544        req: &AwsRequest,
2545    ) -> Result<AwsResponse, AwsServiceError> {
2546        let body = Self::parse_body(req)?;
2547        let table_name = require_str(&body, "TableName")?;
2548
2549        let state = self.state.read();
2550        let table = get_table(&state.tables, table_name)?;
2551
2552        Self::ok_json(json!({
2553            "TableAutoScalingDescription": {
2554                "TableName": table.name,
2555                "TableStatus": table.status,
2556                "Replicas": []
2557            }
2558        }))
2559    }
2560
2561    // ── Kinesis Streaming ──────────────────────────────────────────────
2562
2563    fn enable_kinesis_streaming_destination(
2564        &self,
2565        req: &AwsRequest,
2566    ) -> Result<AwsResponse, AwsServiceError> {
2567        let body = Self::parse_body(req)?;
2568        let table_name = require_str(&body, "TableName")?;
2569        let stream_arn = require_str(&body, "StreamArn")?;
2570        let precision = body["EnableKinesisStreamingConfiguration"]
2571            ["ApproximateCreationDateTimePrecision"]
2572            .as_str()
2573            .unwrap_or("MILLISECOND");
2574
2575        let mut state = self.state.write();
2576        let table = get_table_mut(&mut state.tables, table_name)?;
2577
2578        table.kinesis_destinations.push(KinesisDestination {
2579            stream_arn: stream_arn.to_string(),
2580            destination_status: "ACTIVE".to_string(),
2581            approximate_creation_date_time_precision: precision.to_string(),
2582        });
2583
2584        Self::ok_json(json!({
2585            "TableName": table_name,
2586            "StreamArn": stream_arn,
2587            "DestinationStatus": "ACTIVE",
2588            "EnableKinesisStreamingConfiguration": {
2589                "ApproximateCreationDateTimePrecision": precision
2590            }
2591        }))
2592    }
2593
2594    fn disable_kinesis_streaming_destination(
2595        &self,
2596        req: &AwsRequest,
2597    ) -> Result<AwsResponse, AwsServiceError> {
2598        let body = Self::parse_body(req)?;
2599        let table_name = require_str(&body, "TableName")?;
2600        let stream_arn = require_str(&body, "StreamArn")?;
2601
2602        let mut state = self.state.write();
2603        let table = get_table_mut(&mut state.tables, table_name)?;
2604
2605        if let Some(dest) = table
2606            .kinesis_destinations
2607            .iter_mut()
2608            .find(|d| d.stream_arn == stream_arn)
2609        {
2610            dest.destination_status = "DISABLED".to_string();
2611        }
2612
2613        Self::ok_json(json!({
2614            "TableName": table_name,
2615            "StreamArn": stream_arn,
2616            "DestinationStatus": "DISABLED"
2617        }))
2618    }
2619
2620    fn describe_kinesis_streaming_destination(
2621        &self,
2622        req: &AwsRequest,
2623    ) -> Result<AwsResponse, AwsServiceError> {
2624        let body = Self::parse_body(req)?;
2625        let table_name = require_str(&body, "TableName")?;
2626
2627        let state = self.state.read();
2628        let table = get_table(&state.tables, table_name)?;
2629
2630        let destinations: Vec<Value> = table
2631            .kinesis_destinations
2632            .iter()
2633            .map(|d| {
2634                json!({
2635                    "StreamArn": d.stream_arn,
2636                    "DestinationStatus": d.destination_status,
2637                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2638                })
2639            })
2640            .collect();
2641
2642        Self::ok_json(json!({
2643            "TableName": table_name,
2644            "KinesisDataStreamDestinations": destinations
2645        }))
2646    }
2647
2648    fn update_kinesis_streaming_destination(
2649        &self,
2650        req: &AwsRequest,
2651    ) -> Result<AwsResponse, AwsServiceError> {
2652        let body = Self::parse_body(req)?;
2653        let table_name = require_str(&body, "TableName")?;
2654        let stream_arn = require_str(&body, "StreamArn")?;
2655        let precision = body["UpdateKinesisStreamingConfiguration"]
2656            ["ApproximateCreationDateTimePrecision"]
2657            .as_str()
2658            .unwrap_or("MILLISECOND");
2659
2660        let mut state = self.state.write();
2661        let table = get_table_mut(&mut state.tables, table_name)?;
2662
2663        if let Some(dest) = table
2664            .kinesis_destinations
2665            .iter_mut()
2666            .find(|d| d.stream_arn == stream_arn)
2667        {
2668            dest.approximate_creation_date_time_precision = precision.to_string();
2669        }
2670
2671        Self::ok_json(json!({
2672            "TableName": table_name,
2673            "StreamArn": stream_arn,
2674            "DestinationStatus": "ACTIVE",
2675            "UpdateKinesisStreamingConfiguration": {
2676                "ApproximateCreationDateTimePrecision": precision
2677            }
2678        }))
2679    }
2680
2681    // ── Contributor Insights ───────────────────────────────────────────
2682
2683    fn describe_contributor_insights(
2684        &self,
2685        req: &AwsRequest,
2686    ) -> Result<AwsResponse, AwsServiceError> {
2687        let body = Self::parse_body(req)?;
2688        let table_name = require_str(&body, "TableName")?;
2689        let index_name = body["IndexName"].as_str();
2690
2691        let state = self.state.read();
2692        let table = get_table(&state.tables, table_name)?;
2693
2694        let top = table.top_contributors(10);
2695        let contributors: Vec<Value> = top
2696            .iter()
2697            .map(|(key, count)| {
2698                json!({
2699                    "Key": key,
2700                    "Count": count
2701                })
2702            })
2703            .collect();
2704
2705        let mut result = json!({
2706            "TableName": table_name,
2707            "ContributorInsightsStatus": table.contributor_insights_status,
2708            "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2709            "TopContributors": contributors
2710        });
2711        if let Some(idx) = index_name {
2712            result["IndexName"] = json!(idx);
2713        }
2714
2715        Self::ok_json(result)
2716    }
2717
2718    fn update_contributor_insights(
2719        &self,
2720        req: &AwsRequest,
2721    ) -> Result<AwsResponse, AwsServiceError> {
2722        let body = Self::parse_body(req)?;
2723        let table_name = require_str(&body, "TableName")?;
2724        let action = require_str(&body, "ContributorInsightsAction")?;
2725        let index_name = body["IndexName"].as_str();
2726
2727        let mut state = self.state.write();
2728        let table = get_table_mut(&mut state.tables, table_name)?;
2729
2730        let status = match action {
2731            "ENABLE" => "ENABLED",
2732            "DISABLE" => "DISABLED",
2733            _ => {
2734                return Err(AwsServiceError::aws_error(
2735                    StatusCode::BAD_REQUEST,
2736                    "ValidationException",
2737                    format!("Invalid ContributorInsightsAction: {action}"),
2738                ))
2739            }
2740        };
2741        table.contributor_insights_status = status.to_string();
2742        if status == "DISABLED" {
2743            table.contributor_insights_counters.clear();
2744        }
2745
2746        let mut result = json!({
2747            "TableName": table_name,
2748            "ContributorInsightsStatus": status
2749        });
2750        if let Some(idx) = index_name {
2751            result["IndexName"] = json!(idx);
2752        }
2753
2754        Self::ok_json(result)
2755    }
2756
2757    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2758        let body = Self::parse_body(req)?;
2759        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2760        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2761        let table_name = body["TableName"].as_str();
2762
2763        let state = self.state.read();
2764        let summaries: Vec<Value> = state
2765            .tables
2766            .values()
2767            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2768            .map(|t| {
2769                json!({
2770                    "TableName": t.name,
2771                    "ContributorInsightsStatus": t.contributor_insights_status
2772                })
2773            })
2774            .collect();
2775
2776        Self::ok_json(json!({
2777            "ContributorInsightsSummaries": summaries
2778        }))
2779    }
2780
2781    // ── Import/Export ──────────────────────────────────────────────────
2782
2783    fn export_table_to_point_in_time(
2784        &self,
2785        req: &AwsRequest,
2786    ) -> Result<AwsResponse, AwsServiceError> {
2787        let body = Self::parse_body(req)?;
2788        let table_arn = require_str(&body, "TableArn")?;
2789        let s3_bucket = require_str(&body, "S3Bucket")?;
2790        let s3_prefix = body["S3Prefix"].as_str();
2791        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2792
2793        let state = self.state.read();
2794        // Verify table exists and get items
2795        let table = find_table_by_arn(&state.tables, table_arn)?;
2796        let items = table.items.clone();
2797        let item_count = items.len() as i64;
2798
2799        let now = Utc::now();
2800        let export_arn = format!(
2801            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2802            state.region,
2803            state.account_id,
2804            table_arn.rsplit('/').next().unwrap_or("unknown"),
2805            uuid::Uuid::new_v4()
2806        );
2807
2808        drop(state);
2809
2810        // Serialize items as JSON Lines and write to S3
2811        let mut json_lines = String::new();
2812        for item in &items {
2813            let item_json = if export_format == "DYNAMODB_JSON" {
2814                json!({ "Item": item })
2815            } else {
2816                json!(item)
2817            };
2818            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2819            json_lines.push('\n');
2820        }
2821        let data_size = json_lines.len() as i64;
2822
2823        // Build S3 key for the export data
2824        let s3_key = if let Some(prefix) = s3_prefix {
2825            format!("{prefix}/data/manifest-files.json")
2826        } else {
2827            "data/manifest-files.json".to_string()
2828        };
2829
2830        // Write to S3 if we have access to S3 state
2831        let mut export_failed = false;
2832        let mut failure_reason = String::new();
2833        if let Some(ref s3_state) = self.s3_state {
2834            let mut s3 = s3_state.write();
2835            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2836                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2837                let obj = fakecloud_s3::state::S3Object {
2838                    key: s3_key.clone(),
2839                    data: bytes::Bytes::from(json_lines),
2840                    content_type: "application/json".to_string(),
2841                    etag,
2842                    size: data_size as u64,
2843                    last_modified: now,
2844                    metadata: HashMap::new(),
2845                    storage_class: "STANDARD".to_string(),
2846                    tags: HashMap::new(),
2847                    acl_grants: Vec::new(),
2848                    acl_owner_id: None,
2849                    parts_count: None,
2850                    part_sizes: None,
2851                    sse_algorithm: None,
2852                    sse_kms_key_id: None,
2853                    bucket_key_enabled: None,
2854                    version_id: None,
2855                    is_delete_marker: false,
2856                    content_encoding: None,
2857                    website_redirect_location: None,
2858                    restore_ongoing: None,
2859                    restore_expiry: None,
2860                    checksum_algorithm: None,
2861                    checksum_value: None,
2862                    lock_mode: None,
2863                    lock_retain_until: None,
2864                    lock_legal_hold: None,
2865                };
2866                bucket.objects.insert(s3_key, obj);
2867            } else {
2868                export_failed = true;
2869                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2870            }
2871        }
2872
2873        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2874
2875        let export = ExportDescription {
2876            export_arn: export_arn.clone(),
2877            export_status: export_status.to_string(),
2878            table_arn: table_arn.to_string(),
2879            s3_bucket: s3_bucket.to_string(),
2880            s3_prefix: s3_prefix.map(|s| s.to_string()),
2881            export_format: export_format.to_string(),
2882            start_time: now,
2883            end_time: now,
2884            export_time: now,
2885            item_count,
2886            billed_size_bytes: data_size,
2887        };
2888
2889        let mut state = self.state.write();
2890        state.exports.insert(export_arn.clone(), export);
2891
2892        let mut response = json!({
2893            "ExportDescription": {
2894                "ExportArn": export_arn,
2895                "ExportStatus": export_status,
2896                "TableArn": table_arn,
2897                "S3Bucket": s3_bucket,
2898                "S3Prefix": s3_prefix,
2899                "ExportFormat": export_format,
2900                "StartTime": now.timestamp() as f64,
2901                "EndTime": now.timestamp() as f64,
2902                "ExportTime": now.timestamp() as f64,
2903                "ItemCount": item_count,
2904                "BilledSizeBytes": data_size
2905            }
2906        });
2907        if export_failed {
2908            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2909            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2910        }
2911        Self::ok_json(response)
2912    }
2913
2914    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2915        let body = Self::parse_body(req)?;
2916        let export_arn = require_str(&body, "ExportArn")?;
2917
2918        let state = self.state.read();
2919        let export = state.exports.get(export_arn).ok_or_else(|| {
2920            AwsServiceError::aws_error(
2921                StatusCode::BAD_REQUEST,
2922                "ExportNotFoundException",
2923                format!("Export not found: {export_arn}"),
2924            )
2925        })?;
2926
2927        Self::ok_json(json!({
2928            "ExportDescription": {
2929                "ExportArn": export.export_arn,
2930                "ExportStatus": export.export_status,
2931                "TableArn": export.table_arn,
2932                "S3Bucket": export.s3_bucket,
2933                "S3Prefix": export.s3_prefix,
2934                "ExportFormat": export.export_format,
2935                "StartTime": export.start_time.timestamp() as f64,
2936                "EndTime": export.end_time.timestamp() as f64,
2937                "ExportTime": export.export_time.timestamp() as f64,
2938                "ItemCount": export.item_count,
2939                "BilledSizeBytes": export.billed_size_bytes
2940            }
2941        }))
2942    }
2943
2944    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2945        let body = Self::parse_body(req)?;
2946        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2947        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2948        let table_arn = body["TableArn"].as_str();
2949
2950        let state = self.state.read();
2951        let summaries: Vec<Value> = state
2952            .exports
2953            .values()
2954            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2955            .map(|e| {
2956                json!({
2957                    "ExportArn": e.export_arn,
2958                    "ExportStatus": e.export_status,
2959                    "TableArn": e.table_arn
2960                })
2961            })
2962            .collect();
2963
2964        Self::ok_json(json!({
2965            "ExportSummaries": summaries
2966        }))
2967    }
2968
2969    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2970        let body = Self::parse_body(req)?;
2971        let input_format = require_str(&body, "InputFormat")?;
2972        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2973            AwsServiceError::aws_error(
2974                StatusCode::BAD_REQUEST,
2975                "ValidationException",
2976                "S3BucketSource is required",
2977            )
2978        })?;
2979        let s3_bucket = s3_source
2980            .get("S3Bucket")
2981            .and_then(|v| v.as_str())
2982            .unwrap_or("");
2983        let s3_key_prefix = s3_source
2984            .get("S3KeyPrefix")
2985            .and_then(|v| v.as_str())
2986            .unwrap_or("");
2987
2988        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2989            AwsServiceError::aws_error(
2990                StatusCode::BAD_REQUEST,
2991                "ValidationException",
2992                "TableCreationParameters is required",
2993            )
2994        })?;
2995        let table_name = table_params
2996            .get("TableName")
2997            .and_then(|v| v.as_str())
2998            .ok_or_else(|| {
2999                AwsServiceError::aws_error(
3000                    StatusCode::BAD_REQUEST,
3001                    "ValidationException",
3002                    "TableCreationParameters.TableName is required",
3003                )
3004            })?;
3005
3006        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3007        let attribute_definitions = parse_attribute_definitions(
3008            table_params
3009                .get("AttributeDefinitions")
3010                .unwrap_or(&Value::Null),
3011        )?;
3012
3013        // Read items from S3 if we have access
3014        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3015        let mut processed_size_bytes: i64 = 0;
3016        if let Some(ref s3_state) = self.s3_state {
3017            let s3 = s3_state.read();
3018            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3019                AwsServiceError::aws_error(
3020                    StatusCode::BAD_REQUEST,
3021                    "ImportConflictException",
3022                    format!("S3 bucket does not exist: {s3_bucket}"),
3023                )
3024            })?;
3025            // Find all objects under the prefix and try to parse JSON Lines from each
3026            let prefix = if s3_key_prefix.is_empty() {
3027                String::new()
3028            } else {
3029                s3_key_prefix.to_string()
3030            };
3031            for (key, obj) in &bucket.objects {
3032                if !prefix.is_empty() && !key.starts_with(&prefix) {
3033                    continue;
3034                }
3035                let data = std::str::from_utf8(&obj.data).unwrap_or("");
3036                processed_size_bytes += obj.size as i64;
3037                for line in data.lines() {
3038                    let line = line.trim();
3039                    if line.is_empty() {
3040                        continue;
3041                    }
3042                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3043                        // DYNAMODB_JSON format wraps items in {"Item": {...}}
3044                        let item = if input_format == "DYNAMODB_JSON" {
3045                            if let Some(item_obj) = parsed.get("Item") {
3046                                item_obj.as_object().cloned().unwrap_or_default()
3047                            } else {
3048                                parsed.as_object().cloned().unwrap_or_default()
3049                            }
3050                        } else {
3051                            parsed.as_object().cloned().unwrap_or_default()
3052                        };
3053                        if !item.is_empty() {
3054                            imported_items
3055                                .push(item.into_iter().collect::<HashMap<String, Value>>());
3056                        }
3057                    }
3058                }
3059            }
3060        }
3061
3062        let mut state = self.state.write();
3063
3064        if state.tables.contains_key(table_name) {
3065            return Err(AwsServiceError::aws_error(
3066                StatusCode::BAD_REQUEST,
3067                "ResourceInUseException",
3068                format!("Table already exists: {table_name}"),
3069            ));
3070        }
3071
3072        let now = Utc::now();
3073        let table_arn = format!(
3074            "arn:aws:dynamodb:{}:{}:table/{}",
3075            state.region, state.account_id, table_name
3076        );
3077        let import_arn = format!(
3078            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3079            state.region,
3080            state.account_id,
3081            table_name,
3082            uuid::Uuid::new_v4()
3083        );
3084
3085        let processed_item_count = imported_items.len() as i64;
3086
3087        let mut table = DynamoTable {
3088            name: table_name.to_string(),
3089            arn: table_arn.clone(),
3090            key_schema,
3091            attribute_definitions,
3092            provisioned_throughput: ProvisionedThroughput {
3093                read_capacity_units: 0,
3094                write_capacity_units: 0,
3095            },
3096            items: imported_items,
3097            gsi: Vec::new(),
3098            lsi: Vec::new(),
3099            tags: HashMap::new(),
3100            created_at: now,
3101            status: "ACTIVE".to_string(),
3102            item_count: 0,
3103            size_bytes: 0,
3104            billing_mode: "PAY_PER_REQUEST".to_string(),
3105            ttl_attribute: None,
3106            ttl_enabled: false,
3107            resource_policy: None,
3108            pitr_enabled: false,
3109            kinesis_destinations: Vec::new(),
3110            contributor_insights_status: "DISABLED".to_string(),
3111            contributor_insights_counters: HashMap::new(),
3112            stream_enabled: false,
3113            stream_view_type: None,
3114            stream_arn: None,
3115            stream_records: Arc::new(RwLock::new(Vec::new())),
3116            sse_type: None,
3117            sse_kms_key_arn: None,
3118        };
3119        table.recalculate_stats();
3120        state.tables.insert(table_name.to_string(), table);
3121
3122        let import_desc = ImportDescription {
3123            import_arn: import_arn.clone(),
3124            import_status: "COMPLETED".to_string(),
3125            table_arn: table_arn.clone(),
3126            table_name: table_name.to_string(),
3127            s3_bucket_source: s3_bucket.to_string(),
3128            input_format: input_format.to_string(),
3129            start_time: now,
3130            end_time: now,
3131            processed_item_count,
3132            processed_size_bytes,
3133        };
3134        state.imports.insert(import_arn.clone(), import_desc);
3135
3136        let table_ref = state.tables.get(table_name).unwrap();
3137        let ks: Vec<Value> = table_ref
3138            .key_schema
3139            .iter()
3140            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3141            .collect();
3142        let ad: Vec<Value> = table_ref
3143            .attribute_definitions
3144            .iter()
3145            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3146            .collect();
3147
3148        Self::ok_json(json!({
3149            "ImportTableDescription": {
3150                "ImportArn": import_arn,
3151                "ImportStatus": "COMPLETED",
3152                "TableArn": table_arn,
3153                "TableId": uuid::Uuid::new_v4().to_string(),
3154                "S3BucketSource": {
3155                    "S3Bucket": s3_bucket
3156                },
3157                "InputFormat": input_format,
3158                "TableCreationParameters": {
3159                    "TableName": table_name,
3160                    "KeySchema": ks,
3161                    "AttributeDefinitions": ad
3162                },
3163                "StartTime": now.timestamp() as f64,
3164                "EndTime": now.timestamp() as f64,
3165                "ProcessedItemCount": processed_item_count,
3166                "ProcessedSizeBytes": processed_size_bytes
3167            }
3168        }))
3169    }
3170
3171    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3172        let body = Self::parse_body(req)?;
3173        let import_arn = require_str(&body, "ImportArn")?;
3174
3175        let state = self.state.read();
3176        let import = state.imports.get(import_arn).ok_or_else(|| {
3177            AwsServiceError::aws_error(
3178                StatusCode::BAD_REQUEST,
3179                "ImportNotFoundException",
3180                format!("Import not found: {import_arn}"),
3181            )
3182        })?;
3183
3184        Self::ok_json(json!({
3185            "ImportTableDescription": {
3186                "ImportArn": import.import_arn,
3187                "ImportStatus": import.import_status,
3188                "TableArn": import.table_arn,
3189                "S3BucketSource": {
3190                    "S3Bucket": import.s3_bucket_source
3191                },
3192                "InputFormat": import.input_format,
3193                "StartTime": import.start_time.timestamp() as f64,
3194                "EndTime": import.end_time.timestamp() as f64,
3195                "ProcessedItemCount": import.processed_item_count,
3196                "ProcessedSizeBytes": import.processed_size_bytes
3197            }
3198        }))
3199    }
3200
3201    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3202        let body = Self::parse_body(req)?;
3203        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3204        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3205        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3206        let table_arn = body["TableArn"].as_str();
3207
3208        let state = self.state.read();
3209        let summaries: Vec<Value> = state
3210            .imports
3211            .values()
3212            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3213            .map(|i| {
3214                json!({
3215                    "ImportArn": i.import_arn,
3216                    "ImportStatus": i.import_status,
3217                    "TableArn": i.table_arn
3218                })
3219            })
3220            .collect();
3221
3222        Self::ok_json(json!({
3223            "ImportSummaryList": summaries
3224        }))
3225    }
3226}
3227
3228#[async_trait]
3229impl AwsService for DynamoDbService {
3230    fn service_name(&self) -> &str {
3231        "dynamodb"
3232    }
3233
3234    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3235        match req.action.as_str() {
3236            "CreateTable" => self.create_table(&req),
3237            "DeleteTable" => self.delete_table(&req),
3238            "DescribeTable" => self.describe_table(&req),
3239            "ListTables" => self.list_tables(&req),
3240            "UpdateTable" => self.update_table(&req),
3241            "PutItem" => self.put_item(&req),
3242            "GetItem" => self.get_item(&req),
3243            "DeleteItem" => self.delete_item(&req),
3244            "UpdateItem" => self.update_item(&req),
3245            "Query" => self.query(&req),
3246            "Scan" => self.scan(&req),
3247            "BatchGetItem" => self.batch_get_item(&req),
3248            "BatchWriteItem" => self.batch_write_item(&req),
3249            "TagResource" => self.tag_resource(&req),
3250            "UntagResource" => self.untag_resource(&req),
3251            "ListTagsOfResource" => self.list_tags_of_resource(&req),
3252            "TransactGetItems" => self.transact_get_items(&req),
3253            "TransactWriteItems" => self.transact_write_items(&req),
3254            "ExecuteStatement" => self.execute_statement(&req),
3255            "BatchExecuteStatement" => self.batch_execute_statement(&req),
3256            "ExecuteTransaction" => self.execute_transaction(&req),
3257            "UpdateTimeToLive" => self.update_time_to_live(&req),
3258            "DescribeTimeToLive" => self.describe_time_to_live(&req),
3259            "PutResourcePolicy" => self.put_resource_policy(&req),
3260            "GetResourcePolicy" => self.get_resource_policy(&req),
3261            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3262            // Stubs
3263            "DescribeEndpoints" => self.describe_endpoints(&req),
3264            "DescribeLimits" => self.describe_limits(&req),
3265            // Backups
3266            "CreateBackup" => self.create_backup(&req),
3267            "DeleteBackup" => self.delete_backup(&req),
3268            "DescribeBackup" => self.describe_backup(&req),
3269            "ListBackups" => self.list_backups(&req),
3270            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3271            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3272            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3273            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3274            // Global tables
3275            "CreateGlobalTable" => self.create_global_table(&req),
3276            "DescribeGlobalTable" => self.describe_global_table(&req),
3277            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3278            "ListGlobalTables" => self.list_global_tables(&req),
3279            "UpdateGlobalTable" => self.update_global_table(&req),
3280            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3281            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3282            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3283            // Kinesis streaming
3284            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3285            "DisableKinesisStreamingDestination" => {
3286                self.disable_kinesis_streaming_destination(&req)
3287            }
3288            "DescribeKinesisStreamingDestination" => {
3289                self.describe_kinesis_streaming_destination(&req)
3290            }
3291            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3292            // Contributor insights
3293            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3294            "UpdateContributorInsights" => self.update_contributor_insights(&req),
3295            "ListContributorInsights" => self.list_contributor_insights(&req),
3296            // Import/Export
3297            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3298            "DescribeExport" => self.describe_export(&req),
3299            "ListExports" => self.list_exports(&req),
3300            "ImportTable" => self.import_table(&req),
3301            "DescribeImport" => self.describe_import(&req),
3302            "ListImports" => self.list_imports(&req),
3303            _ => Err(AwsServiceError::action_not_implemented(
3304                "dynamodb",
3305                &req.action,
3306            )),
3307        }
3308    }
3309
3310    fn supported_actions(&self) -> &[&str] {
3311        &[
3312            "CreateTable",
3313            "DeleteTable",
3314            "DescribeTable",
3315            "ListTables",
3316            "UpdateTable",
3317            "PutItem",
3318            "GetItem",
3319            "DeleteItem",
3320            "UpdateItem",
3321            "Query",
3322            "Scan",
3323            "BatchGetItem",
3324            "BatchWriteItem",
3325            "TagResource",
3326            "UntagResource",
3327            "ListTagsOfResource",
3328            "TransactGetItems",
3329            "TransactWriteItems",
3330            "ExecuteStatement",
3331            "BatchExecuteStatement",
3332            "ExecuteTransaction",
3333            "UpdateTimeToLive",
3334            "DescribeTimeToLive",
3335            "PutResourcePolicy",
3336            "GetResourcePolicy",
3337            "DeleteResourcePolicy",
3338            "DescribeEndpoints",
3339            "DescribeLimits",
3340            "CreateBackup",
3341            "DeleteBackup",
3342            "DescribeBackup",
3343            "ListBackups",
3344            "RestoreTableFromBackup",
3345            "RestoreTableToPointInTime",
3346            "UpdateContinuousBackups",
3347            "DescribeContinuousBackups",
3348            "CreateGlobalTable",
3349            "DescribeGlobalTable",
3350            "DescribeGlobalTableSettings",
3351            "ListGlobalTables",
3352            "UpdateGlobalTable",
3353            "UpdateGlobalTableSettings",
3354            "DescribeTableReplicaAutoScaling",
3355            "UpdateTableReplicaAutoScaling",
3356            "EnableKinesisStreamingDestination",
3357            "DisableKinesisStreamingDestination",
3358            "DescribeKinesisStreamingDestination",
3359            "UpdateKinesisStreamingDestination",
3360            "DescribeContributorInsights",
3361            "UpdateContributorInsights",
3362            "ListContributorInsights",
3363            "ExportTableToPointInTime",
3364            "DescribeExport",
3365            "ListExports",
3366            "ImportTable",
3367            "DescribeImport",
3368            "ListImports",
3369        ]
3370    }
3371}
3372
3373// ── Helper functions ────────────────────────────────────────────────────
3374
3375fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3376    body[field].as_str().ok_or_else(|| {
3377        AwsServiceError::aws_error(
3378            StatusCode::BAD_REQUEST,
3379            "ValidationException",
3380            format!("{field} is required"),
3381        )
3382    })
3383}
3384
3385fn require_object(
3386    body: &Value,
3387    field: &str,
3388) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3389    let obj = body[field].as_object().ok_or_else(|| {
3390        AwsServiceError::aws_error(
3391            StatusCode::BAD_REQUEST,
3392            "ValidationException",
3393            format!("{field} is required"),
3394        )
3395    })?;
3396    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3397}
3398
3399fn get_table<'a>(
3400    tables: &'a HashMap<String, DynamoTable>,
3401    name: &str,
3402) -> Result<&'a DynamoTable, AwsServiceError> {
3403    tables.get(name).ok_or_else(|| {
3404        AwsServiceError::aws_error(
3405            StatusCode::BAD_REQUEST,
3406            "ResourceNotFoundException",
3407            format!("Requested resource not found: Table: {name} not found"),
3408        )
3409    })
3410}
3411
3412fn get_table_mut<'a>(
3413    tables: &'a mut HashMap<String, DynamoTable>,
3414    name: &str,
3415) -> Result<&'a mut DynamoTable, AwsServiceError> {
3416    tables.get_mut(name).ok_or_else(|| {
3417        AwsServiceError::aws_error(
3418            StatusCode::BAD_REQUEST,
3419            "ResourceNotFoundException",
3420            format!("Requested resource not found: Table: {name} not found"),
3421        )
3422    })
3423}
3424
3425fn find_table_by_arn<'a>(
3426    tables: &'a HashMap<String, DynamoTable>,
3427    arn: &str,
3428) -> Result<&'a DynamoTable, AwsServiceError> {
3429    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3430        AwsServiceError::aws_error(
3431            StatusCode::BAD_REQUEST,
3432            "ResourceNotFoundException",
3433            format!("Requested resource not found: {arn}"),
3434        )
3435    })
3436}
3437
3438fn find_table_by_arn_mut<'a>(
3439    tables: &'a mut HashMap<String, DynamoTable>,
3440    arn: &str,
3441) -> Result<&'a mut DynamoTable, AwsServiceError> {
3442    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3443        AwsServiceError::aws_error(
3444            StatusCode::BAD_REQUEST,
3445            "ResourceNotFoundException",
3446            format!("Requested resource not found: {arn}"),
3447        )
3448    })
3449}
3450
3451fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3452    let arr = val.as_array().ok_or_else(|| {
3453        AwsServiceError::aws_error(
3454            StatusCode::BAD_REQUEST,
3455            "ValidationException",
3456            "KeySchema is required",
3457        )
3458    })?;
3459    Ok(arr
3460        .iter()
3461        .map(|elem| KeySchemaElement {
3462            attribute_name: elem["AttributeName"]
3463                .as_str()
3464                .unwrap_or_default()
3465                .to_string(),
3466            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3467        })
3468        .collect())
3469}
3470
3471fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3472    let arr = val.as_array().ok_or_else(|| {
3473        AwsServiceError::aws_error(
3474            StatusCode::BAD_REQUEST,
3475            "ValidationException",
3476            "AttributeDefinitions is required",
3477        )
3478    })?;
3479    Ok(arr
3480        .iter()
3481        .map(|elem| AttributeDefinition {
3482            attribute_name: elem["AttributeName"]
3483                .as_str()
3484                .unwrap_or_default()
3485                .to_string(),
3486            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3487        })
3488        .collect())
3489}
3490
3491fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3492    Ok(ProvisionedThroughput {
3493        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3494        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3495    })
3496}
3497
3498fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3499    let Some(arr) = val.as_array() else {
3500        return Vec::new();
3501    };
3502    arr.iter()
3503        .filter_map(|g| {
3504            Some(GlobalSecondaryIndex {
3505                index_name: g["IndexName"].as_str()?.to_string(),
3506                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3507                projection: parse_projection(&g["Projection"]),
3508                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3509                    .ok(),
3510            })
3511        })
3512        .collect()
3513}
3514
3515fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3516    let Some(arr) = val.as_array() else {
3517        return Vec::new();
3518    };
3519    arr.iter()
3520        .filter_map(|l| {
3521            Some(LocalSecondaryIndex {
3522                index_name: l["IndexName"].as_str()?.to_string(),
3523                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3524                projection: parse_projection(&l["Projection"]),
3525            })
3526        })
3527        .collect()
3528}
3529
3530fn parse_projection(val: &Value) -> Projection {
3531    Projection {
3532        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3533        non_key_attributes: val["NonKeyAttributes"]
3534            .as_array()
3535            .map(|arr| {
3536                arr.iter()
3537                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3538                    .collect()
3539            })
3540            .unwrap_or_default(),
3541    }
3542}
3543
3544fn parse_tags(val: &Value) -> HashMap<String, String> {
3545    let mut tags = HashMap::new();
3546    if let Some(arr) = val.as_array() {
3547        for tag in arr {
3548            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3549                tags.insert(k.to_string(), v.to_string());
3550            }
3551        }
3552    }
3553    tags
3554}
3555
3556fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3557    let mut names = HashMap::new();
3558    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3559        for (k, v) in obj {
3560            if let Some(s) = v.as_str() {
3561                names.insert(k.clone(), s.to_string());
3562            }
3563        }
3564    }
3565    names
3566}
3567
3568fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3569    let mut values = HashMap::new();
3570    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3571        for (k, v) in obj {
3572            values.insert(k.clone(), v.clone());
3573        }
3574    }
3575    values
3576}
3577
/// Resolves a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#`, and placeholders with no mapping, are
/// returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substituted = if name.starts_with('#') {
        expr_attr_names.get(name).map(String::as_str)
    } else {
        None
    };
    substituted.unwrap_or(name).to_string()
}
3588
3589fn extract_key(
3590    table: &DynamoTable,
3591    item: &HashMap<String, AttributeValue>,
3592) -> HashMap<String, AttributeValue> {
3593    let mut key = HashMap::new();
3594    let hash_key = table.hash_key_name();
3595    if let Some(v) = item.get(hash_key) {
3596        key.insert(hash_key.to_string(), v.clone());
3597    }
3598    if let Some(range_key) = table.range_key_name() {
3599        if let Some(v) = item.get(range_key) {
3600            key.insert(range_key.to_string(), v.clone());
3601        }
3602    }
3603    key
3604}
3605
3606/// Parse a JSON object into a key map (used for ExclusiveStartKey).
3607fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3608    let obj = value.as_object()?;
3609    if obj.is_empty() {
3610        return None;
3611    }
3612    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3613}
3614
3615/// Check whether an item's key attributes match the given key map.
3616fn item_matches_key(
3617    item: &HashMap<String, AttributeValue>,
3618    key: &HashMap<String, AttributeValue>,
3619    hash_key_name: &str,
3620    range_key_name: Option<&str>,
3621) -> bool {
3622    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3623        (Some(a), Some(b)) => a == b,
3624        _ => false,
3625    };
3626    if !hash_match {
3627        return false;
3628    }
3629    match range_key_name {
3630        Some(rk) => match (item.get(rk), key.get(rk)) {
3631            (Some(a), Some(b)) => a == b,
3632            (None, None) => true,
3633            _ => false,
3634        },
3635        None => true,
3636    }
3637}
3638
3639/// Extract the primary key from an item given explicit key attribute names.
3640fn extract_key_for_schema(
3641    item: &HashMap<String, AttributeValue>,
3642    hash_key_name: &str,
3643    range_key_name: Option<&str>,
3644) -> HashMap<String, AttributeValue> {
3645    let mut key = HashMap::new();
3646    if let Some(v) = item.get(hash_key_name) {
3647        key.insert(hash_key_name.to_string(), v.clone());
3648    }
3649    if let Some(rk) = range_key_name {
3650        if let Some(v) = item.get(rk) {
3651            key.insert(rk.to_string(), v.clone());
3652        }
3653    }
3654    key
3655}
3656
3657fn validate_key_in_item(
3658    table: &DynamoTable,
3659    item: &HashMap<String, AttributeValue>,
3660) -> Result<(), AwsServiceError> {
3661    let hash_key = table.hash_key_name();
3662    if !item.contains_key(hash_key) {
3663        return Err(AwsServiceError::aws_error(
3664            StatusCode::BAD_REQUEST,
3665            "ValidationException",
3666            format!("Missing the key {hash_key} in the item"),
3667        ));
3668    }
3669    if let Some(range_key) = table.range_key_name() {
3670        if !item.contains_key(range_key) {
3671            return Err(AwsServiceError::aws_error(
3672                StatusCode::BAD_REQUEST,
3673                "ValidationException",
3674                format!("Missing the key {range_key} in the item"),
3675            ));
3676        }
3677    }
3678    Ok(())
3679}
3680
3681fn validate_key_attributes_in_key(
3682    table: &DynamoTable,
3683    key: &HashMap<String, AttributeValue>,
3684) -> Result<(), AwsServiceError> {
3685    let hash_key = table.hash_key_name();
3686    if !key.contains_key(hash_key) {
3687        return Err(AwsServiceError::aws_error(
3688            StatusCode::BAD_REQUEST,
3689            "ValidationException",
3690            format!("Missing the key {hash_key} in the item"),
3691        ));
3692    }
3693    Ok(())
3694}
3695
3696fn project_item(
3697    item: &HashMap<String, AttributeValue>,
3698    body: &Value,
3699) -> HashMap<String, AttributeValue> {
3700    let projection = body["ProjectionExpression"].as_str();
3701    match projection {
3702        Some(proj) if !proj.is_empty() => {
3703            let expr_attr_names = parse_expression_attribute_names(body);
3704            let attrs: Vec<String> = proj
3705                .split(',')
3706                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3707                .collect();
3708            let mut result = HashMap::new();
3709            for attr in &attrs {
3710                if let Some(v) = resolve_nested_path(item, attr) {
3711                    insert_nested_value(&mut result, attr, v);
3712                }
3713            }
3714            result
3715        }
3716        _ => item.clone(),
3717    }
3718}
3719
3720/// Resolve expression attribute names within each segment of a projection path.
3721/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3722fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3723    // Split on dots, resolve each part, rejoin
3724    let mut result = String::new();
3725    for (i, segment) in path.split('.').enumerate() {
3726        if i > 0 {
3727            result.push('.');
3728        }
3729        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3730        if let Some(bracket_pos) = segment.find('[') {
3731            let key_part = &segment[..bracket_pos];
3732            let index_part = &segment[bracket_pos..];
3733            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3734            result.push_str(index_part);
3735        } else {
3736            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3737        }
3738    }
3739    result
3740}
3741
3742/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3743fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3744    let segments = parse_path_segments(path);
3745    if segments.is_empty() {
3746        return None;
3747    }
3748
3749    let first = &segments[0];
3750    let top_key = match first {
3751        PathSegment::Key(k) => k.as_str(),
3752        _ => return None,
3753    };
3754
3755    let mut current = item.get(top_key)?.clone();
3756
3757    for segment in &segments[1..] {
3758        match segment {
3759            PathSegment::Key(k) => {
3760                // Navigate into a Map: {"M": {"key": ...}}
3761                current = current.get("M")?.get(k)?.clone();
3762            }
3763            PathSegment::Index(idx) => {
3764                // Navigate into a List: {"L": [...]}
3765                current = current.get("L")?.get(*idx)?.clone();
3766            }
3767        }
3768    }
3769
3770    Some(current)
3771}
3772
/// One step of a document path.
#[derive(Debug)]
enum PathSegment {
    /// An attribute or map-key name.
    Key(String),
    /// A list position written as `[n]`.
    Index(usize),
}

/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
///
/// Empty names (for example from consecutive dots) produce no segment, and a
/// bracketed index that does not parse as `usize` is dropped.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut name = String::new();
    let mut chars = path.chars();

    while let Some(c) = chars.next() {
        match c {
            '.' | '[' => {
                // Both delimiters end the name collected so far.
                if !name.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut name)));
                }
                if c == '[' {
                    // Reads up to the closing ']' (or end of input); `take_while`
                    // also consumes the ']' itself.
                    let digits: String = chars.by_ref().take_while(|&ch| ch != ']').collect();
                    if let Ok(idx) = digits.parse::<usize>() {
                        segments.push(PathSegment::Index(idx));
                    }
                }
            }
            other => name.push(other),
        }
    }

    if !name.is_empty() {
        segments.push(PathSegment::Key(name));
    }
    segments
}
3821
3822/// Insert a value at a nested path in the result HashMap.
3823/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3824fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3825    // Simple case: no nesting
3826    if !path.contains('.') && !path.contains('[') {
3827        result.insert(path.to_string(), value);
3828        return;
3829    }
3830
3831    let segments = parse_path_segments(path);
3832    if segments.is_empty() {
3833        return;
3834    }
3835
3836    let top_key = match &segments[0] {
3837        PathSegment::Key(k) => k.clone(),
3838        _ => return,
3839    };
3840
3841    if segments.len() == 1 {
3842        result.insert(top_key, value);
3843        return;
3844    }
3845
3846    // For nested paths, wrap the value back into the nested structure
3847    let wrapped = wrap_value_in_path(&segments[1..], value);
3848    // Merge into existing value if present
3849    let existing = result.remove(&top_key);
3850    let merged = match existing {
3851        Some(existing) => merge_attribute_values(existing, wrapped),
3852        None => wrapped,
3853    };
3854    result.insert(top_key, merged);
3855}
3856
3857/// Wrap a value in the nested path structure.
3858fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3859    if segments.is_empty() {
3860        return value;
3861    }
3862    let inner = wrap_value_in_path(&segments[1..], value);
3863    match &segments[0] {
3864        PathSegment::Key(k) => {
3865            json!({"M": {k.clone(): inner}})
3866        }
3867        PathSegment::Index(idx) => {
3868            let mut arr = vec![Value::Null; idx + 1];
3869            arr[*idx] = inner;
3870            json!({"L": arr})
3871        }
3872    }
3873}
3874
3875/// Merge two attribute values (for overlapping projections).
3876fn merge_attribute_values(a: Value, b: Value) -> Value {
3877    if let (Some(a_map), Some(b_map)) = (
3878        a.get("M").and_then(|v| v.as_object()),
3879        b.get("M").and_then(|v| v.as_object()),
3880    ) {
3881        let mut merged = a_map.clone();
3882        for (k, v) in b_map {
3883            if let Some(existing) = merged.get(k) {
3884                merged.insert(
3885                    k.clone(),
3886                    merge_attribute_values(existing.clone(), v.clone()),
3887                );
3888            } else {
3889                merged.insert(k.clone(), v.clone());
3890            }
3891        }
3892        json!({"M": merged})
3893    } else {
3894        b
3895    }
3896}
3897
3898fn evaluate_condition(
3899    condition: &str,
3900    existing: Option<&HashMap<String, AttributeValue>>,
3901    expr_attr_names: &HashMap<String, String>,
3902    expr_attr_values: &HashMap<String, Value>,
3903) -> Result<(), AwsServiceError> {
3904    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
3905    // so we delegate to evaluate_filter_expression. An empty map models "item
3906    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
3907    // → true, comparisons against missing attributes → None vs Some(val).
3908    let empty = HashMap::new();
3909    let item = existing.unwrap_or(&empty);
3910    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3911        Ok(())
3912    } else {
3913        Err(AwsServiceError::aws_error(
3914            StatusCode::BAD_REQUEST,
3915            "ConditionalCheckFailedException",
3916            "The conditional request failed",
3917        ))
3918    }
3919}
3920
/// Returns the trimmed argument text of a call to `func_name` when `expr` is
/// exactly that call, e.g. `attribute_exists(#0)` yields `#0`.
///
/// Returns `None` when `expr` does not start with `func_name` followed by an
/// opening parenthesis, or does not end with `)`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    // aws-sdk-go v2's expression builder emits function calls with a space
    // between the name and the opening paren (`attribute_exists (#0)`),
    // while hand-written expressions usually don't. Any run of whitespace
    // before the paren is accepted, and no temporary strings are built.
    let inner = expr
        .strip_prefix(func_name)?
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    Some(inner.trim())
}
3933
3934fn evaluate_key_condition(
3935    expr: &str,
3936    item: &HashMap<String, AttributeValue>,
3937    hash_key_name: &str,
3938    _range_key_name: Option<&str>,
3939    expr_attr_names: &HashMap<String, String>,
3940    expr_attr_values: &HashMap<String, Value>,
3941) -> bool {
3942    let parts: Vec<&str> = split_on_and(expr);
3943    for part in &parts {
3944        let part = part.trim();
3945        if !evaluate_single_key_condition(
3946            part,
3947            item,
3948            hash_key_name,
3949            expr_attr_names,
3950            expr_attr_values,
3951        ) {
3952            return false;
3953        }
3954    }
3955    true
3956}
3957
/// Splits `expr` on every ` AND ` (case-insensitive, with the surrounding
/// spaces) that is not nested inside parentheses.
///
/// Always returns at least one part; the parts are not trimmed. Unbalanced
/// closing parentheses are ignored rather than driving the depth negative.
///
/// The separator is matched on raw bytes so that non-ASCII text in the
/// expression can never cause a slice at a non-character boundary.
fn split_on_and(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ if depth == 0
                && matches!(
                    bytes.get(i..i + SEPARATOR.len()),
                    Some(window) if window.eq_ignore_ascii_case(SEPARATOR)
                ) =>
            {
                // `i` and `i + SEPARATOR.len()` both sit next to ASCII spaces,
                // so these are valid `str` slice boundaries.
                parts.push(&expr[start..i]);
                start = i + SEPARATOR.len();
                i = start;
                continue;
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3983
/// Splits `expr` on every ` OR ` (case-insensitive, with the surrounding
/// spaces) that is not nested inside parentheses.
///
/// Always returns at least one part; the parts are not trimmed. Unbalanced
/// closing parentheses are ignored rather than driving the depth negative.
///
/// The separator is matched on raw bytes so that non-ASCII text in the
/// expression can never cause a slice at a non-character boundary.
fn split_on_or(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" OR ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ if depth == 0
                && matches!(
                    bytes.get(i..i + SEPARATOR.len()),
                    Some(window) if window.eq_ignore_ascii_case(SEPARATOR)
                ) =>
            {
                // `i` and `i + SEPARATOR.len()` both sit next to ASCII spaces,
                // so these are valid `str` slice boundaries.
                parts.push(&expr[start..i]);
                start = i + SEPARATOR.len();
                i = start;
                continue;
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
4009
4010fn evaluate_single_key_condition(
4011    part: &str,
4012    item: &HashMap<String, AttributeValue>,
4013    _hash_key_name: &str,
4014    expr_attr_names: &HashMap<String, String>,
4015    expr_attr_values: &HashMap<String, Value>,
4016) -> bool {
4017    let part = part.trim();
4018
4019    // begins_with(attr, :val) — S type only
4020    if let Some(rest) = part
4021        .strip_prefix("begins_with(")
4022        .or_else(|| part.strip_prefix("begins_with ("))
4023    {
4024        if let Some(inner) = rest.strip_suffix(')') {
4025            let mut split = inner.splitn(2, ',');
4026            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4027                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4028                let val_ref = val_ref.trim();
4029                let expected = expr_attr_values.get(val_ref);
4030                let actual = item.get(&attr_name);
4031                return match (actual, expected) {
4032                    (Some(a), Some(e)) => {
4033                        let a_str = a.get("S").and_then(|v| v.as_str());
4034                        let e_str = e.get("S").and_then(|v| v.as_str());
4035                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4036                    }
4037                    _ => false,
4038                };
4039            }
4040        }
4041        return false;
4042    }
4043
4044    // BETWEEN
4045    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4046        let attr_part = part[..between_pos].trim();
4047        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4048        let range_part = &part[between_pos + 7..];
4049        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4050            let lo_ref = range_part[..and_pos].trim();
4051            let hi_ref = range_part[and_pos + 5..].trim();
4052            let lo = expr_attr_values.get(lo_ref);
4053            let hi = expr_attr_values.get(hi_ref);
4054            let actual = item.get(&attr_name);
4055            return match (actual, lo, hi) {
4056                (Some(a), Some(l), Some(h)) => {
4057                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4058                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4059                }
4060                _ => false,
4061            };
4062        }
4063    }
4064
4065    // Simple comparison: attr <op> :val
4066    for op in &["<=", ">=", "<>", "=", "<", ">"] {
4067        if let Some(pos) = part.find(op) {
4068            let left = part[..pos].trim();
4069            let right = part[pos + op.len()..].trim();
4070            let attr_name = resolve_attr_name(left, expr_attr_names);
4071            let expected = expr_attr_values.get(right);
4072            let actual = item.get(&attr_name);
4073
4074            return match *op {
4075                "=" => actual == expected,
4076                "<>" => actual != expected,
4077                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4078                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4079                "<=" => {
4080                    let cmp = compare_attribute_values(actual, expected);
4081                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4082                }
4083                ">=" => {
4084                    let cmp = compare_attribute_values(actual, expected);
4085                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4086                }
4087                _ => false,
4088            };
4089        }
4090    }
4091
4092    false
4093}
4094
4095/// Returns the "size" of a DynamoDB attribute value per AWS docs:
4096/// S → character count, N → always 0 (AWS returns size of internal representation, we approximate),
4097/// B → byte count, SS/NS/BS → element count, L → element count, M → element count,
4098/// BOOL/NULL → 1.
4099fn attribute_size(val: &Value) -> Option<usize> {
4100    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
4101        return Some(s.len());
4102    }
4103    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
4104        // B is base64-encoded, decode length = raw bytes
4105        return Some(b.len()); // approximate with encoded length
4106    }
4107    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
4108        return Some(arr.len());
4109    }
4110    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
4111        return Some(arr.len());
4112    }
4113    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
4114        return Some(arr.len());
4115    }
4116    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
4117        return Some(arr.len());
4118    }
4119    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
4120        return Some(obj.len());
4121    }
4122    if val.get("N").is_some() {
4123        // AWS returns numeric representation size; approximate with string length
4124        return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
4125    }
4126    if val.get("BOOL").is_some() || val.get("NULL").is_some() {
4127        return Some(1);
4128    }
4129    None
4130}
4131
4132/// Evaluate a `size(path) op :val` comparison expression.
4133fn evaluate_size_comparison(
4134    part: &str,
4135    item: &HashMap<String, AttributeValue>,
4136    expr_attr_names: &HashMap<String, String>,
4137    expr_attr_values: &HashMap<String, Value>,
4138) -> Option<bool> {
4139    // Find the closing paren of size(...)
4140    let open = part.find('(')?;
4141    let close = part[open..].find(')')? + open;
4142    let path = part[open + 1..close].trim();
4143    let remainder = part[close + 1..].trim();
4144
4145    // Parse operator and value ref
4146    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
4147        ("<=", rest.trim())
4148    } else if let Some(rest) = remainder.strip_prefix(">=") {
4149        (">=", rest.trim())
4150    } else if let Some(rest) = remainder.strip_prefix("<>") {
4151        ("<>", rest.trim())
4152    } else if let Some(rest) = remainder.strip_prefix('<') {
4153        ("<", rest.trim())
4154    } else if let Some(rest) = remainder.strip_prefix('>') {
4155        (">", rest.trim())
4156    } else if let Some(rest) = remainder.strip_prefix('=') {
4157        ("=", rest.trim())
4158    } else {
4159        return None;
4160    };
4161
4162    let attr_name = resolve_attr_name(path, expr_attr_names);
4163    let actual = item.get(&attr_name)?;
4164    let size = attribute_size(actual)? as f64;
4165
4166    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
4167
4168    Some(match op {
4169        "=" => (size - expected).abs() < f64::EPSILON,
4170        "<>" => (size - expected).abs() >= f64::EPSILON,
4171        "<" => size < expected,
4172        ">" => size > expected,
4173        "<=" => size <= expected,
4174        ">=" => size >= expected,
4175        _ => false,
4176    })
4177}
4178
4179fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4180    match (a, b) {
4181        (None, None) => std::cmp::Ordering::Equal,
4182        (None, Some(_)) => std::cmp::Ordering::Less,
4183        (Some(_), None) => std::cmp::Ordering::Greater,
4184        (Some(a), Some(b)) => {
4185            let a_type = attribute_type_and_value(a);
4186            let b_type = attribute_type_and_value(b);
4187            match (a_type, b_type) {
4188                (Some(("S", a_val)), Some(("S", b_val))) => {
4189                    let a_str = a_val.as_str().unwrap_or("");
4190                    let b_str = b_val.as_str().unwrap_or("");
4191                    a_str.cmp(b_str)
4192                }
4193                (Some(("N", a_val)), Some(("N", b_val))) => {
4194                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4195                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4196                    a_num
4197                        .partial_cmp(&b_num)
4198                        .unwrap_or(std::cmp::Ordering::Equal)
4199                }
4200                (Some(("B", a_val)), Some(("B", b_val))) => {
4201                    let a_str = a_val.as_str().unwrap_or("");
4202                    let b_str = b_val.as_str().unwrap_or("");
4203                    a_str.cmp(b_str)
4204                }
4205                _ => std::cmp::Ordering::Equal,
4206            }
4207        }
4208    }
4209}
4210
4211fn evaluate_filter_expression(
4212    expr: &str,
4213    item: &HashMap<String, AttributeValue>,
4214    expr_attr_names: &HashMap<String, String>,
4215    expr_attr_values: &HashMap<String, Value>,
4216) -> bool {
4217    let trimmed = expr.trim();
4218
4219    // Split on OR first (lower precedence), respecting parentheses
4220    let or_parts = split_on_or(trimmed);
4221    if or_parts.len() > 1 {
4222        return or_parts.iter().any(|part| {
4223            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4224        });
4225    }
4226
4227    // Then split on AND (higher precedence), respecting parentheses
4228    let and_parts = split_on_and(trimmed);
4229    if and_parts.len() > 1 {
4230        return and_parts.iter().all(|part| {
4231            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4232        });
4233    }
4234
4235    // Strip outer parentheses if present
4236    let stripped = strip_outer_parens(trimmed);
4237    if stripped != trimmed {
4238        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4239    }
4240
4241    // Handle NOT prefix (case-insensitive)
4242    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4243        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4244    }
4245
4246    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4247}
4248
/// Remove one pair of enclosing parentheses when they match each other.
///
/// The input is trimmed first. If the text is not wrapped in `(` … `)`, or
/// the leading `(` is closed before the end (as in `(a) AND (b)`), the
/// trimmed text is returned unchanged.
fn strip_outer_parens(expr: &str) -> &str {
    let trimmed = expr.trim();
    let Some(inner) = trimmed
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return trimmed;
    };

    // Walk the interior tracking nesting depth. Dropping below zero means a
    // `)` inside closes the outer `(`, so the outer pair does not match.
    let final_depth = inner.bytes().try_fold(0usize, |depth, byte| match byte {
        b'(' => Some(depth + 1),
        b')' => depth.checked_sub(1),
        _ => Some(depth),
    });

    if final_depth == Some(0) {
        inner
    } else {
        trimmed
    }
}
4276
4277fn evaluate_single_filter_condition(
4278    part: &str,
4279    item: &HashMap<String, AttributeValue>,
4280    expr_attr_names: &HashMap<String, String>,
4281    expr_attr_values: &HashMap<String, Value>,
4282) -> bool {
4283    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4284        let attr = resolve_attr_name(inner, expr_attr_names);
4285        return item.contains_key(&attr);
4286    }
4287
4288    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4289        let attr = resolve_attr_name(inner, expr_attr_names);
4290        return !item.contains_key(&attr);
4291    }
4292
4293    // begins_with only works on S (string) type — not N
4294    if let Some(rest) = part
4295        .strip_prefix("begins_with(")
4296        .or_else(|| part.strip_prefix("begins_with ("))
4297    {
4298        if let Some(inner) = rest.strip_suffix(')') {
4299            let mut split = inner.splitn(2, ',');
4300            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4301                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4302                let expected = expr_attr_values.get(val_ref.trim());
4303                let actual = item.get(&attr_name);
4304                return match (actual, expected) {
4305                    (Some(a), Some(e)) => {
4306                        let a_str = a.get("S").and_then(|v| v.as_str());
4307                        let e_str = e.get("S").and_then(|v| v.as_str());
4308                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4309                    }
4310                    _ => false,
4311                };
4312            }
4313        }
4314    }
4315
4316    // contains: works on S (substring), SS/NS/BS/L (set membership)
4317    if let Some(rest) = part
4318        .strip_prefix("contains(")
4319        .or_else(|| part.strip_prefix("contains ("))
4320    {
4321        if let Some(inner) = rest.strip_suffix(')') {
4322            let mut split = inner.splitn(2, ',');
4323            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4324                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4325                let expected = expr_attr_values.get(val_ref.trim());
4326                let actual = item.get(&attr_name);
4327                return match (actual, expected) {
4328                    (Some(a), Some(e)) => {
4329                        // String substring check (S type only)
4330                        if let (Some(a_s), Some(e_s)) = (
4331                            a.get("S").and_then(|v| v.as_str()),
4332                            e.get("S").and_then(|v| v.as_str()),
4333                        ) {
4334                            return a_s.contains(e_s);
4335                        }
4336                        // Set/list membership
4337                        if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
4338                            if let Some(val) = e.get("S") {
4339                                return set.contains(val);
4340                            }
4341                        }
4342                        if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
4343                            if let Some(val) = e.get("N") {
4344                                return set.contains(val);
4345                            }
4346                        }
4347                        if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
4348                            if let Some(val) = e.get("B") {
4349                                return set.contains(val);
4350                            }
4351                        }
4352                        if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
4353                            return list.contains(e);
4354                        }
4355                        false
4356                    }
4357                    _ => false,
4358                };
4359            }
4360        }
4361    }
4362
4363    // size(path) op :val — attribute size comparison
4364    if part.starts_with("size(") || part.starts_with("size (") {
4365        if let Some(result) =
4366            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
4367        {
4368            return result;
4369        }
4370    }
4371
4372    // attribute_type(path, :type)
4373    if part.starts_with("attribute_type(") || part.starts_with("attribute_type (") {
4374        if let Some(rest) = part
4375            .strip_prefix("attribute_type(")
4376            .or_else(|| part.strip_prefix("attribute_type ("))
4377        {
4378            if let Some(inner) = rest.strip_suffix(')') {
4379                let mut split = inner.splitn(2, ',');
4380                if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4381                    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4382                    let expected_type = expr_attr_values
4383                        .get(val_ref.trim())
4384                        .and_then(|v| v.get("S"))
4385                        .and_then(|v| v.as_str());
4386                    let actual = item.get(&attr_name);
4387                    return match (actual, expected_type) {
4388                        (Some(val), Some(t)) => match t {
4389                            "S" => val.get("S").is_some(),
4390                            "N" => val.get("N").is_some(),
4391                            "B" => val.get("B").is_some(),
4392                            "BOOL" => val.get("BOOL").is_some(),
4393                            "NULL" => val.get("NULL").is_some(),
4394                            "SS" => val.get("SS").is_some(),
4395                            "NS" => val.get("NS").is_some(),
4396                            "BS" => val.get("BS").is_some(),
4397                            "L" => val.get("L").is_some(),
4398                            "M" => val.get("M").is_some(),
4399                            _ => false,
4400                        },
4401                        _ => false,
4402                    };
4403                }
4404            }
4405        }
4406    }
4407
4408    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
4409        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
4410        let actual = item.get(&attr_name);
4411        return evaluate_in_match(actual, &value_refs, expr_attr_values);
4412    }
4413
4414    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4415}
4416
/// Parse an `attr IN (:v1, :v2, ...)` expression into the left-hand operand
/// and the list of value references inside the parentheses.
///
/// The ` IN ` keyword is matched case-insensitively. Value references are
/// trimmed and empty entries are dropped, so both `:a, :b` and `:a,:b` are
/// accepted. Returns `None` when there is no ` IN `, the left-hand side is
/// empty, the right-hand side is not wrapped in parentheses, or no value
/// reference remains — letting callers fall through to other grammar
/// branches.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing keeps byte offsets identical to `expr`.
    let keyword_pos = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (lhs, rhs) = expr.split_at(keyword_pos);

    let attr_ref = lhs.trim();
    if attr_ref.is_empty() {
        return None;
    }

    let list = rhs[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let value_refs: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|value_ref| !value_ref.is_empty())
        .collect();

    (!value_refs.is_empty()).then_some((attr_ref, value_refs))
}
4443
4444/// Return true iff `actual` equals any of the `value_refs` resolved through
4445/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
4446/// evaluates `IN` against undefined attributes as false).
4447fn evaluate_in_match(
4448    actual: Option<&AttributeValue>,
4449    value_refs: &[&str],
4450    expr_attr_values: &HashMap<String, Value>,
4451) -> bool {
4452    value_refs.iter().any(|v_ref| {
4453        let expected = expr_attr_values.get(*v_ref);
4454        matches!((actual, expected), (Some(a), Some(e)) if a == e)
4455    })
4456}
4457
4458fn apply_update_expression(
4459    item: &mut HashMap<String, AttributeValue>,
4460    expr: &str,
4461    expr_attr_names: &HashMap<String, String>,
4462    expr_attr_values: &HashMap<String, Value>,
4463) -> Result<(), AwsServiceError> {
4464    let clauses = parse_update_clauses(expr);
4465    if clauses.is_empty() && !expr.trim().is_empty() {
4466        return Err(AwsServiceError::aws_error(
4467            StatusCode::BAD_REQUEST,
4468            "ValidationException",
4469            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
4470        ));
4471    }
4472    for (action, assignments) in &clauses {
4473        match action.to_ascii_uppercase().as_str() {
4474            "SET" => {
4475                for assignment in assignments {
4476                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4477                }
4478            }
4479            "REMOVE" => {
4480                for attr_ref in assignments {
4481                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4482                    item.remove(&attr);
4483                }
4484            }
4485            "ADD" => {
4486                for assignment in assignments {
4487                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4488                }
4489            }
4490            "DELETE" => {
4491                for assignment in assignments {
4492                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4493                }
4494            }
4495            other => {
4496                return Err(AwsServiceError::aws_error(
4497                    StatusCode::BAD_REQUEST,
4498                    "ValidationException",
4499                    format!("Invalid UpdateExpression: Invalid action: {}", other),
4500                ));
4501            }
4502        }
4503    }
4504    Ok(())
4505}
4506
/// Split an `UpdateExpression` into `(ACTION, operands)` clauses, in the
/// order they appear in the expression.
///
/// `ACTION` is one of `SET`, `REMOVE`, `ADD`, `DELETE` (matched
/// case-insensitively, returned upper-case). A keyword only counts when it
/// stands alone: text such as `#set`, `:add`, `my_set` or `a.delete` is part
/// of a name or placeholder and does not start a clause. Each clause body is
/// split on commas that are not nested inside parentheses, so function calls
/// like `list_append(a, :b)` stay in one operand. Operands are trimmed.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    const KEYWORDS: [&str; 4] = ["SET", "REMOVE", "ADD", "DELETE"];

    /// True when `byte` directly before a keyword makes it part of a larger
    /// token (identifier, `#name`, `:value`, or dotted path).
    fn glues_before(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || matches!(byte, b'_' | b'#' | b':' | b'.')
    }

    /// True when `byte` directly after a keyword continues an identifier.
    fn glues_after(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    /// Split on commas at parenthesis depth zero, trimming each piece.
    fn split_top_level_commas(content: &str) -> Vec<String> {
        let mut pieces = Vec::new();
        let mut depth = 0usize;
        let mut start = 0;
        for (i, byte) in content.bytes().enumerate() {
            match byte {
                b'(' => depth += 1,
                b')' => depth = depth.saturating_sub(1),
                b',' if depth == 0 => {
                    pieces.push(content[start..i].trim().to_string());
                    start = i + 1;
                }
                _ => {}
            }
        }
        pieces.push(content[start..].trim().to_string());
        pieces
    }

    let bytes = expr.as_bytes();
    // ASCII upper-casing keeps byte offsets identical to `expr`.
    let upper = expr.to_ascii_uppercase();

    let mut positions: Vec<(usize, &str)> = Vec::new();
    for kw in KEYWORDS {
        let mut search_from = 0;
        while let Some(offset) = upper[search_from..].find(kw) {
            let begin = search_from + offset;
            let end = begin + kw.len();
            let before_ok = begin == 0 || !glues_before(bytes[begin - 1]);
            let after_ok = bytes.get(end).map_or(true, |next| !glues_after(*next));
            if before_ok && after_ok {
                positions.push((begin, kw));
            }
            search_from = end;
        }
    }
    positions.sort_by_key(|(pos, _)| *pos);

    positions
        .iter()
        .enumerate()
        .map(|(i, &(pos, kw))| {
            // A clause body runs up to the next keyword or the end of input.
            let body_end = positions.get(i + 1).map_or(expr.len(), |next| next.0);
            let body = expr[pos + kw.len()..body_end].trim();
            (kw.to_string(), split_top_level_commas(body))
        })
        .collect()
}
4544
4545fn apply_set_assignment(
4546    item: &mut HashMap<String, AttributeValue>,
4547    assignment: &str,
4548    expr_attr_names: &HashMap<String, String>,
4549    expr_attr_values: &HashMap<String, Value>,
4550) -> Result<(), AwsServiceError> {
4551    let Some((left, right)) = assignment.split_once('=') else {
4552        return Ok(());
4553    };
4554
4555    let left_trimmed = left.trim();
4556    // Split off a trailing `[N]` list-index suffix so we can resolve the
4557    // attribute name ref on its own. Without this, `resolve_attr_name` sees
4558    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
4559    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
4560        Some((name, idx)) => (name, Some(idx)),
4561        None => (left_trimmed, None),
4562    };
4563    let attr = resolve_attr_name(attr_ref, expr_attr_names);
4564    let right = right.trim();
4565
4566    // if_not_exists(attr, :val)
4567    if let Some(rest) = right
4568        .strip_prefix("if_not_exists(")
4569        .or_else(|| right.strip_prefix("if_not_exists ("))
4570    {
4571        if let Some(inner) = rest.strip_suffix(')') {
4572            let mut split = inner.splitn(2, ',');
4573            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4574                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4575                if !item.contains_key(&check_name) {
4576                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4577                        item.insert(attr, val.clone());
4578                    }
4579                }
4580                return Ok(());
4581            }
4582        }
4583    }
4584
4585    // list_append(a, b)
4586    if let Some(rest) = right
4587        .strip_prefix("list_append(")
4588        .or_else(|| right.strip_prefix("list_append ("))
4589    {
4590        if let Some(inner) = rest.strip_suffix(')') {
4591            let mut split = inner.splitn(2, ',');
4592            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4593                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4594                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4595
4596                let mut merged = Vec::new();
4597                if let Some(Value::Object(obj)) = &a_val {
4598                    if let Some(Value::Array(arr)) = obj.get("L") {
4599                        merged.extend(arr.clone());
4600                    }
4601                }
4602                if let Some(Value::Object(obj)) = &b_val {
4603                    if let Some(Value::Array(arr)) = obj.get("L") {
4604                        merged.extend(arr.clone());
4605                    }
4606                }
4607
4608                item.insert(attr, json!({"L": merged}));
4609                return Ok(());
4610            }
4611        }
4612    }
4613
4614    // Arithmetic: attr + :val or attr - :val
4615    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4616        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4617        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4618
4619        // Both operands must be numeric (N type)
4620        let left_num = match extract_number(&left_val) {
4621            Some(n) => n,
4622            None if left_val.is_some() => {
4623                return Err(AwsServiceError::aws_error(
4624                    StatusCode::BAD_REQUEST,
4625                    "ValidationException",
4626                    "An operand in the update expression has an incorrect data type",
4627                ));
4628            }
4629            None => 0.0, // attribute doesn't exist yet — treat as 0
4630        };
4631        let right_num = match extract_number(&right_val) {
4632            Some(n) => n,
4633            None => {
4634                return Err(AwsServiceError::aws_error(
4635                    StatusCode::BAD_REQUEST,
4636                    "ValidationException",
4637                    "An operand in the update expression has an incorrect data type",
4638                ));
4639            }
4640        };
4641
4642        let result = if is_add {
4643            left_num + right_num
4644        } else {
4645            left_num - right_num
4646        };
4647
4648        let num_str = if result == result.trunc() {
4649            format!("{}", result as i64)
4650        } else {
4651            format!("{result}")
4652        };
4653
4654        item.insert(attr, json!({"N": num_str}));
4655        return Ok(());
4656    }
4657
4658    // Simple assignment
4659    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4660    if let Some(v) = val {
4661        match list_index {
4662            Some(idx) => assign_list_index(item, &attr, idx, v)?,
4663            None => {
4664                item.insert(attr, v);
4665            }
4666        }
4667    }
4668
4669    Ok(())
4670}
4671
/// Split a SET target of the form `name[N]` into `name` and the index `N`.
///
/// Only the single-index shape is recognised: the name must be non-empty and
/// free of `.`, `[` and `]`, and the bracket contents must parse as `usize`.
/// Plain attributes and nested shapes such as `a.b[0]` or `a[0][1]` yield
/// `None`.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    // The last `[` opens the trailing index; everything before it is the name.
    let (name, index_text) = without_close.rsplit_once('[')?;
    let index = index_text.parse::<usize>().ok()?;

    let is_plain_name =
        !name.is_empty() && !name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    if is_plain_name {
        Some((name, index))
    } else {
        None
    }
}
4692
4693/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
4694/// within the current list, replaces that slot; if it's at the end, appends.
4695/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
4696/// out-of-range indices and non-list attributes.
4697fn assign_list_index(
4698    item: &mut HashMap<String, AttributeValue>,
4699    attr: &str,
4700    idx: usize,
4701    value: Value,
4702) -> Result<(), AwsServiceError> {
4703    let Some(existing) = item.get_mut(attr) else {
4704        return Err(AwsServiceError::aws_error(
4705            StatusCode::BAD_REQUEST,
4706            "ValidationException",
4707            "The document path provided in the update expression is invalid for update",
4708        ));
4709    };
4710    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
4711        return Err(AwsServiceError::aws_error(
4712            StatusCode::BAD_REQUEST,
4713            "ValidationException",
4714            "The document path provided in the update expression is invalid for update",
4715        ));
4716    };
4717    if idx < list.len() {
4718        list[idx] = value;
4719    } else if idx == list.len() {
4720        list.push(value);
4721    } else {
4722        return Err(AwsServiceError::aws_error(
4723            StatusCode::BAD_REQUEST,
4724            "ValidationException",
4725            "The document path provided in the update expression is invalid for update",
4726        ));
4727    }
4728    Ok(())
4729}
4730
4731fn resolve_value(
4732    reference: &str,
4733    item: &HashMap<String, AttributeValue>,
4734    expr_attr_names: &HashMap<String, String>,
4735    expr_attr_values: &HashMap<String, Value>,
4736) -> Option<Value> {
4737    let reference = reference.trim();
4738    if reference.starts_with(':') {
4739        expr_attr_values.get(reference).cloned()
4740    } else {
4741        let attr_name = resolve_attr_name(reference, expr_attr_names);
4742        item.get(&attr_name).cloned()
4743    }
4744}
4745
4746fn extract_number(val: &Option<Value>) -> Option<f64> {
4747    val.as_ref()
4748        .and_then(|v| v.get("N"))
4749        .and_then(|n| n.as_str())
4750        .and_then(|s| s.parse().ok())
4751}
4752
/// Split `expr` at its first `+` or `-` that lies outside parentheses and is
/// not the very first character.
///
/// Returns the untrimmed left and right operands and `true` for addition
/// (`false` for subtraction), or `None` when no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut depth = 0i32;
    // Scanning bytes is safe: the ASCII characters of interest never occur
    // inside a multi-byte UTF-8 sequence, so every split offset is a char
    // boundary.
    for (i, byte) in expr.bytes().enumerate() {
        match byte {
            b'(' => depth += 1,
            b')' => depth -= 1,
            b'+' | b'-' if depth == 0 && i > 0 => {
                return Some((&expr[..i], &expr[i + 1..], byte == b'+'));
            }
            _ => {}
        }
    }
    None
}
4770
4771fn apply_add_assignment(
4772    item: &mut HashMap<String, AttributeValue>,
4773    assignment: &str,
4774    expr_attr_names: &HashMap<String, String>,
4775    expr_attr_values: &HashMap<String, Value>,
4776) -> Result<(), AwsServiceError> {
4777    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4778    if parts.len() != 2 {
4779        return Ok(());
4780    }
4781
4782    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4783    let val_ref = parts[1].trim();
4784    let add_val = expr_attr_values.get(val_ref);
4785
4786    if let Some(add_val) = add_val {
4787        if let Some(existing) = item.get(&attr) {
4788            if let (Some(existing_num), Some(add_num)) = (
4789                extract_number(&Some(existing.clone())),
4790                extract_number(&Some(add_val.clone())),
4791            ) {
4792                let result = existing_num + add_num;
4793                let num_str = if result == result.trunc() {
4794                    format!("{}", result as i64)
4795                } else {
4796                    format!("{result}")
4797                };
4798                item.insert(attr, json!({"N": num_str}));
4799            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4800                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4801                    let mut merged: Vec<Value> = existing_set.clone();
4802                    for v in add_set {
4803                        if !merged.contains(v) {
4804                            merged.push(v.clone());
4805                        }
4806                    }
4807                    item.insert(attr, json!({"SS": merged}));
4808                }
4809            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4810                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4811                    let mut merged: Vec<Value> = existing_set.clone();
4812                    for v in add_set {
4813                        if !merged.contains(v) {
4814                            merged.push(v.clone());
4815                        }
4816                    }
4817                    item.insert(attr, json!({"NS": merged}));
4818                }
4819            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
4820                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
4821                    let mut merged: Vec<Value> = existing_set.clone();
4822                    for v in add_set {
4823                        if !merged.contains(v) {
4824                            merged.push(v.clone());
4825                        }
4826                    }
4827                    item.insert(attr, json!({"BS": merged}));
4828                }
4829            }
4830        } else {
4831            item.insert(attr, add_val.clone());
4832        }
4833    }
4834
4835    Ok(())
4836}
4837
4838fn apply_delete_assignment(
4839    item: &mut HashMap<String, AttributeValue>,
4840    assignment: &str,
4841    expr_attr_names: &HashMap<String, String>,
4842    expr_attr_values: &HashMap<String, Value>,
4843) -> Result<(), AwsServiceError> {
4844    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4845    if parts.len() != 2 {
4846        return Ok(());
4847    }
4848
4849    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4850    let val_ref = parts[1].trim();
4851    let del_val = expr_attr_values.get(val_ref);
4852
4853    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4854        if let (Some(existing_set), Some(del_set)) = (
4855            existing.get("SS").and_then(|v| v.as_array()),
4856            del_val.get("SS").and_then(|v| v.as_array()),
4857        ) {
4858            let filtered: Vec<Value> = existing_set
4859                .iter()
4860                .filter(|v| !del_set.contains(v))
4861                .cloned()
4862                .collect();
4863            if filtered.is_empty() {
4864                item.remove(&attr);
4865            } else {
4866                item.insert(attr, json!({"SS": filtered}));
4867            }
4868        } else if let (Some(existing_set), Some(del_set)) = (
4869            existing.get("NS").and_then(|v| v.as_array()),
4870            del_val.get("NS").and_then(|v| v.as_array()),
4871        ) {
4872            let filtered: Vec<Value> = existing_set
4873                .iter()
4874                .filter(|v| !del_set.contains(v))
4875                .cloned()
4876                .collect();
4877            if filtered.is_empty() {
4878                item.remove(&attr);
4879            } else {
4880                item.insert(attr, json!({"NS": filtered}));
4881            }
4882        } else if let (Some(existing_set), Some(del_set)) = (
4883            existing.get("BS").and_then(|v| v.as_array()),
4884            del_val.get("BS").and_then(|v| v.as_array()),
4885        ) {
4886            let filtered: Vec<Value> = existing_set
4887                .iter()
4888                .filter(|v| !del_set.contains(v))
4889                .cloned()
4890                .collect();
4891            if filtered.is_empty() {
4892                item.remove(&attr);
4893            } else {
4894                item.insert(attr, json!({"BS": filtered}));
4895            }
4896        }
4897    }
4898
4899    Ok(())
4900}
4901
4902#[allow(clippy::too_many_arguments)]
4903fn build_table_description_json(
4904    arn: &str,
4905    key_schema: &[KeySchemaElement],
4906    attribute_definitions: &[AttributeDefinition],
4907    provisioned_throughput: &ProvisionedThroughput,
4908    gsi: &[GlobalSecondaryIndex],
4909    lsi: &[LocalSecondaryIndex],
4910    billing_mode: &str,
4911    created_at: chrono::DateTime<chrono::Utc>,
4912    item_count: i64,
4913    size_bytes: i64,
4914    status: &str,
4915) -> Value {
4916    let table_name = arn.rsplit('/').next().unwrap_or("");
4917    let creation_timestamp =
4918        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4919
4920    let ks: Vec<Value> = key_schema
4921        .iter()
4922        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4923        .collect();
4924
4925    let ad: Vec<Value> = attribute_definitions
4926        .iter()
4927        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4928        .collect();
4929
4930    let mut desc = json!({
4931        "TableName": table_name,
4932        "TableArn": arn,
4933        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4934        "TableStatus": status,
4935        "KeySchema": ks,
4936        "AttributeDefinitions": ad,
4937        "CreationDateTime": creation_timestamp,
4938        "ItemCount": item_count,
4939        "TableSizeBytes": size_bytes,
4940        "BillingModeSummary": { "BillingMode": billing_mode },
4941    });
4942
4943    if billing_mode != "PAY_PER_REQUEST" {
4944        desc["ProvisionedThroughput"] = json!({
4945            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4946            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4947            "NumberOfDecreasesToday": 0,
4948        });
4949    } else {
4950        desc["ProvisionedThroughput"] = json!({
4951            "ReadCapacityUnits": 0,
4952            "WriteCapacityUnits": 0,
4953            "NumberOfDecreasesToday": 0,
4954        });
4955    }
4956
4957    if !gsi.is_empty() {
4958        let gsi_json: Vec<Value> = gsi
4959            .iter()
4960            .map(|g| {
4961                let gks: Vec<Value> = g
4962                    .key_schema
4963                    .iter()
4964                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4965                    .collect();
4966                let mut idx = json!({
4967                    "IndexName": g.index_name,
4968                    "KeySchema": gks,
4969                    "Projection": { "ProjectionType": g.projection.projection_type },
4970                    "IndexStatus": "ACTIVE",
4971                    "IndexArn": format!("{arn}/index/{}", g.index_name),
4972                    "ItemCount": 0,
4973                    "IndexSizeBytes": 0,
4974                });
4975                if !g.projection.non_key_attributes.is_empty() {
4976                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4977                }
4978                if let Some(ref pt) = g.provisioned_throughput {
4979                    idx["ProvisionedThroughput"] = json!({
4980                        "ReadCapacityUnits": pt.read_capacity_units,
4981                        "WriteCapacityUnits": pt.write_capacity_units,
4982                        "NumberOfDecreasesToday": 0,
4983                    });
4984                }
4985                idx
4986            })
4987            .collect();
4988        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4989    }
4990
4991    if !lsi.is_empty() {
4992        let lsi_json: Vec<Value> = lsi
4993            .iter()
4994            .map(|l| {
4995                let lks: Vec<Value> = l
4996                    .key_schema
4997                    .iter()
4998                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4999                    .collect();
5000                let mut idx = json!({
5001                    "IndexName": l.index_name,
5002                    "KeySchema": lks,
5003                    "Projection": { "ProjectionType": l.projection.projection_type },
5004                    "IndexArn": format!("{arn}/index/{}", l.index_name),
5005                    "ItemCount": 0,
5006                    "IndexSizeBytes": 0,
5007                });
5008                if !l.projection.non_key_attributes.is_empty() {
5009                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
5010                }
5011                idx
5012            })
5013            .collect();
5014        desc["LocalSecondaryIndexes"] = json!(lsi_json);
5015    }
5016
5017    desc
5018}
5019
5020fn build_table_description(table: &DynamoTable) -> Value {
5021    let mut desc = build_table_description_json(
5022        &table.arn,
5023        &table.key_schema,
5024        &table.attribute_definitions,
5025        &table.provisioned_throughput,
5026        &table.gsi,
5027        &table.lsi,
5028        &table.billing_mode,
5029        table.created_at,
5030        table.item_count,
5031        table.size_bytes,
5032        &table.status,
5033    );
5034
5035    // Add stream specification if streams are enabled
5036    if table.stream_enabled {
5037        if let Some(ref stream_arn) = table.stream_arn {
5038            desc["LatestStreamArn"] = json!(stream_arn);
5039            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
5040        }
5041        if let Some(ref view_type) = table.stream_view_type {
5042            desc["StreamSpecification"] = json!({
5043                "StreamEnabled": true,
5044                "StreamViewType": view_type,
5045            });
5046        }
5047    }
5048
5049    // Add SSE description
5050    if let Some(ref sse_type) = table.sse_type {
5051        let mut sse_desc = json!({
5052            "Status": "ENABLED",
5053            "SSEType": sse_type,
5054        });
5055        if let Some(ref key_arn) = table.sse_kms_key_arn {
5056            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
5057        }
5058        desc["SSEDescription"] = sse_desc;
5059    } else {
5060        // Default: AWS owned key encryption (always enabled in real AWS)
5061        desc["SSEDescription"] = json!({
5062            "Status": "ENABLED",
5063            "SSEType": "AES256",
5064        });
5065    }
5066
5067    desc
5068}
5069
5070fn execute_partiql_statement(
5071    state: &SharedDynamoDbState,
5072    statement: &str,
5073    parameters: &[Value],
5074) -> Result<AwsResponse, AwsServiceError> {
5075    let trimmed = statement.trim();
5076    let upper = trimmed.to_ascii_uppercase();
5077
5078    if upper.starts_with("SELECT") {
5079        execute_partiql_select(state, trimmed, parameters)
5080    } else if upper.starts_with("INSERT") {
5081        execute_partiql_insert(state, trimmed, parameters)
5082    } else if upper.starts_with("UPDATE") {
5083        execute_partiql_update(state, trimmed, parameters)
5084    } else if upper.starts_with("DELETE") {
5085        execute_partiql_delete(state, trimmed, parameters)
5086    } else {
5087        Err(AwsServiceError::aws_error(
5088            StatusCode::BAD_REQUEST,
5089            "ValidationException",
5090            format!("Unsupported PartiQL statement: {trimmed}"),
5091        ))
5092    }
5093}
5094
5095/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
5096fn execute_partiql_select(
5097    state: &SharedDynamoDbState,
5098    statement: &str,
5099    parameters: &[Value],
5100) -> Result<AwsResponse, AwsServiceError> {
5101    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
5102    let upper = statement.to_ascii_uppercase();
5103    let from_pos = upper.find("FROM").ok_or_else(|| {
5104        AwsServiceError::aws_error(
5105            StatusCode::BAD_REQUEST,
5106            "ValidationException",
5107            "Invalid SELECT statement: missing FROM",
5108        )
5109    })?;
5110
5111    let after_from = statement[from_pos + 4..].trim();
5112    let (table_name, rest) = parse_partiql_table_name(after_from);
5113
5114    let state = state.read();
5115    let table = get_table(&state.tables, &table_name)?;
5116
5117    let rest_upper = rest.trim().to_ascii_uppercase();
5118    if rest_upper.starts_with("WHERE") {
5119        let where_clause = rest.trim()[5..].trim();
5120        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
5121        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
5122        DynamoDbService::ok_json(json!({ "Items": items }))
5123    } else {
5124        // No WHERE, return all items
5125        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
5126        DynamoDbService::ok_json(json!({ "Items": items }))
5127    }
5128}
5129
5130fn execute_partiql_insert(
5131    state: &SharedDynamoDbState,
5132    statement: &str,
5133    parameters: &[Value],
5134) -> Result<AwsResponse, AwsServiceError> {
5135    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
5136    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
5137    let upper = statement.to_ascii_uppercase();
5138    let into_pos = upper.find("INTO").ok_or_else(|| {
5139        AwsServiceError::aws_error(
5140            StatusCode::BAD_REQUEST,
5141            "ValidationException",
5142            "Invalid INSERT statement: missing INTO",
5143        )
5144    })?;
5145
5146    let after_into = statement[into_pos + 4..].trim();
5147    let (table_name, rest) = parse_partiql_table_name(after_into);
5148
5149    let rest_upper = rest.trim().to_ascii_uppercase();
5150    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
5151        AwsServiceError::aws_error(
5152            StatusCode::BAD_REQUEST,
5153            "ValidationException",
5154            "Invalid INSERT statement: missing VALUE",
5155        )
5156    })?;
5157
5158    let value_str = rest.trim()[value_pos + 5..].trim();
5159    let item = parse_partiql_value_object(value_str, parameters)?;
5160
5161    let mut state = state.write();
5162    let table = get_table_mut(&mut state.tables, &table_name)?;
5163    let key = extract_key(table, &item);
5164    if table.find_item_index(&key).is_some() {
5165        // DynamoDB PartiQL INSERT fails if item exists
5166        return Err(AwsServiceError::aws_error(
5167            StatusCode::BAD_REQUEST,
5168            "DuplicateItemException",
5169            "Duplicate primary key exists in table",
5170        ));
5171    } else {
5172        table.items.push(item);
5173    }
5174    table.recalculate_stats();
5175
5176    DynamoDbService::ok_json(json!({}))
5177}
5178
5179fn execute_partiql_update(
5180    state: &SharedDynamoDbState,
5181    statement: &str,
5182    parameters: &[Value],
5183) -> Result<AwsResponse, AwsServiceError> {
5184    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
5185    // or: UPDATE "tablename" SET attr=? WHERE pk=?
5186    let after_update = statement[6..].trim(); // skip "UPDATE"
5187    let (table_name, rest) = parse_partiql_table_name(after_update);
5188
5189    let rest_upper = rest.trim().to_ascii_uppercase();
5190    let set_pos = rest_upper.find("SET").ok_or_else(|| {
5191        AwsServiceError::aws_error(
5192            StatusCode::BAD_REQUEST,
5193            "ValidationException",
5194            "Invalid UPDATE statement: missing SET",
5195        )
5196    })?;
5197
5198    let after_set = rest.trim()[set_pos + 3..].trim();
5199
5200    // Split on WHERE
5201    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
5202    let (set_clause, where_clause) = if let Some(wp) = where_pos {
5203        (&after_set[..wp], after_set[wp + 5..].trim())
5204    } else {
5205        (after_set, "")
5206    };
5207
5208    let mut state = state.write();
5209    let table = get_table_mut(&mut state.tables, &table_name)?;
5210
5211    let matched_indices = if !where_clause.is_empty() {
5212        find_partiql_where_indices(table, where_clause, parameters)?
5213    } else {
5214        (0..table.items.len()).collect()
5215    };
5216
5217    // Parse SET assignments: attr=value, attr2=value2
5218    let param_offset = count_params_in_str(where_clause);
5219    let assignments: Vec<&str> = set_clause.split(',').collect();
5220    for idx in &matched_indices {
5221        let mut local_offset = param_offset;
5222        for assignment in &assignments {
5223            let assignment = assignment.trim();
5224            if let Some((attr, val_str)) = assignment.split_once('=') {
5225                let attr = attr.trim().trim_matches('"');
5226                let val_str = val_str.trim();
5227                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
5228                if let Some(v) = value {
5229                    table.items[*idx].insert(attr.to_string(), v);
5230                }
5231            }
5232        }
5233    }
5234    table.recalculate_stats();
5235
5236    DynamoDbService::ok_json(json!({}))
5237}
5238
5239fn execute_partiql_delete(
5240    state: &SharedDynamoDbState,
5241    statement: &str,
5242    parameters: &[Value],
5243) -> Result<AwsResponse, AwsServiceError> {
5244    // Pattern: DELETE FROM "tablename" WHERE pk='val'
5245    let upper = statement.to_ascii_uppercase();
5246    let from_pos = upper.find("FROM").ok_or_else(|| {
5247        AwsServiceError::aws_error(
5248            StatusCode::BAD_REQUEST,
5249            "ValidationException",
5250            "Invalid DELETE statement: missing FROM",
5251        )
5252    })?;
5253
5254    let after_from = statement[from_pos + 4..].trim();
5255    let (table_name, rest) = parse_partiql_table_name(after_from);
5256
5257    let rest_upper = rest.trim().to_ascii_uppercase();
5258    if !rest_upper.starts_with("WHERE") {
5259        return Err(AwsServiceError::aws_error(
5260            StatusCode::BAD_REQUEST,
5261            "ValidationException",
5262            "DELETE requires a WHERE clause",
5263        ));
5264    }
5265    let where_clause = rest.trim()[5..].trim();
5266
5267    let mut state = state.write();
5268    let table = get_table_mut(&mut state.tables, &table_name)?;
5269
5270    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
5271    // Remove from highest index first to avoid invalidating lower indices
5272    indices.sort_unstable();
5273    indices.reverse();
5274    for idx in indices {
5275        table.items.remove(idx);
5276    }
5277    table.recalculate_stats();
5278
5279    DynamoDbService::ok_json(json!({}))
5280}
5281
/// Extracts a table name, optionally wrapped in double quotes, from the start of `s`.
///
/// Returns `(table_name, rest_of_string)`:
/// * quoted name – everything up to the closing quote; `rest` starts right
///   after that quote;
/// * opening quote without a closing one – the text up to the first space,
///   with surrounding quotes stripped;
/// * unquoted name – the text up to the first whitespace character.
///
/// Leading and trailing whitespace of `s` is ignored; `rest` is not trimmed.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let input = s.trim();
    match input.strip_prefix('"') {
        Some(after_quote) => match after_quote.split_once('"') {
            Some((name, tail)) => (name.to_owned(), tail),
            None => {
                // Unterminated quote: fall back to cutting at the first space.
                let cut = input.find(' ').unwrap_or(input.len());
                let (head, tail) = input.split_at(cut);
                (head.trim_matches('"').to_owned(), tail)
            }
        },
        None => {
            let cut = input.find(char::is_whitespace).unwrap_or(input.len());
            let (head, tail) = input.split_at(cut);
            (head.to_owned(), tail)
        }
    }
}
5301
5302/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
5303/// Returns matching items.
5304fn evaluate_partiql_where<'a>(
5305    table: &'a DynamoTable,
5306    where_clause: &str,
5307    parameters: &[Value],
5308) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5309    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5310    Ok(indices.iter().map(|i| &table.items[*i]).collect())
5311}
5312
5313fn find_partiql_where_indices(
5314    table: &DynamoTable,
5315    where_clause: &str,
5316    parameters: &[Value],
5317) -> Result<Vec<usize>, AwsServiceError> {
5318    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
5319    // Case-insensitive AND splitting
5320    let upper = where_clause.to_uppercase();
5321    let conditions = if upper.contains(" AND ") {
5322        // Find positions of " AND " case-insensitively and split
5323        let mut parts = Vec::new();
5324        let mut last = 0;
5325        for (i, _) in upper.match_indices(" AND ") {
5326            parts.push(where_clause[last..i].trim());
5327            last = i + 5;
5328        }
5329        parts.push(where_clause[last..].trim());
5330        parts
5331    } else {
5332        vec![where_clause.trim()]
5333    };
5334
5335    let mut param_idx = 0usize;
5336    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5337
5338    for cond in &conditions {
5339        let cond = cond.trim();
5340        if let Some((left, right)) = cond.split_once('=') {
5341            let attr = left.trim().trim_matches('"').to_string();
5342            let val_str = right.trim();
5343            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5344            if let Some(v) = value {
5345                parsed_conditions.push((attr, v));
5346            }
5347        }
5348    }
5349
5350    let mut indices = Vec::new();
5351    for (i, item) in table.items.iter().enumerate() {
5352        let all_match = parsed_conditions
5353            .iter()
5354            .all(|(attr, expected)| item.get(attr) == Some(expected));
5355        if all_match {
5356            indices.push(i);
5357        }
5358    }
5359
5360    Ok(indices)
5361}
5362
5363/// Parse a PartiQL literal value. Supports:
5364/// - 'string' -> {"S": "string"}
5365/// - 123 -> {"N": "123"}
5366/// - ? -> parameter from list
5367fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5368    let s = s.trim();
5369    if s == "?" {
5370        let idx = *param_idx;
5371        *param_idx += 1;
5372        parameters.get(idx).cloned()
5373    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5374        let inner = &s[1..s.len() - 1];
5375        Some(json!({"S": inner}))
5376    } else if let Ok(n) = s.parse::<f64>() {
5377        let num_str = if n == n.trunc() {
5378            format!("{}", n as i64)
5379        } else {
5380            format!("{n}")
5381        };
5382        Some(json!({"N": num_str}))
5383    } else {
5384        None
5385    }
5386}
5387
5388/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
5389fn parse_partiql_value_object(
5390    s: &str,
5391    parameters: &[Value],
5392) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5393    let s = s.trim();
5394    let inner = s
5395        .strip_prefix('{')
5396        .and_then(|s| s.strip_suffix('}'))
5397        .ok_or_else(|| {
5398            AwsServiceError::aws_error(
5399                StatusCode::BAD_REQUEST,
5400                "ValidationException",
5401                "Invalid VALUE: expected object literal",
5402            )
5403        })?;
5404
5405    let mut item = HashMap::new();
5406    let mut param_idx = 0usize;
5407
5408    // Simple comma-separated key:value parsing
5409    for pair in split_partiql_pairs(inner) {
5410        let pair = pair.trim();
5411        if pair.is_empty() {
5412            continue;
5413        }
5414        if let Some((key_part, val_part)) = pair.split_once(':') {
5415            let key = key_part
5416                .trim()
5417                .trim_matches('\'')
5418                .trim_matches('"')
5419                .to_string();
5420            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5421                item.insert(key, val);
5422            }
5423        }
5424    }
5425
5426    Ok(item)
5427}
5428
/// Splits the inside of a PartiQL object literal into its `key: value` pairs.
///
/// The text is cut at every comma that is neither inside single quotes nor
/// inside nested `{ ... }`. The separating commas are dropped; the pieces are
/// returned untrimmed, and the result always contains at least one element
/// (a single empty string for empty input).
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut segment_start = 0usize;
    let mut brace_level = 0i32;
    let mut quoted = false;

    for (pos, ch) in s.char_indices() {
        if ch == '\'' {
            quoted = !quoted;
            continue;
        }
        if quoted {
            // Braces and commas inside a string literal carry no structure.
            continue;
        }
        match ch {
            '{' => brace_level += 1,
            '}' => brace_level -= 1,
            ',' if brace_level == 0 => {
                pieces.push(&s[segment_start..pos]);
                segment_start = pos + 1;
            }
            _ => {}
        }
    }

    pieces.push(&s[segment_start..]);
    pieces
}
5452
/// Counts the `?` characters in `s`, i.e. the number of positional parameter
/// placeholders (question marks inside string literals are counted too).
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
5457
5458#[cfg(test)]
5459mod tests {
5460    use super::*;
5461    use serde_json::json;
5462
5463    #[test]
5464    fn test_parse_update_clauses_set() {
5465        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5466        assert_eq!(clauses.len(), 1);
5467        assert_eq!(clauses[0].0, "SET");
5468        assert_eq!(clauses[0].1.len(), 2);
5469    }
5470
5471    #[test]
5472    fn test_parse_update_clauses_set_and_remove() {
5473        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
5474        assert_eq!(clauses.len(), 2);
5475        assert_eq!(clauses[0].0, "SET");
5476        assert_eq!(clauses[1].0, "REMOVE");
5477    }
5478
5479    #[test]
5480    fn test_evaluate_key_condition_simple() {
5481        let mut item = HashMap::new();
5482        item.insert("pk".to_string(), json!({"S": "user1"}));
5483        item.insert("sk".to_string(), json!({"S": "order1"}));
5484
5485        let mut expr_values = HashMap::new();
5486        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
5487
5488        assert!(evaluate_key_condition(
5489            "pk = :pk",
5490            &item,
5491            "pk",
5492            Some("sk"),
5493            &HashMap::new(),
5494            &expr_values,
5495        ));
5496    }
5497
5498    #[test]
5499    fn test_compare_attribute_values_numbers() {
5500        let a = json!({"N": "10"});
5501        let b = json!({"N": "20"});
5502        assert_eq!(
5503            compare_attribute_values(Some(&a), Some(&b)),
5504            std::cmp::Ordering::Less
5505        );
5506    }
5507
5508    #[test]
5509    fn test_compare_attribute_values_strings() {
5510        let a = json!({"S": "apple"});
5511        let b = json!({"S": "banana"});
5512        assert_eq!(
5513            compare_attribute_values(Some(&a), Some(&b)),
5514            std::cmp::Ordering::Less
5515        );
5516    }
5517
5518    #[test]
5519    fn test_split_on_and() {
5520        let parts = split_on_and("pk = :pk AND sk > :sk");
5521        assert_eq!(parts.len(), 2);
5522        assert_eq!(parts[0].trim(), "pk = :pk");
5523        assert_eq!(parts[1].trim(), "sk > :sk");
5524    }
5525
5526    #[test]
5527    fn test_split_on_and_respects_parentheses() {
5528        // Before fix: split_on_and would split inside the parens
5529        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5530        // Should NOT split on the AND inside parentheses
5531        assert_eq!(parts.len(), 1);
5532        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5533    }
5534
5535    #[test]
5536    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
5537        // (a AND b) OR c — should match when c is true but a is false
5538        let mut item = HashMap::new();
5539        item.insert("x".to_string(), json!({"S": "no"}));
5540        item.insert("y".to_string(), json!({"S": "no"}));
5541        item.insert("z".to_string(), json!({"S": "yes"}));
5542
5543        let mut expr_values = HashMap::new();
5544        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
5545
5546        // x=yes AND y=yes => false, but z=yes => true => overall true
5547        let result = evaluate_filter_expression(
5548            "(x = :yes AND y = :yes) OR z = :yes",
5549            &item,
5550            &HashMap::new(),
5551            &expr_values,
5552        );
5553        assert!(result, "should match because z = :yes is true");
5554
5555        // x=yes AND y=yes => false, z=yes => false => overall false
5556        let mut item2 = HashMap::new();
5557        item2.insert("x".to_string(), json!({"S": "no"}));
5558        item2.insert("y".to_string(), json!({"S": "no"}));
5559        item2.insert("z".to_string(), json!({"S": "no"}));
5560
5561        let result2 = evaluate_filter_expression(
5562            "(x = :yes AND y = :yes) OR z = :yes",
5563            &item2,
5564            &HashMap::new(),
5565            &expr_values,
5566        );
5567        assert!(!result2, "should not match because nothing is true");
5568    }
5569
5570    #[test]
5571    fn test_project_item_nested_path() {
5572        // Item with a list attribute containing maps
5573        let mut item = HashMap::new();
5574        item.insert("pk".to_string(), json!({"S": "key1"}));
5575        item.insert(
5576            "data".to_string(),
5577            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
5578        );
5579
5580        let body = json!({
5581            "ProjectionExpression": "data[0].name"
5582        });
5583
5584        let projected = project_item(&item, &body);
5585        // Should contain data[0].name = "Alice", not the entire data[0] element
5586        let name = projected
5587            .get("data")
5588            .and_then(|v| v.get("L"))
5589            .and_then(|v| v.get(0))
5590            .and_then(|v| v.get("M"))
5591            .and_then(|v| v.get("name"))
5592            .and_then(|v| v.get("S"))
5593            .and_then(|v| v.as_str());
5594        assert_eq!(name, Some("Alice"));
5595
5596        // Should NOT contain the "age" field
5597        let age = projected
5598            .get("data")
5599            .and_then(|v| v.get("L"))
5600            .and_then(|v| v.get(0))
5601            .and_then(|v| v.get("M"))
5602            .and_then(|v| v.get("age"));
5603        assert!(age.is_none(), "age should not be present in projection");
5604    }
5605
5606    #[test]
5607    fn test_resolve_nested_path_map() {
5608        let mut item = HashMap::new();
5609        item.insert(
5610            "info".to_string(),
5611            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
5612        );
5613
5614        let result = resolve_nested_path(&item, "info.address.city");
5615        assert_eq!(result, Some(json!({"S": "NYC"})));
5616    }
5617
5618    #[test]
5619    fn test_resolve_nested_path_list_then_map() {
5620        let mut item = HashMap::new();
5621        item.insert(
5622            "items".to_string(),
5623            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
5624        );
5625
5626        let result = resolve_nested_path(&item, "items[0].sku");
5627        assert_eq!(result, Some(json!({"S": "ABC"})));
5628    }
5629
5630    // -- Integration-style tests using DynamoDbService --
5631
5632    use crate::state::SharedDynamoDbState;
5633    use parking_lot::RwLock;
5634    use std::sync::Arc;
5635
5636    fn make_service() -> DynamoDbService {
5637        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5638            "123456789012",
5639            "us-east-1",
5640        )));
5641        DynamoDbService::new(state)
5642    }
5643
5644    fn make_request(action: &str, body: Value) -> AwsRequest {
5645        AwsRequest {
5646            service: "dynamodb".to_string(),
5647            action: action.to_string(),
5648            region: "us-east-1".to_string(),
5649            account_id: "123456789012".to_string(),
5650            request_id: "test-id".to_string(),
5651            headers: http::HeaderMap::new(),
5652            query_params: HashMap::new(),
5653            body: serde_json::to_vec(&body).unwrap().into(),
5654            path_segments: vec![],
5655            raw_path: "/".to_string(),
5656            raw_query: String::new(),
5657            method: http::Method::POST,
5658            is_query_protocol: false,
5659            access_key_id: None,
5660        }
5661    }
5662
5663    fn create_test_table(svc: &DynamoDbService) {
5664        let req = make_request(
5665            "CreateTable",
5666            json!({
5667                "TableName": "test-table",
5668                "KeySchema": [
5669                    { "AttributeName": "pk", "KeyType": "HASH" }
5670                ],
5671                "AttributeDefinitions": [
5672                    { "AttributeName": "pk", "AttributeType": "S" }
5673                ],
5674                "BillingMode": "PAY_PER_REQUEST"
5675            }),
5676        );
5677        svc.create_table(&req).unwrap();
5678    }
5679
5680    #[test]
5681    fn delete_item_return_values_all_old() {
5682        let svc = make_service();
5683        create_test_table(&svc);
5684
5685        // Put an item
5686        let req = make_request(
5687            "PutItem",
5688            json!({
5689                "TableName": "test-table",
5690                "Item": {
5691                    "pk": { "S": "key1" },
5692                    "name": { "S": "Alice" },
5693                    "age": { "N": "30" }
5694                }
5695            }),
5696        );
5697        svc.put_item(&req).unwrap();
5698
5699        // Delete with ReturnValues=ALL_OLD
5700        let req = make_request(
5701            "DeleteItem",
5702            json!({
5703                "TableName": "test-table",
5704                "Key": { "pk": { "S": "key1" } },
5705                "ReturnValues": "ALL_OLD"
5706            }),
5707        );
5708        let resp = svc.delete_item(&req).unwrap();
5709        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5710
5711        // Verify the old item is returned
5712        let attrs = &body["Attributes"];
5713        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5714        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5715        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5716
5717        // Verify the item is actually deleted
5718        let req = make_request(
5719            "GetItem",
5720            json!({
5721                "TableName": "test-table",
5722                "Key": { "pk": { "S": "key1" } }
5723            }),
5724        );
5725        let resp = svc.get_item(&req).unwrap();
5726        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5727        assert!(body.get("Item").is_none(), "item should be deleted");
5728    }
5729
5730    #[test]
5731    fn transact_get_items_returns_existing_and_missing() {
5732        let svc = make_service();
5733        create_test_table(&svc);
5734
5735        // Put one item
5736        let req = make_request(
5737            "PutItem",
5738            json!({
5739                "TableName": "test-table",
5740                "Item": {
5741                    "pk": { "S": "exists" },
5742                    "val": { "S": "hello" }
5743                }
5744            }),
5745        );
5746        svc.put_item(&req).unwrap();
5747
5748        let req = make_request(
5749            "TransactGetItems",
5750            json!({
5751                "TransactItems": [
5752                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5753                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5754                ]
5755            }),
5756        );
5757        let resp = svc.transact_get_items(&req).unwrap();
5758        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5759        let responses = body["Responses"].as_array().unwrap();
5760        assert_eq!(responses.len(), 2);
5761        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5762        assert!(responses[1].get("Item").is_none());
5763    }
5764
5765    #[test]
5766    fn transact_write_items_put_and_delete() {
5767        let svc = make_service();
5768        create_test_table(&svc);
5769
5770        // Put initial item
5771        let req = make_request(
5772            "PutItem",
5773            json!({
5774                "TableName": "test-table",
5775                "Item": {
5776                    "pk": { "S": "to-delete" },
5777                    "val": { "S": "bye" }
5778                }
5779            }),
5780        );
5781        svc.put_item(&req).unwrap();
5782
5783        // TransactWrite: put new + delete existing
5784        let req = make_request(
5785            "TransactWriteItems",
5786            json!({
5787                "TransactItems": [
5788                    {
5789                        "Put": {
5790                            "TableName": "test-table",
5791                            "Item": {
5792                                "pk": { "S": "new-item" },
5793                                "val": { "S": "hi" }
5794                            }
5795                        }
5796                    },
5797                    {
5798                        "Delete": {
5799                            "TableName": "test-table",
5800                            "Key": { "pk": { "S": "to-delete" } }
5801                        }
5802                    }
5803                ]
5804            }),
5805        );
5806        let resp = svc.transact_write_items(&req).unwrap();
5807        assert_eq!(resp.status, StatusCode::OK);
5808
5809        // Verify new item exists
5810        let req = make_request(
5811            "GetItem",
5812            json!({
5813                "TableName": "test-table",
5814                "Key": { "pk": { "S": "new-item" } }
5815            }),
5816        );
5817        let resp = svc.get_item(&req).unwrap();
5818        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5819        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5820
5821        // Verify deleted item is gone
5822        let req = make_request(
5823            "GetItem",
5824            json!({
5825                "TableName": "test-table",
5826                "Key": { "pk": { "S": "to-delete" } }
5827            }),
5828        );
5829        let resp = svc.get_item(&req).unwrap();
5830        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5831        assert!(body.get("Item").is_none());
5832    }
5833
5834    #[test]
5835    fn transact_write_items_condition_check_failure() {
5836        let svc = make_service();
5837        create_test_table(&svc);
5838
5839        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
5840        let req = make_request(
5841            "TransactWriteItems",
5842            json!({
5843                "TransactItems": [
5844                    {
5845                        "ConditionCheck": {
5846                            "TableName": "test-table",
5847                            "Key": { "pk": { "S": "nonexistent" } },
5848                            "ConditionExpression": "attribute_exists(pk)"
5849                        }
5850                    }
5851                ]
5852            }),
5853        );
5854        let resp = svc.transact_write_items(&req).unwrap();
5855        // Should be a 400 error response
5856        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5857        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5858        assert_eq!(
5859            body["__type"].as_str().unwrap(),
5860            "TransactionCanceledException"
5861        );
5862        assert!(body["CancellationReasons"].as_array().is_some());
5863    }
5864
5865    #[test]
5866    fn update_and_describe_time_to_live() {
5867        let svc = make_service();
5868        create_test_table(&svc);
5869
5870        // Enable TTL
5871        let req = make_request(
5872            "UpdateTimeToLive",
5873            json!({
5874                "TableName": "test-table",
5875                "TimeToLiveSpecification": {
5876                    "AttributeName": "ttl",
5877                    "Enabled": true
5878                }
5879            }),
5880        );
5881        let resp = svc.update_time_to_live(&req).unwrap();
5882        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5883        assert_eq!(
5884            body["TimeToLiveSpecification"]["AttributeName"]
5885                .as_str()
5886                .unwrap(),
5887            "ttl"
5888        );
5889        assert!(body["TimeToLiveSpecification"]["Enabled"]
5890            .as_bool()
5891            .unwrap());
5892
5893        // Describe TTL
5894        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5895        let resp = svc.describe_time_to_live(&req).unwrap();
5896        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5897        assert_eq!(
5898            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5899                .as_str()
5900                .unwrap(),
5901            "ENABLED"
5902        );
5903        assert_eq!(
5904            body["TimeToLiveDescription"]["AttributeName"]
5905                .as_str()
5906                .unwrap(),
5907            "ttl"
5908        );
5909
5910        // Disable TTL
5911        let req = make_request(
5912            "UpdateTimeToLive",
5913            json!({
5914                "TableName": "test-table",
5915                "TimeToLiveSpecification": {
5916                    "AttributeName": "ttl",
5917                    "Enabled": false
5918                }
5919            }),
5920        );
5921        svc.update_time_to_live(&req).unwrap();
5922
5923        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5924        let resp = svc.describe_time_to_live(&req).unwrap();
5925        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5926        assert_eq!(
5927            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5928                .as_str()
5929                .unwrap(),
5930            "DISABLED"
5931        );
5932    }
5933
5934    #[test]
5935    fn resource_policy_lifecycle() {
5936        let svc = make_service();
5937        create_test_table(&svc);
5938
5939        let table_arn = {
5940            let state = svc.state.read();
5941            state.tables.get("test-table").unwrap().arn.clone()
5942        };
5943
5944        // Put policy
5945        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5946        let req = make_request(
5947            "PutResourcePolicy",
5948            json!({
5949                "ResourceArn": table_arn,
5950                "Policy": policy_doc
5951            }),
5952        );
5953        let resp = svc.put_resource_policy(&req).unwrap();
5954        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5955        assert!(body["RevisionId"].as_str().is_some());
5956
5957        // Get policy
5958        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5959        let resp = svc.get_resource_policy(&req).unwrap();
5960        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5961        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5962
5963        // Delete policy
5964        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5965        svc.delete_resource_policy(&req).unwrap();
5966
5967        // Get should return null now
5968        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5969        let resp = svc.get_resource_policy(&req).unwrap();
5970        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5971        assert!(body["Policy"].is_null());
5972    }
5973
5974    #[test]
5975    fn describe_endpoints() {
5976        let svc = make_service();
5977        let req = make_request("DescribeEndpoints", json!({}));
5978        let resp = svc.describe_endpoints(&req).unwrap();
5979        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5980        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5981    }
5982
5983    #[test]
5984    fn describe_limits() {
5985        let svc = make_service();
5986        let req = make_request("DescribeLimits", json!({}));
5987        let resp = svc.describe_limits(&req).unwrap();
5988        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5989        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5990    }
5991
5992    #[test]
5993    fn backup_lifecycle() {
5994        let svc = make_service();
5995        create_test_table(&svc);
5996
5997        // Create backup
5998        let req = make_request(
5999            "CreateBackup",
6000            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
6001        );
6002        let resp = svc.create_backup(&req).unwrap();
6003        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6004        let backup_arn = body["BackupDetails"]["BackupArn"]
6005            .as_str()
6006            .unwrap()
6007            .to_string();
6008        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
6009
6010        // Describe backup
6011        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
6012        let resp = svc.describe_backup(&req).unwrap();
6013        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6014        assert_eq!(
6015            body["BackupDescription"]["BackupDetails"]["BackupName"],
6016            "my-backup"
6017        );
6018
6019        // List backups
6020        let req = make_request("ListBackups", json!({}));
6021        let resp = svc.list_backups(&req).unwrap();
6022        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6023        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
6024
6025        // Restore from backup
6026        let req = make_request(
6027            "RestoreTableFromBackup",
6028            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
6029        );
6030        svc.restore_table_from_backup(&req).unwrap();
6031
6032        // Verify restored table exists
6033        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
6034        let resp = svc.describe_table(&req).unwrap();
6035        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6036        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6037
6038        // Delete backup
6039        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
6040        svc.delete_backup(&req).unwrap();
6041
6042        // List should be empty
6043        let req = make_request("ListBackups", json!({}));
6044        let resp = svc.list_backups(&req).unwrap();
6045        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6046        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
6047    }
6048
6049    #[test]
6050    fn continuous_backups() {
6051        let svc = make_service();
6052        create_test_table(&svc);
6053
6054        // Initially disabled
6055        let req = make_request(
6056            "DescribeContinuousBackups",
6057            json!({ "TableName": "test-table" }),
6058        );
6059        let resp = svc.describe_continuous_backups(&req).unwrap();
6060        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6061        assert_eq!(
6062            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
6063                ["PointInTimeRecoveryStatus"],
6064            "DISABLED"
6065        );
6066
6067        // Enable
6068        let req = make_request(
6069            "UpdateContinuousBackups",
6070            json!({
6071                "TableName": "test-table",
6072                "PointInTimeRecoverySpecification": {
6073                    "PointInTimeRecoveryEnabled": true
6074                }
6075            }),
6076        );
6077        svc.update_continuous_backups(&req).unwrap();
6078
6079        // Verify
6080        let req = make_request(
6081            "DescribeContinuousBackups",
6082            json!({ "TableName": "test-table" }),
6083        );
6084        let resp = svc.describe_continuous_backups(&req).unwrap();
6085        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6086        assert_eq!(
6087            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
6088                ["PointInTimeRecoveryStatus"],
6089            "ENABLED"
6090        );
6091    }
6092
6093    #[test]
6094    fn restore_table_to_point_in_time() {
6095        let svc = make_service();
6096        create_test_table(&svc);
6097
6098        let req = make_request(
6099            "RestoreTableToPointInTime",
6100            json!({
6101                "SourceTableName": "test-table",
6102                "TargetTableName": "pitr-restored"
6103            }),
6104        );
6105        svc.restore_table_to_point_in_time(&req).unwrap();
6106
6107        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
6108        let resp = svc.describe_table(&req).unwrap();
6109        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6110        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6111    }
6112
6113    #[test]
6114    fn global_table_lifecycle() {
6115        let svc = make_service();
6116
6117        // Create global table
6118        let req = make_request(
6119            "CreateGlobalTable",
6120            json!({
6121                "GlobalTableName": "my-global",
6122                "ReplicationGroup": [
6123                    { "RegionName": "us-east-1" },
6124                    { "RegionName": "eu-west-1" }
6125                ]
6126            }),
6127        );
6128        let resp = svc.create_global_table(&req).unwrap();
6129        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6130        assert_eq!(
6131            body["GlobalTableDescription"]["GlobalTableStatus"],
6132            "ACTIVE"
6133        );
6134
6135        // Describe
6136        let req = make_request(
6137            "DescribeGlobalTable",
6138            json!({ "GlobalTableName": "my-global" }),
6139        );
6140        let resp = svc.describe_global_table(&req).unwrap();
6141        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6142        assert_eq!(
6143            body["GlobalTableDescription"]["ReplicationGroup"]
6144                .as_array()
6145                .unwrap()
6146                .len(),
6147            2
6148        );
6149
6150        // List
6151        let req = make_request("ListGlobalTables", json!({}));
6152        let resp = svc.list_global_tables(&req).unwrap();
6153        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6154        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
6155
6156        // Update - add a region
6157        let req = make_request(
6158            "UpdateGlobalTable",
6159            json!({
6160                "GlobalTableName": "my-global",
6161                "ReplicaUpdates": [
6162                    { "Create": { "RegionName": "ap-southeast-1" } }
6163                ]
6164            }),
6165        );
6166        let resp = svc.update_global_table(&req).unwrap();
6167        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6168        assert_eq!(
6169            body["GlobalTableDescription"]["ReplicationGroup"]
6170                .as_array()
6171                .unwrap()
6172                .len(),
6173            3
6174        );
6175
6176        // Describe settings
6177        let req = make_request(
6178            "DescribeGlobalTableSettings",
6179            json!({ "GlobalTableName": "my-global" }),
6180        );
6181        let resp = svc.describe_global_table_settings(&req).unwrap();
6182        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6183        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
6184
6185        // Update settings (no-op, just verify no error)
6186        let req = make_request(
6187            "UpdateGlobalTableSettings",
6188            json!({ "GlobalTableName": "my-global" }),
6189        );
6190        svc.update_global_table_settings(&req).unwrap();
6191    }
6192
6193    #[test]
6194    fn table_replica_auto_scaling() {
6195        let svc = make_service();
6196        create_test_table(&svc);
6197
6198        let req = make_request(
6199            "DescribeTableReplicaAutoScaling",
6200            json!({ "TableName": "test-table" }),
6201        );
6202        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
6203        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6204        assert_eq!(
6205            body["TableAutoScalingDescription"]["TableName"],
6206            "test-table"
6207        );
6208
6209        let req = make_request(
6210            "UpdateTableReplicaAutoScaling",
6211            json!({ "TableName": "test-table" }),
6212        );
6213        svc.update_table_replica_auto_scaling(&req).unwrap();
6214    }
6215
6216    #[test]
6217    fn kinesis_streaming_lifecycle() {
6218        let svc = make_service();
6219        create_test_table(&svc);
6220
6221        // Enable
6222        let req = make_request(
6223            "EnableKinesisStreamingDestination",
6224            json!({
6225                "TableName": "test-table",
6226                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
6227            }),
6228        );
6229        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
6230        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6231        assert_eq!(body["DestinationStatus"], "ACTIVE");
6232
6233        // Describe
6234        let req = make_request(
6235            "DescribeKinesisStreamingDestination",
6236            json!({ "TableName": "test-table" }),
6237        );
6238        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
6239        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6240        assert_eq!(
6241            body["KinesisDataStreamDestinations"]
6242                .as_array()
6243                .unwrap()
6244                .len(),
6245            1
6246        );
6247
6248        // Update
6249        let req = make_request(
6250            "UpdateKinesisStreamingDestination",
6251            json!({
6252                "TableName": "test-table",
6253                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
6254                "UpdateKinesisStreamingConfiguration": {
6255                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
6256                }
6257            }),
6258        );
6259        svc.update_kinesis_streaming_destination(&req).unwrap();
6260
6261        // Disable
6262        let req = make_request(
6263            "DisableKinesisStreamingDestination",
6264            json!({
6265                "TableName": "test-table",
6266                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
6267            }),
6268        );
6269        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
6270        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6271        assert_eq!(body["DestinationStatus"], "DISABLED");
6272    }
6273
6274    #[test]
6275    fn contributor_insights_lifecycle() {
6276        let svc = make_service();
6277        create_test_table(&svc);
6278
6279        // Initially disabled
6280        let req = make_request(
6281            "DescribeContributorInsights",
6282            json!({ "TableName": "test-table" }),
6283        );
6284        let resp = svc.describe_contributor_insights(&req).unwrap();
6285        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6286        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6287
6288        // Enable
6289        let req = make_request(
6290            "UpdateContributorInsights",
6291            json!({
6292                "TableName": "test-table",
6293                "ContributorInsightsAction": "ENABLE"
6294            }),
6295        );
6296        let resp = svc.update_contributor_insights(&req).unwrap();
6297        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6298        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6299
6300        // List
6301        let req = make_request("ListContributorInsights", json!({}));
6302        let resp = svc.list_contributor_insights(&req).unwrap();
6303        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6304        assert_eq!(
6305            body["ContributorInsightsSummaries"]
6306                .as_array()
6307                .unwrap()
6308                .len(),
6309            1
6310        );
6311    }
6312
6313    #[test]
6314    fn export_lifecycle() {
6315        let svc = make_service();
6316        create_test_table(&svc);
6317
6318        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
6319
6320        // Export
6321        let req = make_request(
6322            "ExportTableToPointInTime",
6323            json!({
6324                "TableArn": table_arn,
6325                "S3Bucket": "my-bucket"
6326            }),
6327        );
6328        let resp = svc.export_table_to_point_in_time(&req).unwrap();
6329        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6330        let export_arn = body["ExportDescription"]["ExportArn"]
6331            .as_str()
6332            .unwrap()
6333            .to_string();
6334        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6335
6336        // Describe
6337        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6338        let resp = svc.describe_export(&req).unwrap();
6339        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6340        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6341
6342        // List
6343        let req = make_request("ListExports", json!({}));
6344        let resp = svc.list_exports(&req).unwrap();
6345        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6346        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6347    }
6348
6349    #[test]
6350    fn import_lifecycle() {
6351        let svc = make_service();
6352
6353        let req = make_request(
6354            "ImportTable",
6355            json!({
6356                "InputFormat": "DYNAMODB_JSON",
6357                "S3BucketSource": { "S3Bucket": "import-bucket" },
6358                "TableCreationParameters": {
6359                    "TableName": "imported-table",
6360                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6361                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6362                }
6363            }),
6364        );
6365        let resp = svc.import_table(&req).unwrap();
6366        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6367        let import_arn = body["ImportTableDescription"]["ImportArn"]
6368            .as_str()
6369            .unwrap()
6370            .to_string();
6371        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6372
6373        // Describe import
6374        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6375        let resp = svc.describe_import(&req).unwrap();
6376        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6377        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6378
6379        // List imports
6380        let req = make_request("ListImports", json!({}));
6381        let resp = svc.list_imports(&req).unwrap();
6382        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6383        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6384
6385        // Verify the table was created
6386        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6387        let resp = svc.describe_table(&req).unwrap();
6388        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6389        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6390    }
6391
6392    #[test]
6393    fn backup_restore_preserves_items() {
6394        let svc = make_service();
6395        create_test_table(&svc);
6396
6397        // Put 3 items
6398        for i in 1..=3 {
6399            let req = make_request(
6400                "PutItem",
6401                json!({
6402                    "TableName": "test-table",
6403                    "Item": {
6404                        "pk": { "S": format!("key{i}") },
6405                        "data": { "S": format!("value{i}") }
6406                    }
6407                }),
6408            );
6409            svc.put_item(&req).unwrap();
6410        }
6411
6412        // Create backup
6413        let req = make_request(
6414            "CreateBackup",
6415            json!({
6416                "TableName": "test-table",
6417                "BackupName": "my-backup"
6418            }),
6419        );
6420        let resp = svc.create_backup(&req).unwrap();
6421        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6422        let backup_arn = body["BackupDetails"]["BackupArn"]
6423            .as_str()
6424            .unwrap()
6425            .to_string();
6426
6427        // Delete all items from the original table
6428        for i in 1..=3 {
6429            let req = make_request(
6430                "DeleteItem",
6431                json!({
6432                    "TableName": "test-table",
6433                    "Key": { "pk": { "S": format!("key{i}") } }
6434                }),
6435            );
6436            svc.delete_item(&req).unwrap();
6437        }
6438
6439        // Verify original table is empty
6440        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6441        let resp = svc.scan(&req).unwrap();
6442        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6443        assert_eq!(body["Count"], 0);
6444
6445        // Restore from backup
6446        let req = make_request(
6447            "RestoreTableFromBackup",
6448            json!({
6449                "BackupArn": backup_arn,
6450                "TargetTableName": "restored-table"
6451            }),
6452        );
6453        svc.restore_table_from_backup(&req).unwrap();
6454
6455        // Scan restored table — should have 3 items
6456        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6457        let resp = svc.scan(&req).unwrap();
6458        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6459        assert_eq!(body["Count"], 3);
6460        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6461    }
6462
6463    #[test]
6464    fn global_table_replicates_writes() {
6465        let svc = make_service();
6466        create_test_table(&svc);
6467
6468        // Create global table with replicas
6469        let req = make_request(
6470            "CreateGlobalTable",
6471            json!({
6472                "GlobalTableName": "test-table",
6473                "ReplicationGroup": [
6474                    { "RegionName": "us-east-1" },
6475                    { "RegionName": "eu-west-1" }
6476                ]
6477            }),
6478        );
6479        let resp = svc.create_global_table(&req).unwrap();
6480        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6481        assert_eq!(
6482            body["GlobalTableDescription"]["GlobalTableStatus"],
6483            "ACTIVE"
6484        );
6485
6486        // Put an item
6487        let req = make_request(
6488            "PutItem",
6489            json!({
6490                "TableName": "test-table",
6491                "Item": {
6492                    "pk": { "S": "replicated-key" },
6493                    "data": { "S": "replicated-value" }
6494                }
6495            }),
6496        );
6497        svc.put_item(&req).unwrap();
6498
6499        // Verify the item is readable (since all replicas share the same table)
6500        let req = make_request(
6501            "GetItem",
6502            json!({
6503                "TableName": "test-table",
6504                "Key": { "pk": { "S": "replicated-key" } }
6505            }),
6506        );
6507        let resp = svc.get_item(&req).unwrap();
6508        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6509        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6510        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6511    }
6512
6513    #[test]
6514    fn contributor_insights_tracks_access() {
6515        let svc = make_service();
6516        create_test_table(&svc);
6517
6518        // Enable contributor insights
6519        let req = make_request(
6520            "UpdateContributorInsights",
6521            json!({
6522                "TableName": "test-table",
6523                "ContributorInsightsAction": "ENABLE"
6524            }),
6525        );
6526        svc.update_contributor_insights(&req).unwrap();
6527
6528        // Put items with different partition keys
6529        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6530            let req = make_request(
6531                "PutItem",
6532                json!({
6533                    "TableName": "test-table",
6534                    "Item": {
6535                        "pk": { "S": key },
6536                        "data": { "S": "value" }
6537                    }
6538                }),
6539            );
6540            svc.put_item(&req).unwrap();
6541        }
6542
6543        // Get items (to also track read access)
6544        for _ in 0..3 {
6545            let req = make_request(
6546                "GetItem",
6547                json!({
6548                    "TableName": "test-table",
6549                    "Key": { "pk": { "S": "alpha" } }
6550                }),
6551            );
6552            svc.get_item(&req).unwrap();
6553        }
6554
6555        // Describe contributor insights — should show top contributors
6556        let req = make_request(
6557            "DescribeContributorInsights",
6558            json!({ "TableName": "test-table" }),
6559        );
6560        let resp = svc.describe_contributor_insights(&req).unwrap();
6561        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6562        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6563
6564        let contributors = body["TopContributors"].as_array().unwrap();
6565        assert!(
6566            !contributors.is_empty(),
6567            "TopContributors should not be empty"
6568        );
6569
6570        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
6571        // alpha should be the top contributor
6572        let top = &contributors[0];
6573        assert!(top["Count"].as_u64().unwrap() > 0);
6574
6575        // Verify the rule list is populated
6576        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6577        assert!(!rules.is_empty());
6578    }
6579
6580    #[test]
6581    fn contributor_insights_not_tracked_when_disabled() {
6582        let svc = make_service();
6583        create_test_table(&svc);
6584
6585        // Put items without enabling insights
6586        let req = make_request(
6587            "PutItem",
6588            json!({
6589                "TableName": "test-table",
6590                "Item": {
6591                    "pk": { "S": "key1" },
6592                    "data": { "S": "value" }
6593                }
6594            }),
6595        );
6596        svc.put_item(&req).unwrap();
6597
6598        // Describe — should show empty contributors
6599        let req = make_request(
6600            "DescribeContributorInsights",
6601            json!({ "TableName": "test-table" }),
6602        );
6603        let resp = svc.describe_contributor_insights(&req).unwrap();
6604        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6605        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6606
6607        let contributors = body["TopContributors"].as_array().unwrap();
6608        assert!(contributors.is_empty());
6609    }
6610
6611    #[test]
6612    fn contributor_insights_disabled_table_no_counters_after_scan() {
6613        let svc = make_service();
6614        create_test_table(&svc);
6615
6616        // Put items
6617        for key in &["alpha", "beta"] {
6618            let req = make_request(
6619                "PutItem",
6620                json!({
6621                    "TableName": "test-table",
6622                    "Item": { "pk": { "S": key } }
6623                }),
6624            );
6625            svc.put_item(&req).unwrap();
6626        }
6627
6628        // Enable insights, then scan, then disable, then check counters are cleared
6629        let req = make_request(
6630            "UpdateContributorInsights",
6631            json!({
6632                "TableName": "test-table",
6633                "ContributorInsightsAction": "ENABLE"
6634            }),
6635        );
6636        svc.update_contributor_insights(&req).unwrap();
6637
6638        // Scan to trigger counter collection
6639        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6640        svc.scan(&req).unwrap();
6641
6642        // Verify counters were collected
6643        let req = make_request(
6644            "DescribeContributorInsights",
6645            json!({ "TableName": "test-table" }),
6646        );
6647        let resp = svc.describe_contributor_insights(&req).unwrap();
6648        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6649        let contributors = body["TopContributors"].as_array().unwrap();
6650        assert!(
6651            !contributors.is_empty(),
6652            "counters should be non-empty while enabled"
6653        );
6654
6655        // Disable insights (this clears counters)
6656        let req = make_request(
6657            "UpdateContributorInsights",
6658            json!({
6659                "TableName": "test-table",
6660                "ContributorInsightsAction": "DISABLE"
6661            }),
6662        );
6663        svc.update_contributor_insights(&req).unwrap();
6664
6665        // Scan again -- should NOT accumulate counters since insights is disabled
6666        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6667        svc.scan(&req).unwrap();
6668
6669        // Verify counters are still empty
6670        let req = make_request(
6671            "DescribeContributorInsights",
6672            json!({ "TableName": "test-table" }),
6673        );
6674        let resp = svc.describe_contributor_insights(&req).unwrap();
6675        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6676        let contributors = body["TopContributors"].as_array().unwrap();
6677        assert!(
6678            contributors.is_empty(),
6679            "counters should be empty after disabling insights"
6680        );
6681    }
6682
6683    #[test]
6684    fn scan_pagination_with_limit() {
6685        let svc = make_service();
6686        create_test_table(&svc);
6687
6688        // Insert 5 items
6689        for i in 0..5 {
6690            let req = make_request(
6691                "PutItem",
6692                json!({
6693                    "TableName": "test-table",
6694                    "Item": {
6695                        "pk": { "S": format!("item{i}") },
6696                        "data": { "S": format!("value{i}") }
6697                    }
6698                }),
6699            );
6700            svc.put_item(&req).unwrap();
6701        }
6702
6703        // Scan with limit=2
6704        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6705        let resp = svc.scan(&req).unwrap();
6706        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6707        assert_eq!(body["Count"], 2);
6708        assert!(
6709            body["LastEvaluatedKey"].is_object(),
6710            "should have LastEvaluatedKey when limit < total items"
6711        );
6712        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6713
6714        // Page through all items
6715        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6716        let mut lek = body["LastEvaluatedKey"].clone();
6717
6718        while lek.is_object() {
6719            let req = make_request(
6720                "Scan",
6721                json!({
6722                    "TableName": "test-table",
6723                    "Limit": 2,
6724                    "ExclusiveStartKey": lek
6725                }),
6726            );
6727            let resp = svc.scan(&req).unwrap();
6728            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6729            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6730            lek = body["LastEvaluatedKey"].clone();
6731        }
6732
6733        assert_eq!(
6734            all_items.len(),
6735            5,
6736            "should retrieve all 5 items via pagination"
6737        );
6738    }
6739
6740    #[test]
6741    fn scan_no_pagination_when_all_fit() {
6742        let svc = make_service();
6743        create_test_table(&svc);
6744
6745        for i in 0..3 {
6746            let req = make_request(
6747                "PutItem",
6748                json!({
6749                    "TableName": "test-table",
6750                    "Item": {
6751                        "pk": { "S": format!("item{i}") }
6752                    }
6753                }),
6754            );
6755            svc.put_item(&req).unwrap();
6756        }
6757
6758        // Scan with limit > item count
6759        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6760        let resp = svc.scan(&req).unwrap();
6761        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6762        assert_eq!(body["Count"], 3);
6763        assert!(
6764            body["LastEvaluatedKey"].is_null(),
6765            "should not have LastEvaluatedKey when all items fit"
6766        );
6767
6768        // Scan without limit
6769        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6770        let resp = svc.scan(&req).unwrap();
6771        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6772        assert_eq!(body["Count"], 3);
6773        assert!(body["LastEvaluatedKey"].is_null());
6774    }
6775
6776    fn create_composite_table(svc: &DynamoDbService) {
6777        let req = make_request(
6778            "CreateTable",
6779            json!({
6780                "TableName": "composite-table",
6781                "KeySchema": [
6782                    { "AttributeName": "pk", "KeyType": "HASH" },
6783                    { "AttributeName": "sk", "KeyType": "RANGE" }
6784                ],
6785                "AttributeDefinitions": [
6786                    { "AttributeName": "pk", "AttributeType": "S" },
6787                    { "AttributeName": "sk", "AttributeType": "S" }
6788                ],
6789                "BillingMode": "PAY_PER_REQUEST"
6790            }),
6791        );
6792        svc.create_table(&req).unwrap();
6793    }
6794
6795    #[test]
6796    fn query_pagination_with_composite_key() {
6797        let svc = make_service();
6798        create_composite_table(&svc);
6799
6800        // Insert 5 items under the same partition key
6801        for i in 0..5 {
6802            let req = make_request(
6803                "PutItem",
6804                json!({
6805                    "TableName": "composite-table",
6806                    "Item": {
6807                        "pk": { "S": "user1" },
6808                        "sk": { "S": format!("item{i:03}") },
6809                        "data": { "S": format!("value{i}") }
6810                    }
6811                }),
6812            );
6813            svc.put_item(&req).unwrap();
6814        }
6815
6816        // Query with limit=2
6817        let req = make_request(
6818            "Query",
6819            json!({
6820                "TableName": "composite-table",
6821                "KeyConditionExpression": "pk = :pk",
6822                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6823                "Limit": 2
6824            }),
6825        );
6826        let resp = svc.query(&req).unwrap();
6827        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6828        assert_eq!(body["Count"], 2);
6829        assert!(body["LastEvaluatedKey"].is_object());
6830        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6831        assert!(body["LastEvaluatedKey"]["sk"].is_object());
6832
6833        // Page through all items
6834        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6835        let mut lek = body["LastEvaluatedKey"].clone();
6836
6837        while lek.is_object() {
6838            let req = make_request(
6839                "Query",
6840                json!({
6841                    "TableName": "composite-table",
6842                    "KeyConditionExpression": "pk = :pk",
6843                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6844                    "Limit": 2,
6845                    "ExclusiveStartKey": lek
6846                }),
6847            );
6848            let resp = svc.query(&req).unwrap();
6849            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6850            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6851            lek = body["LastEvaluatedKey"].clone();
6852        }
6853
6854        assert_eq!(
6855            all_items.len(),
6856            5,
6857            "should retrieve all 5 items via pagination"
6858        );
6859
6860        // Verify items came back sorted by sort key
6861        let sks: Vec<String> = all_items
6862            .iter()
6863            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6864            .collect();
6865        let mut sorted = sks.clone();
6866        sorted.sort();
6867        assert_eq!(sks, sorted, "items should be sorted by sort key");
6868    }
6869
6870    #[test]
6871    fn query_no_pagination_when_all_fit() {
6872        let svc = make_service();
6873        create_composite_table(&svc);
6874
6875        for i in 0..2 {
6876            let req = make_request(
6877                "PutItem",
6878                json!({
6879                    "TableName": "composite-table",
6880                    "Item": {
6881                        "pk": { "S": "user1" },
6882                        "sk": { "S": format!("item{i}") }
6883                    }
6884                }),
6885            );
6886            svc.put_item(&req).unwrap();
6887        }
6888
6889        let req = make_request(
6890            "Query",
6891            json!({
6892                "TableName": "composite-table",
6893                "KeyConditionExpression": "pk = :pk",
6894                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6895                "Limit": 10
6896            }),
6897        );
6898        let resp = svc.query(&req).unwrap();
6899        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6900        assert_eq!(body["Count"], 2);
6901        assert!(
6902            body["LastEvaluatedKey"].is_null(),
6903            "should not have LastEvaluatedKey when all items fit"
6904        );
6905    }
6906
6907    fn create_gsi_table(svc: &DynamoDbService) {
6908        let req = make_request(
6909            "CreateTable",
6910            json!({
6911                "TableName": "gsi-table",
6912                "KeySchema": [
6913                    { "AttributeName": "pk", "KeyType": "HASH" }
6914                ],
6915                "AttributeDefinitions": [
6916                    { "AttributeName": "pk", "AttributeType": "S" },
6917                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
6918                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
6919                ],
6920                "BillingMode": "PAY_PER_REQUEST",
6921                "GlobalSecondaryIndexes": [
6922                    {
6923                        "IndexName": "gsi-index",
6924                        "KeySchema": [
6925                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6926                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6927                        ],
6928                        "Projection": { "ProjectionType": "ALL" }
6929                    }
6930                ]
6931            }),
6932        );
6933        svc.create_table(&req).unwrap();
6934    }
6935
6936    #[test]
6937    fn gsi_query_last_evaluated_key_includes_table_pk() {
6938        let svc = make_service();
6939        create_gsi_table(&svc);
6940
6941        // Insert 3 items with the SAME GSI key but different table PKs
6942        for i in 0..3 {
6943            let req = make_request(
6944                "PutItem",
6945                json!({
6946                    "TableName": "gsi-table",
6947                    "Item": {
6948                        "pk": { "S": format!("item{i}") },
6949                        "gsi_pk": { "S": "shared" },
6950                        "gsi_sk": { "S": "sort" }
6951                    }
6952                }),
6953            );
6954            svc.put_item(&req).unwrap();
6955        }
6956
6957        // Query GSI with Limit=1 to trigger pagination
6958        let req = make_request(
6959            "Query",
6960            json!({
6961                "TableName": "gsi-table",
6962                "IndexName": "gsi-index",
6963                "KeyConditionExpression": "gsi_pk = :v",
6964                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6965                "Limit": 1
6966            }),
6967        );
6968        let resp = svc.query(&req).unwrap();
6969        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6970        assert_eq!(body["Count"], 1);
6971        let lek = &body["LastEvaluatedKey"];
6972        assert!(lek.is_object(), "should have LastEvaluatedKey");
6973        // Must contain the index keys
6974        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6975        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6976        // Must also contain the table PK
6977        assert!(
6978            lek["pk"].is_object(),
6979            "LEK must contain table PK for GSI queries"
6980        );
6981    }
6982
6983    #[test]
6984    fn gsi_query_pagination_returns_all_items() {
6985        let svc = make_service();
6986        create_gsi_table(&svc);
6987
6988        // Insert 4 items with the SAME GSI key but different table PKs
6989        for i in 0..4 {
6990            let req = make_request(
6991                "PutItem",
6992                json!({
6993                    "TableName": "gsi-table",
6994                    "Item": {
6995                        "pk": { "S": format!("item{i:03}") },
6996                        "gsi_pk": { "S": "shared" },
6997                        "gsi_sk": { "S": "sort" }
6998                    }
6999                }),
7000            );
7001            svc.put_item(&req).unwrap();
7002        }
7003
7004        // Paginate through all items with Limit=2
7005        let mut all_pks = Vec::new();
7006        let mut lek: Option<Value> = None;
7007
7008        loop {
7009            let mut query = json!({
7010                "TableName": "gsi-table",
7011                "IndexName": "gsi-index",
7012                "KeyConditionExpression": "gsi_pk = :v",
7013                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
7014                "Limit": 2
7015            });
7016            if let Some(ref start_key) = lek {
7017                query["ExclusiveStartKey"] = start_key.clone();
7018            }
7019
7020            let req = make_request("Query", query);
7021            let resp = svc.query(&req).unwrap();
7022            let body: Value = serde_json::from_slice(&resp.body).unwrap();
7023
7024            for item in body["Items"].as_array().unwrap() {
7025                let pk = item["pk"]["S"].as_str().unwrap().to_string();
7026                all_pks.push(pk);
7027            }
7028
7029            if body["LastEvaluatedKey"].is_object() {
7030                lek = Some(body["LastEvaluatedKey"].clone());
7031            } else {
7032                break;
7033            }
7034        }
7035
7036        all_pks.sort();
7037        assert_eq!(
7038            all_pks,
7039            vec!["item000", "item001", "item002", "item003"],
7040            "pagination should return all items without duplicates"
7041        );
7042    }
7043
7044    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
7045        pairs
7046            .iter()
7047            .map(|(k, v)| (k.to_string(), json!({"S": v})))
7048            .collect()
7049    }
7050
7051    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
7052        pairs
7053            .iter()
7054            .map(|(k, v)| (k.to_string(), v.to_string()))
7055            .collect()
7056    }
7057
7058    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
7059        pairs
7060            .iter()
7061            .map(|(k, v)| (k.to_string(), json!({"S": v})))
7062            .collect()
7063    }
7064
7065    #[test]
7066    fn test_evaluate_condition_bare_not_equal() {
7067        let item = cond_item(&[("state", "active")]);
7068        let names = cond_names(&[("#s", "state")]);
7069        let values = cond_values(&[(":c", "complete")]);
7070
7071        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
7072
7073        let item2 = cond_item(&[("state", "complete")]);
7074        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
7075    }
7076
7077    #[test]
7078    fn test_evaluate_condition_parenthesized_not_equal() {
7079        let item = cond_item(&[("state", "active")]);
7080        let names = cond_names(&[("#s", "state")]);
7081        let values = cond_values(&[(":c", "complete")]);
7082
7083        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
7084    }
7085
7086    #[test]
7087    fn test_evaluate_condition_parenthesized_equal_mismatch() {
7088        let item = cond_item(&[("state", "active")]);
7089        let names = cond_names(&[("#s", "state")]);
7090        let values = cond_values(&[(":c", "complete")]);
7091
7092        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
7093    }
7094
7095    #[test]
7096    fn test_evaluate_condition_compound_and() {
7097        let item = cond_item(&[("state", "active")]);
7098        let names = cond_names(&[("#s", "state")]);
7099        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
7100
7101        // active <> complete AND active <> failed => true
7102        assert!(
7103            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
7104        );
7105    }
7106
7107    #[test]
7108    fn test_evaluate_condition_compound_and_mismatch() {
7109        let item = cond_item(&[("state", "inactive")]);
7110        let names = cond_names(&[("#s", "state")]);
7111        let values = cond_values(&[(":a", "active"), (":b", "active")]);
7112
7113        // inactive = active AND inactive = active => false
7114        assert!(
7115            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
7116        );
7117    }
7118
7119    #[test]
7120    fn test_evaluate_condition_compound_or() {
7121        let item = cond_item(&[("state", "running")]);
7122        let names = cond_names(&[("#s", "state")]);
7123        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
7124
7125        // running = active OR running = idle => false
7126        assert!(
7127            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
7128        );
7129
7130        // running = active OR running = running => true
7131        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
7132        assert!(
7133            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
7134        );
7135    }
7136
7137    #[test]
7138    fn test_evaluate_condition_not_operator() {
7139        let item = cond_item(&[("state", "active")]);
7140        let names = cond_names(&[("#s", "state")]);
7141        let values = cond_values(&[(":c", "complete")]);
7142
7143        // NOT (active = complete) => NOT false => true
7144        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
7145
7146        // NOT (active <> complete) => NOT true => false
7147        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
7148
7149        // NOT attribute_exists(#s) on existing item => NOT true => false
7150        assert!(
7151            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
7152        );
7153
7154        // NOT attribute_exists(#s) on missing item => NOT false => true
7155        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
7156    }
7157
7158    #[test]
7159    fn test_evaluate_condition_begins_with() {
7160        // After unification, conditions support begins_with via
7161        // evaluate_single_filter_condition (previously only filters had it).
7162        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
7163        let names = cond_names(&[("#n", "name")]);
7164        let values = cond_values(&[(":p", "fakecloud")]);
7165
7166        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
7167
7168        let values2 = cond_values(&[(":p", "realcloud")]);
7169        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
7170    }
7171
7172    #[test]
7173    fn test_evaluate_condition_contains() {
7174        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
7175        let names = cond_names(&[("#t", "tags")]);
7176        let values = cond_values(&[(":v", "beta")]);
7177
7178        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
7179
7180        let values2 = cond_values(&[(":v", "delta")]);
7181        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
7182    }
7183
7184    #[test]
7185    fn test_evaluate_condition_no_existing_item() {
7186        // When no item exists (PutItem with condition), attribute_not_exists
7187        // should succeed and attribute_exists should fail.
7188        let names = cond_names(&[("#s", "state")]);
7189        let values = cond_values(&[(":v", "active")]);
7190
7191        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
7192        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
7193        // Comparison against missing item: None != Some(val) => true for <>
7194        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
7195        // None == Some(val) => false for =
7196        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
7197    }
7198
7199    #[test]
7200    fn test_evaluate_filter_not_operator() {
7201        let item = cond_item(&[("status", "pending")]);
7202        let names = cond_names(&[("#s", "status")]);
7203        let values = cond_values(&[(":v", "pending")]);
7204
7205        assert!(!evaluate_filter_expression(
7206            "NOT (#s = :v)",
7207            &item,
7208            &names,
7209            &values
7210        ));
7211        assert!(evaluate_filter_expression(
7212            "NOT (#s <> :v)",
7213            &item,
7214            &names,
7215            &values
7216        ));
7217    }
7218
7219    #[test]
7220    fn test_evaluate_filter_expression_in_match() {
7221        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
7222        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
7223        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
7224        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
7225        // every item matched every IN filter regardless of value.
7226        let item = cond_item(&[("state", "active")]);
7227        let names = cond_names(&[("#s", "state")]);
7228        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7229
7230        assert!(
7231            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7232            "state=active should match IN (active, pending)"
7233        );
7234    }
7235
7236    #[test]
7237    fn test_evaluate_filter_expression_in_no_match() {
7238        let item = cond_item(&[("state", "complete")]);
7239        let names = cond_names(&[("#s", "state")]);
7240        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7241
7242        assert!(
7243            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7244            "state=complete should not match IN (active, pending)"
7245        );
7246    }
7247
7248    #[test]
7249    fn test_evaluate_filter_expression_in_no_spaces() {
7250        // orderbot emits the raw form
7251        //     "#status IN (" + strings.Join(keys, ",") + ")"
7252        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
7253        let item = cond_item(&[("status", "shipped")]);
7254        let names = cond_names(&[("#s", "status")]);
7255        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
7256
7257        assert!(
7258            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
7259            "no-space IN list should still parse"
7260        );
7261    }
7262
7263    #[test]
7264    fn test_evaluate_filter_expression_in_missing_attribute() {
7265        // A missing attribute must not match any IN list — the silent-true
7266        // fallthrough would wrongly accept these items.
7267        let item: HashMap<String, AttributeValue> = HashMap::new();
7268        let names = cond_names(&[("#s", "state")]);
7269        let values = cond_values(&[(":a", "active")]);
7270
7271        assert!(
7272            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
7273            "missing attribute should not match any IN list"
7274        );
7275    }
7276
7277    #[test]
7278    fn test_evaluate_filter_expression_compound_in_and_eq() {
7279        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
7280        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
7281        // split_on_and handles the outer parens, but the IN leaf had the
7282        // silent-true fallthrough, so any item with priority=high would match
7283        // regardless of state.
7284        let item = cond_item(&[("state", "active"), ("priority", "high")]);
7285        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
7286        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
7287
7288        assert!(
7289            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
7290            "(active IN (active, pending)) AND (high = high) should match"
7291        );
7292
7293        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
7294        assert!(
7295            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
7296            "(complete IN (active, pending)) AND (high = high) should not match"
7297        );
7298    }
7299
7300    #[test]
7301    fn test_evaluate_condition_attribute_exists_with_space() {
7302        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
7303        // space between the name and the opening paren:
7304        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
7305        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
7306        // with no space, so these fell through the filter leaf entirely and
7307        // hit evaluate_single_key_condition's silent-true fallthrough —
7308        // every conditional write was silently accepted.
7309        let item = cond_item(&[("store_id", "s-1")]);
7310        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
7311        let values = cond_values(&[(":0", "tab-A")]);
7312
7313        // On an existing item without active_viewer_tab_id: exists(store_id)
7314        // is true, not_exists(active_viewer_tab_id) is true → OK.
7315        assert!(
7316            evaluate_condition(
7317                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7318                Some(&item),
7319                &names,
7320                &values,
7321            )
7322            .is_ok(),
7323            "claim-lease compound on free item should succeed"
7324        );
7325
7326        // On a missing item: exists(store_id) is false → whole AND false → Err.
7327        assert!(
7328            evaluate_condition(
7329                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7330                None,
7331                &names,
7332                &values,
7333            )
7334            .is_err(),
7335            "claim-lease compound on missing item must fail attribute_exists branch"
7336        );
7337
7338        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
7339        // → (✓) AND ((✗) OR (✗)) → false → Err.
7340        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
7341        assert!(
7342            evaluate_condition(
7343                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7344                Some(&held),
7345                &names,
7346                &values,
7347            )
7348            .is_err(),
7349            "claim-lease compound on item held by another tab must fail"
7350        );
7351
7352        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
7353        // → (✓) AND ((✗) OR (✓)) → true → Ok.
7354        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
7355        assert!(
7356            evaluate_condition(
7357                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7358                Some(&self_held),
7359                &names,
7360                &values,
7361            )
7362            .is_ok(),
7363            "same-tab re-claim must succeed"
7364        );
7365    }
7366
7367    #[test]
7368    fn test_evaluate_condition_in_match() {
7369        // evaluate_condition delegates to evaluate_filter_expression, so this
7370        // also proves the ConditionExpression path. Before fix: silently Ok.
7371        let item = cond_item(&[("state", "active")]);
7372        let names = cond_names(&[("#s", "state")]);
7373        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7374
7375        assert!(
7376            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
7377            "IN should succeed when actual value is in the list"
7378        );
7379    }
7380
7381    #[test]
7382    fn test_evaluate_condition_in_no_match() {
7383        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
7384        // conditional write was accepted regardless of actual state, the
7385        // opposite of what the caller asked for.
7386        let item = cond_item(&[("state", "complete")]);
7387        let names = cond_names(&[("#s", "state")]);
7388        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7389
7390        assert!(
7391            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
7392            "IN should fail when actual value is not in the list"
7393        );
7394    }
7395
7396    #[test]
7397    fn test_apply_update_set_list_index_replaces_existing() {
7398        // Shape emitted by orderbot's order-item update retry loop:
7399        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
7400        // Before fix: apply_set_assignment called resolve_attr_name on the
7401        // whole "#items[0]" token, which misses the name map, and then
7402        // item.insert("#items[0]", :item), producing a top-level key
7403        // literally named "#items[0]" rather than mutating the list.
7404        let mut item = HashMap::new();
7405        item.insert(
7406            "items".to_string(),
7407            json!({"L": [
7408                {"M": {"sku": {"S": "OLD-A"}}},
7409                {"M": {"sku": {"S": "OLD-B"}}},
7410            ]}),
7411        );
7412
7413        let names = cond_names(&[("#items", "items")]);
7414        let mut values = HashMap::new();
7415        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
7416
7417        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
7418
7419        let items_list = item
7420            .get("items")
7421            .and_then(|v| v.get("L"))
7422            .and_then(|v| v.as_array())
7423            .expect("items should still be a list");
7424        assert_eq!(items_list.len(), 2, "list length should be unchanged");
7425        let sku0 = items_list[0]
7426            .get("M")
7427            .and_then(|m| m.get("sku"))
7428            .and_then(|s| s.get("S"))
7429            .and_then(|s| s.as_str());
7430        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
7431        let sku1 = items_list[1]
7432            .get("M")
7433            .and_then(|m| m.get("sku"))
7434            .and_then(|s| s.get("S"))
7435            .and_then(|s| s.as_str());
7436        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
7437
7438        assert!(!item.contains_key("items[0]"));
7439        assert!(!item.contains_key("#items[0]"));
7440    }
7441
7442    #[test]
7443    fn test_apply_update_set_list_index_second_slot() {
7444        let mut item = HashMap::new();
7445        item.insert(
7446            "items".to_string(),
7447            json!({"L": [
7448                {"M": {"sku": {"S": "A"}}},
7449                {"M": {"sku": {"S": "B"}}},
7450                {"M": {"sku": {"S": "C"}}},
7451            ]}),
7452        );
7453
7454        let names = cond_names(&[("#items", "items")]);
7455        let mut values = HashMap::new();
7456        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
7457
7458        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
7459
7460        let items_list = item
7461            .get("items")
7462            .and_then(|v| v.get("L"))
7463            .and_then(|v| v.as_array())
7464            .unwrap();
7465        let skus: Vec<&str> = items_list
7466            .iter()
7467            .map(|v| {
7468                v.get("M")
7469                    .and_then(|m| m.get("sku"))
7470                    .and_then(|s| s.get("S"))
7471                    .and_then(|s| s.as_str())
7472                    .unwrap()
7473            })
7474            .collect();
7475        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
7476    }
7477
7478    #[test]
7479    fn test_apply_update_set_list_index_without_name_ref() {
7480        // Same fix must also work when the LHS is a literal attribute name,
7481        // not an expression attribute name ref.
7482        let mut item = HashMap::new();
7483        item.insert(
7484            "tags".to_string(),
7485            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
7486        );
7487
7488        let names: HashMap<String, String> = HashMap::new();
7489        let mut values = HashMap::new();
7490        values.insert(":t".to_string(), json!({"S": "green"}));
7491
7492        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
7493
7494        let tags = item
7495            .get("tags")
7496            .and_then(|v| v.get("L"))
7497            .and_then(|v| v.as_array())
7498            .unwrap();
7499        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
7500        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
7501    }
7502
7503    #[test]
7504    fn test_unrecognized_expression_returns_false() {
7505        // evaluate_single_key_condition must fail-closed: an expression shape
7506        // it doesn't recognize should return false (reject), not true (accept).
7507        let item = cond_item(&[("x", "1")]);
7508        let names: HashMap<String, String> = HashMap::new();
7509        let values: HashMap<String, Value> = HashMap::new();
7510
7511        assert!(
7512            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
7513            "unrecognized expression must return false"
7514        );
7515    }
7516
7517    #[test]
7518    fn test_set_list_index_out_of_range_returns_error() {
7519        // SET list[N] where N > len must return a ValidationException,
7520        // not silently no-op.
7521        let mut item = HashMap::new();
7522        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
7523
7524        let names: HashMap<String, String> = HashMap::new();
7525        let mut values = HashMap::new();
7526        values.insert(":v".to_string(), json!({"S": "z"}));
7527
7528        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
7529        assert!(
7530            result.is_err(),
7531            "out-of-range list index must return an error"
7532        );
7533
7534        // List should be unchanged
7535        let list = item
7536            .get("items")
7537            .and_then(|v| v.get("L"))
7538            .and_then(|v| v.as_array())
7539            .unwrap();
7540        assert_eq!(list.len(), 2);
7541    }
7542
7543    #[test]
7544    fn test_set_list_index_on_non_list_returns_error() {
7545        // SET attr[0] = :v where attr is a string (not a list) must return
7546        // a ValidationException.
7547        let mut item = HashMap::new();
7548        item.insert("name".to_string(), json!({"S": "hello"}));
7549
7550        let names: HashMap<String, String> = HashMap::new();
7551        let mut values = HashMap::new();
7552        values.insert(":v".to_string(), json!({"S": "z"}));
7553
7554        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
7555        assert!(
7556            result.is_err(),
7557            "list index on non-list attribute must return an error"
7558        );
7559    }
7560
7561    #[test]
7562    fn test_unrecognized_update_action_returns_error() {
7563        let mut item = HashMap::new();
7564        item.insert("name".to_string(), json!({"S": "hello"}));
7565
7566        let names: HashMap<String, String> = HashMap::new();
7567        let mut values = HashMap::new();
7568        values.insert(":bar".to_string(), json!({"S": "baz"}));
7569
7570        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
7571        assert!(
7572            result.is_err(),
7573            "unrecognized UpdateExpression action must return an error"
7574        );
7575        let err_msg = format!("{}", result.unwrap_err());
7576        assert!(
7577            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
7578            "error should mention Invalid UpdateExpression, got: {err_msg}"
7579        );
7580    }
7581
7582    // ── size() function tests ──────────────────────────────────────────
7583
7584    #[test]
7585    fn test_size_string() {
7586        let mut item = HashMap::new();
7587        item.insert("name".to_string(), json!({"S": "hello"}));
7588        let names = HashMap::new();
7589        let mut values = HashMap::new();
7590        values.insert(":limit".to_string(), json!({"N": "5"}));
7591
7592        assert!(evaluate_single_filter_condition(
7593            "size(name) = :limit",
7594            &item,
7595            &names,
7596            &values,
7597        ));
7598        values.insert(":limit".to_string(), json!({"N": "4"}));
7599        assert!(evaluate_single_filter_condition(
7600            "size(name) > :limit",
7601            &item,
7602            &names,
7603            &values,
7604        ));
7605    }
7606
7607    #[test]
7608    fn test_size_list() {
7609        let mut item = HashMap::new();
7610        item.insert(
7611            "items".to_string(),
7612            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
7613        );
7614        let names = HashMap::new();
7615        let mut values = HashMap::new();
7616        values.insert(":limit".to_string(), json!({"N": "3"}));
7617
7618        assert!(evaluate_single_filter_condition(
7619            "size(items) = :limit",
7620            &item,
7621            &names,
7622            &values,
7623        ));
7624    }
7625
7626    #[test]
7627    fn test_size_map() {
7628        let mut item = HashMap::new();
7629        item.insert(
7630            "data".to_string(),
7631            json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
7632        );
7633        let names = HashMap::new();
7634        let mut values = HashMap::new();
7635        values.insert(":limit".to_string(), json!({"N": "2"}));
7636
7637        assert!(evaluate_single_filter_condition(
7638            "size(data) = :limit",
7639            &item,
7640            &names,
7641            &values,
7642        ));
7643    }
7644
7645    #[test]
7646    fn test_size_set() {
7647        let mut item = HashMap::new();
7648        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
7649        let names = HashMap::new();
7650        let mut values = HashMap::new();
7651        values.insert(":limit".to_string(), json!({"N": "3"}));
7652
7653        assert!(evaluate_single_filter_condition(
7654            "size(tags) > :limit",
7655            &item,
7656            &names,
7657            &values,
7658        ));
7659    }
7660
7661    // ── attribute_type() function tests ────────────────────────────────
7662
7663    #[test]
7664    fn test_attribute_type_string() {
7665        let mut item = HashMap::new();
7666        item.insert("name".to_string(), json!({"S": "hello"}));
7667        let names = HashMap::new();
7668        let mut values = HashMap::new();
7669        values.insert(":t".to_string(), json!({"S": "S"}));
7670
7671        assert!(evaluate_single_filter_condition(
7672            "attribute_type(name, :t)",
7673            &item,
7674            &names,
7675            &values,
7676        ));
7677
7678        values.insert(":t".to_string(), json!({"S": "N"}));
7679        assert!(!evaluate_single_filter_condition(
7680            "attribute_type(name, :t)",
7681            &item,
7682            &names,
7683            &values,
7684        ));
7685    }
7686
7687    #[test]
7688    fn test_attribute_type_number() {
7689        let mut item = HashMap::new();
7690        item.insert("age".to_string(), json!({"N": "42"}));
7691        let names = HashMap::new();
7692        let mut values = HashMap::new();
7693        values.insert(":t".to_string(), json!({"S": "N"}));
7694
7695        assert!(evaluate_single_filter_condition(
7696            "attribute_type(age, :t)",
7697            &item,
7698            &names,
7699            &values,
7700        ));
7701    }
7702
7703    #[test]
7704    fn test_attribute_type_list() {
7705        let mut item = HashMap::new();
7706        item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
7707        let names = HashMap::new();
7708        let mut values = HashMap::new();
7709        values.insert(":t".to_string(), json!({"S": "L"}));
7710
7711        assert!(evaluate_single_filter_condition(
7712            "attribute_type(items, :t)",
7713            &item,
7714            &names,
7715            &values,
7716        ));
7717    }
7718
7719    #[test]
7720    fn test_attribute_type_map() {
7721        let mut item = HashMap::new();
7722        item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
7723        let names = HashMap::new();
7724        let mut values = HashMap::new();
7725        values.insert(":t".to_string(), json!({"S": "M"}));
7726
7727        assert!(evaluate_single_filter_condition(
7728            "attribute_type(data, :t)",
7729            &item,
7730            &names,
7731            &values,
7732        ));
7733    }
7734
7735    #[test]
7736    fn test_attribute_type_bool() {
7737        let mut item = HashMap::new();
7738        item.insert("active".to_string(), json!({"BOOL": true}));
7739        let names = HashMap::new();
7740        let mut values = HashMap::new();
7741        values.insert(":t".to_string(), json!({"S": "BOOL"}));
7742
7743        assert!(evaluate_single_filter_condition(
7744            "attribute_type(active, :t)",
7745            &item,
7746            &names,
7747            &values,
7748        ));
7749    }
7750
7751    // ── begins_with rejects non-string types ───────────────────────────
7752
7753    #[test]
7754    fn test_begins_with_rejects_number_type() {
7755        let mut item = HashMap::new();
7756        item.insert("code".to_string(), json!({"N": "12345"}));
7757        let names = HashMap::new();
7758        let mut values = HashMap::new();
7759        values.insert(":prefix".to_string(), json!({"S": "123"}));
7760
7761        assert!(
7762            !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
7763            "begins_with must return false for N-type attributes"
7764        );
7765    }
7766
7767    #[test]
7768    fn test_begins_with_works_on_string_type() {
7769        let mut item = HashMap::new();
7770        item.insert("code".to_string(), json!({"S": "abc123"}));
7771        let names = HashMap::new();
7772        let mut values = HashMap::new();
7773        values.insert(":prefix".to_string(), json!({"S": "abc"}));
7774
7775        assert!(evaluate_single_filter_condition(
7776            "begins_with(code, :prefix)",
7777            &item,
7778            &names,
7779            &values,
7780        ));
7781    }
7782
7783    // ── contains on sets ───────────────────────────────────────────────
7784
7785    #[test]
7786    fn test_contains_string_set() {
7787        let mut item = HashMap::new();
7788        item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
7789        let names = HashMap::new();
7790        let mut values = HashMap::new();
7791        values.insert(":val".to_string(), json!({"S": "blue"}));
7792
7793        assert!(evaluate_single_filter_condition(
7794            "contains(tags, :val)",
7795            &item,
7796            &names,
7797            &values,
7798        ));
7799
7800        values.insert(":val".to_string(), json!({"S": "yellow"}));
7801        assert!(!evaluate_single_filter_condition(
7802            "contains(tags, :val)",
7803            &item,
7804            &names,
7805            &values,
7806        ));
7807    }
7808
7809    #[test]
7810    fn test_contains_number_set() {
7811        let mut item = HashMap::new();
7812        item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
7813        let names = HashMap::new();
7814        let mut values = HashMap::new();
7815        values.insert(":val".to_string(), json!({"N": "2"}));
7816
7817        assert!(evaluate_single_filter_condition(
7818            "contains(scores, :val)",
7819            &item,
7820            &names,
7821            &values,
7822        ));
7823    }
7824
7825    // ── SET arithmetic type validation ─────────────────────────────────
7826
7827    #[test]
7828    fn test_set_arithmetic_rejects_string_operand() {
7829        let mut item = HashMap::new();
7830        item.insert("name".to_string(), json!({"S": "hello"}));
7831        let names = HashMap::new();
7832        let mut values = HashMap::new();
7833        values.insert(":val".to_string(), json!({"N": "1"}));
7834
7835        let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
7836        assert!(
7837            result.is_err(),
7838            "arithmetic on S-type attribute must return a ValidationException"
7839        );
7840    }
7841
7842    #[test]
7843    fn test_set_arithmetic_rejects_string_value() {
7844        let mut item = HashMap::new();
7845        item.insert("count".to_string(), json!({"N": "5"}));
7846        let names = HashMap::new();
7847        let mut values = HashMap::new();
7848        values.insert(":val".to_string(), json!({"S": "notanumber"}));
7849
7850        let result =
7851            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7852        assert!(
7853            result.is_err(),
7854            "arithmetic with S-type value must return a ValidationException"
7855        );
7856    }
7857
7858    #[test]
7859    fn test_set_arithmetic_valid_numbers() {
7860        let mut item = HashMap::new();
7861        item.insert("count".to_string(), json!({"N": "10"}));
7862        let names = HashMap::new();
7863        let mut values = HashMap::new();
7864        values.insert(":val".to_string(), json!({"N": "3"}));
7865
7866        let result =
7867            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7868        assert!(result.is_ok());
7869        assert_eq!(item["count"], json!({"N": "13"}));
7870    }
7871
7872    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
7873
7874    #[test]
7875    fn test_add_binary_set() {
7876        let mut item = HashMap::new();
7877        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
7878        let names = HashMap::new();
7879        let mut values = HashMap::new();
7880        values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
7881
7882        let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
7883        assert!(result.is_ok());
7884        let bs = item["data"]["BS"].as_array().unwrap();
7885        assert_eq!(bs.len(), 3, "should merge sets without duplicates");
7886        assert!(bs.contains(&json!("YQ==")));
7887        assert!(bs.contains(&json!("Yg==")));
7888        assert!(bs.contains(&json!("Yw==")));
7889    }
7890
7891    #[test]
7892    fn test_delete_binary_set() {
7893        let mut item = HashMap::new();
7894        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
7895        let names = HashMap::new();
7896        let mut values = HashMap::new();
7897        values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
7898
7899        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7900        assert!(result.is_ok());
7901        let bs = item["data"]["BS"].as_array().unwrap();
7902        assert_eq!(bs.len(), 2);
7903        assert!(!bs.contains(&json!("Yg==")));
7904    }
7905
7906    #[test]
7907    fn test_delete_binary_set_removes_attr_when_empty() {
7908        let mut item = HashMap::new();
7909        item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
7910        let names = HashMap::new();
7911        let mut values = HashMap::new();
7912        values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
7913
7914        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7915        assert!(result.is_ok());
7916        assert!(
7917            !item.contains_key("data"),
7918            "attribute should be removed when set becomes empty"
7919        );
7920    }
7921}