Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21    ReplicaDescription, SharedDynamoDbState,
22};
23
24pub struct DynamoDbService {
25    state: SharedDynamoDbState,
26    s3_state: Option<SharedS3State>,
27    delivery: Option<Arc<DeliveryBus>>,
28}
29
30impl DynamoDbService {
31    pub fn new(state: SharedDynamoDbState) -> Self {
32        Self {
33            state,
34            s3_state: None,
35            delivery: None,
36        }
37    }
38
39    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40        self.s3_state = Some(s3_state);
41        self
42    }
43
44    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45        self.delivery = Some(delivery);
46        self
47    }
48
49    /// Deliver a change record to all active Kinesis streaming destinations for a table.
50    fn deliver_to_kinesis_destinations(
51        &self,
52        table: &DynamoTable,
53        event_name: &str,
54        keys: &HashMap<String, AttributeValue>,
55        old_image: Option<&HashMap<String, AttributeValue>>,
56        new_image: Option<&HashMap<String, AttributeValue>>,
57    ) {
58        let delivery = match &self.delivery {
59            Some(d) => d,
60            None => return,
61        };
62
63        let active_destinations: Vec<_> = table
64            .kinesis_destinations
65            .iter()
66            .filter(|d| d.destination_status == "ACTIVE")
67            .collect();
68
69        if active_destinations.is_empty() {
70            return;
71        }
72
73        let mut record = json!({
74            "eventID": uuid::Uuid::new_v4().to_string(),
75            "eventName": event_name,
76            "eventVersion": "1.1",
77            "eventSource": "aws:dynamodb",
78            "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79            "dynamodb": {
80                "Keys": keys,
81                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83                "StreamViewType": "NEW_AND_OLD_IMAGES",
84            },
85            "eventSourceARN": &table.arn,
86            "tableName": &table.name,
87        });
88
89        if let Some(old) = old_image {
90            record["dynamodb"]["OldImage"] = json!(old);
91        }
92        if let Some(new) = new_image {
93            record["dynamodb"]["NewImage"] = json!(new);
94        }
95
96        let record_str = serde_json::to_string(&record).unwrap_or_default();
97        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98        let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100        for dest in active_destinations {
101            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102        }
103    }
104
105    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106        serde_json::from_slice(&req.body).map_err(|e| {
107            AwsServiceError::aws_error(
108                StatusCode::BAD_REQUEST,
109                "SerializationException",
110                format!("Invalid JSON: {e}"),
111            )
112        })
113    }
114
115    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116        Ok(AwsResponse::ok_json(body))
117    }
118
119    // ── Table Operations ────────────────────────────────────────────────
120
121    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122        let body = Self::parse_body(req)?;
123
124        let table_name = body["TableName"]
125            .as_str()
126            .ok_or_else(|| {
127                AwsServiceError::aws_error(
128                    StatusCode::BAD_REQUEST,
129                    "ValidationException",
130                    "TableName is required",
131                )
132            })?
133            .to_string();
134
135        let key_schema = parse_key_schema(&body["KeySchema"])?;
136        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138        // Validate that key schema attributes are defined
139        for ks in &key_schema {
140            if !attribute_definitions
141                .iter()
142                .any(|ad| ad.attribute_name == ks.attribute_name)
143            {
144                return Err(AwsServiceError::aws_error(
145                    StatusCode::BAD_REQUEST,
146                    "ValidationException",
147                    format!(
148                        "One or more parameter values were invalid: \
149                         Some index key attributes are not defined in AttributeDefinitions. \
150                         Keys: [{}], AttributeDefinitions: [{}]",
151                        ks.attribute_name,
152                        attribute_definitions
153                            .iter()
154                            .map(|ad| ad.attribute_name.as_str())
155                            .collect::<Vec<_>>()
156                            .join(", ")
157                    ),
158                ));
159            }
160        }
161
162        let billing_mode = body["BillingMode"]
163            .as_str()
164            .unwrap_or("PROVISIONED")
165            .to_string();
166
167        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168            ProvisionedThroughput {
169                read_capacity_units: 0,
170                write_capacity_units: 0,
171            }
172        } else {
173            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174        };
175
176        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178        let tags = parse_tags(&body["Tags"]);
179
180        // Parse StreamSpecification
181        let (stream_enabled, stream_view_type) =
182            if let Some(stream_spec) = body.get("StreamSpecification") {
183                let enabled = stream_spec
184                    .get("StreamEnabled")
185                    .and_then(|v| v.as_bool())
186                    .unwrap_or(false);
187                let view_type = if enabled {
188                    stream_spec
189                        .get("StreamViewType")
190                        .and_then(|v| v.as_str())
191                        .map(|s| s.to_string())
192                } else {
193                    None
194                };
195                (enabled, view_type)
196            } else {
197                (false, None)
198            };
199
200        // Parse SSESpecification
201        let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202            let enabled = sse_spec
203                .get("Enabled")
204                .and_then(|v| v.as_bool())
205                .unwrap_or(false);
206            if enabled {
207                let sse_type = sse_spec
208                    .get("SSEType")
209                    .and_then(|v| v.as_str())
210                    .unwrap_or("KMS")
211                    .to_string();
212                let kms_key = sse_spec
213                    .get("KMSMasterKeyId")
214                    .and_then(|v| v.as_str())
215                    .map(|s| s.to_string());
216                (Some(sse_type), kms_key)
217            } else {
218                (None, None)
219            }
220        } else {
221            (None, None)
222        };
223
224        let mut state = self.state.write();
225
226        if state.tables.contains_key(&table_name) {
227            return Err(AwsServiceError::aws_error(
228                StatusCode::BAD_REQUEST,
229                "ResourceInUseException",
230                format!("Table already exists: {table_name}"),
231            ));
232        }
233
234        let now = Utc::now();
235        let arn = format!(
236            "arn:aws:dynamodb:{}:{}:table/{}",
237            state.region, state.account_id, table_name
238        );
239        let stream_arn = if stream_enabled {
240            Some(format!(
241                "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242                state.region,
243                state.account_id,
244                table_name,
245                now.format("%Y-%m-%dT%H:%M:%S.%3f")
246            ))
247        } else {
248            None
249        };
250
251        let table = DynamoTable {
252            name: table_name.clone(),
253            arn: arn.clone(),
254            key_schema: key_schema.clone(),
255            attribute_definitions: attribute_definitions.clone(),
256            provisioned_throughput: provisioned_throughput.clone(),
257            items: Vec::new(),
258            gsi: gsi.clone(),
259            lsi: lsi.clone(),
260            tags,
261            created_at: now,
262            status: "ACTIVE".to_string(),
263            item_count: 0,
264            size_bytes: 0,
265            billing_mode: billing_mode.clone(),
266            ttl_attribute: None,
267            ttl_enabled: false,
268            resource_policy: None,
269            pitr_enabled: false,
270            kinesis_destinations: Vec::new(),
271            contributor_insights_status: "DISABLED".to_string(),
272            contributor_insights_counters: HashMap::new(),
273            stream_enabled,
274            stream_view_type,
275            stream_arn,
276            stream_records: Arc::new(RwLock::new(Vec::new())),
277            sse_type,
278            sse_kms_key_arn,
279        };
280
281        state.tables.insert(table_name, table);
282
283        let table_desc = build_table_description_json(
284            &arn,
285            &key_schema,
286            &attribute_definitions,
287            &provisioned_throughput,
288            &gsi,
289            &lsi,
290            &billing_mode,
291            now,
292            0,
293            0,
294            "ACTIVE",
295        );
296
297        Self::ok_json(json!({ "TableDescription": table_desc }))
298    }
299
300    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = Self::parse_body(req)?;
302        let table_name = require_str(&body, "TableName")?;
303
304        let mut state = self.state.write();
305        let table = state.tables.remove(table_name).ok_or_else(|| {
306            AwsServiceError::aws_error(
307                StatusCode::BAD_REQUEST,
308                "ResourceNotFoundException",
309                format!("Requested resource not found: Table: {table_name} not found"),
310            )
311        })?;
312
313        let table_desc = build_table_description_json(
314            &table.arn,
315            &table.key_schema,
316            &table.attribute_definitions,
317            &table.provisioned_throughput,
318            &table.gsi,
319            &table.lsi,
320            &table.billing_mode,
321            table.created_at,
322            table.item_count,
323            table.size_bytes,
324            "DELETING",
325        );
326
327        Self::ok_json(json!({ "TableDescription": table_desc }))
328    }
329
330    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331        let body = Self::parse_body(req)?;
332        let table_name = require_str(&body, "TableName")?;
333
334        let state = self.state.read();
335        let table = get_table(&state.tables, table_name)?;
336
337        let table_desc = build_table_description(table);
338
339        Self::ok_json(json!({ "Table": table_desc }))
340    }
341
342    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343        let body = Self::parse_body(req)?;
344
345        validate_optional_string_length(
346            "exclusiveStartTableName",
347            body["ExclusiveStartTableName"].as_str(),
348            3,
349            255,
350        )?;
351        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354        let exclusive_start = body["ExclusiveStartTableName"]
355            .as_str()
356            .map(|s| s.to_string());
357
358        let state = self.state.read();
359        let mut names: Vec<&String> = state.tables.keys().collect();
360        names.sort();
361
362        let start_idx = match &exclusive_start {
363            Some(start) => names
364                .iter()
365                .position(|n| n.as_str() > start.as_str())
366                .unwrap_or(names.len()),
367            None => 0,
368        };
369
370        let page: Vec<&str> = names
371            .iter()
372            .skip(start_idx)
373            .take(limit)
374            .map(|n| n.as_str())
375            .collect();
376
377        let mut result = json!({ "TableNames": page });
378
379        if start_idx + limit < names.len() {
380            if let Some(last) = page.last() {
381                result["LastEvaluatedTableName"] = json!(last);
382            }
383        }
384
385        Self::ok_json(result)
386    }
387
388    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        let body = Self::parse_body(req)?;
390        let table_name = require_str(&body, "TableName")?;
391
392        let mut state = self.state.write();
393        let table = state.tables.get_mut(table_name).ok_or_else(|| {
394            AwsServiceError::aws_error(
395                StatusCode::BAD_REQUEST,
396                "ResourceNotFoundException",
397                format!("Requested resource not found: Table: {table_name} not found"),
398            )
399        })?;
400
401        if let Some(pt) = body.get("ProvisionedThroughput") {
402            if let Ok(throughput) = parse_provisioned_throughput(pt) {
403                table.provisioned_throughput = throughput;
404            }
405        }
406
407        if let Some(bm) = body["BillingMode"].as_str() {
408            table.billing_mode = bm.to_string();
409        }
410
411        // Handle SSESpecification update
412        if let Some(sse_spec) = body.get("SSESpecification") {
413            let enabled = sse_spec
414                .get("Enabled")
415                .and_then(|v| v.as_bool())
416                .unwrap_or(false);
417            if enabled {
418                table.sse_type = Some(
419                    sse_spec
420                        .get("SSEType")
421                        .and_then(|v| v.as_str())
422                        .unwrap_or("KMS")
423                        .to_string(),
424                );
425                table.sse_kms_key_arn = sse_spec
426                    .get("KMSMasterKeyId")
427                    .and_then(|v| v.as_str())
428                    .map(|s| s.to_string());
429            } else {
430                table.sse_type = None;
431                table.sse_kms_key_arn = None;
432            }
433        }
434
435        let table_desc = build_table_description_json(
436            &table.arn,
437            &table.key_schema,
438            &table.attribute_definitions,
439            &table.provisioned_throughput,
440            &table.gsi,
441            &table.lsi,
442            &table.billing_mode,
443            table.created_at,
444            table.item_count,
445            table.size_bytes,
446            &table.status,
447        );
448
449        Self::ok_json(json!({ "TableDescription": table_desc }))
450    }
451
452    // ── Item Operations ─────────────────────────────────────────────────
453
454    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455        // --- Parse request body and expression attributes WITHOUT holding any lock ---
456        let body = Self::parse_body(req)?;
457        let table_name = require_str(&body, "TableName")?;
458        let item = require_object(&body, "Item")?;
459        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460        let expr_attr_names = parse_expression_attribute_names(&body);
461        let expr_attr_values = parse_expression_attribute_values(&body);
462        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464        // --- Acquire write lock ONLY for validation + mutation ---
465        // Capture kinesis delivery info alongside the return value
466        let (old_item, kinesis_info) = {
467            let mut state = self.state.write();
468            let table = get_table_mut(&mut state.tables, table_name)?;
469
470            validate_key_in_item(table, &item)?;
471
472            let key = extract_key(table, &item);
473            let existing_idx = table.find_item_index(&key);
474
475            if let Some(ref cond) = condition {
476                let existing = existing_idx.map(|i| &table.items[i]);
477                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
478            }
479
480            let old_item_for_return = if return_values == "ALL_OLD" {
481                existing_idx.map(|i| table.items[i].clone())
482            } else {
483                None
484            };
485
486            // Capture old item for stream/kinesis if needed
487            let needs_change_capture = table.stream_enabled
488                || table
489                    .kinesis_destinations
490                    .iter()
491                    .any(|d| d.destination_status == "ACTIVE");
492            let old_item_for_stream = if needs_change_capture {
493                existing_idx.map(|i| table.items[i].clone())
494            } else {
495                None
496            };
497
498            let is_modify = existing_idx.is_some();
499
500            if let Some(idx) = existing_idx {
501                table.items[idx] = item.clone();
502            } else {
503                table.items.push(item.clone());
504            }
505
506            table.record_item_access(&item);
507            table.recalculate_stats();
508
509            let event_name = if is_modify { "MODIFY" } else { "INSERT" };
510            let key = extract_key(table, &item);
511
512            // Generate stream record
513            if table.stream_enabled {
514                if let Some(record) = crate::streams::generate_stream_record(
515                    table,
516                    event_name,
517                    key.clone(),
518                    old_item_for_stream.clone(),
519                    Some(item.clone()),
520                ) {
521                    crate::streams::add_stream_record(table, record);
522                }
523            }
524
525            // Capture kinesis delivery info (delivered after lock release)
526            let kinesis_info = if table
527                .kinesis_destinations
528                .iter()
529                .any(|d| d.destination_status == "ACTIVE")
530            {
531                Some((
532                    table.clone(),
533                    event_name.to_string(),
534                    key,
535                    old_item_for_stream,
536                    Some(item.clone()),
537                ))
538            } else {
539                None
540            };
541
542            (old_item_for_return, kinesis_info)
543        };
544        // --- Write lock released, build response ---
545
546        // Deliver to Kinesis destinations outside the lock
547        if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
548            self.deliver_to_kinesis_destinations(
549                &table,
550                &event_name,
551                &keys,
552                old_image.as_ref(),
553                new_image.as_ref(),
554            );
555        }
556
557        let mut result = json!({});
558        if let Some(old) = old_item {
559            result["Attributes"] = json!(old);
560        }
561
562        Self::ok_json(result)
563    }
564
565    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
566        // --- Parse request body WITHOUT holding any lock ---
567        let body = Self::parse_body(req)?;
568        let table_name = require_str(&body, "TableName")?;
569        let key = require_object(&body, "Key")?;
570
571        // --- Use a read lock for the lookup (allows concurrent GetItem calls) ---
572        let (result, needs_insights) = {
573            let state = self.state.read();
574            let table = get_table(&state.tables, table_name)?;
575            let needs_insights = table.contributor_insights_status == "ENABLED";
576
577            let result = match table.find_item_index(&key) {
578                Some(idx) => {
579                    let item = &table.items[idx];
580                    let projected = project_item(item, &body);
581                    json!({ "Item": projected })
582                }
583                None => json!({}),
584            };
585            (result, needs_insights)
586        };
587        // --- Read lock released ---
588
589        // Only acquire write lock if contributor insights tracking is enabled
590        if needs_insights {
591            let mut state = self.state.write();
592            if let Some(table) = state.tables.get_mut(table_name) {
593                table.record_key_access(&key);
594            }
595        }
596
597        Self::ok_json(result)
598    }
599
600    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
601        let body = Self::parse_body(req)?;
602
603        validate_optional_enum_value(
604            "conditionalOperator",
605            &body["ConditionalOperator"],
606            &["AND", "OR"],
607        )?;
608        validate_optional_enum_value(
609            "returnConsumedCapacity",
610            &body["ReturnConsumedCapacity"],
611            &["INDEXES", "TOTAL", "NONE"],
612        )?;
613        validate_optional_enum_value(
614            "returnValues",
615            &body["ReturnValues"],
616            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
617        )?;
618        validate_optional_enum_value(
619            "returnItemCollectionMetrics",
620            &body["ReturnItemCollectionMetrics"],
621            &["SIZE", "NONE"],
622        )?;
623        validate_optional_enum_value(
624            "returnValuesOnConditionCheckFailure",
625            &body["ReturnValuesOnConditionCheckFailure"],
626            &["ALL_OLD", "NONE"],
627        )?;
628
629        let table_name = require_str(&body, "TableName")?;
630        let key = require_object(&body, "Key")?;
631
632        let (result, kinesis_info) = {
633            let mut state = self.state.write();
634            let table = get_table_mut(&mut state.tables, table_name)?;
635
636            let condition = body["ConditionExpression"].as_str();
637            let expr_attr_names = parse_expression_attribute_names(&body);
638            let expr_attr_values = parse_expression_attribute_values(&body);
639
640            let existing_idx = table.find_item_index(&key);
641
642            if let Some(cond) = condition {
643                let existing = existing_idx.map(|i| &table.items[i]);
644                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
645            }
646
647            let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
648
649            let mut result = json!({});
650            let mut kinesis_info = None;
651
652            if let Some(idx) = existing_idx {
653                let old_item = table.items[idx].clone();
654                if return_values == "ALL_OLD" {
655                    result["Attributes"] = json!(old_item.clone());
656                }
657
658                // Generate stream record before removing
659                if table.stream_enabled {
660                    if let Some(record) = crate::streams::generate_stream_record(
661                        table,
662                        "REMOVE",
663                        key.clone(),
664                        Some(old_item.clone()),
665                        None,
666                    ) {
667                        crate::streams::add_stream_record(table, record);
668                    }
669                }
670
671                // Capture kinesis delivery info
672                if table
673                    .kinesis_destinations
674                    .iter()
675                    .any(|d| d.destination_status == "ACTIVE")
676                {
677                    kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
678                }
679
680                table.items.remove(idx);
681                table.recalculate_stats();
682            }
683
684            let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
685            let return_icm = body["ReturnItemCollectionMetrics"]
686                .as_str()
687                .unwrap_or("NONE");
688
689            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
690                result["ConsumedCapacity"] = json!({
691                    "TableName": table_name,
692                    "CapacityUnits": 1.0,
693                });
694            }
695
696            if return_icm == "SIZE" {
697                result["ItemCollectionMetrics"] = json!({});
698            }
699
700            (result, kinesis_info)
701        };
702
703        // Deliver to Kinesis destinations outside the lock
704        if let Some((table, keys, old_image)) = kinesis_info {
705            self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
706        }
707
708        Self::ok_json(result)
709    }
710
711    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
712        let body = Self::parse_body(req)?;
713        let table_name = require_str(&body, "TableName")?;
714        let key = require_object(&body, "Key")?;
715
716        let mut state = self.state.write();
717        let table = get_table_mut(&mut state.tables, table_name)?;
718
719        validate_key_attributes_in_key(table, &key)?;
720
721        let condition = body["ConditionExpression"].as_str();
722        let expr_attr_names = parse_expression_attribute_names(&body);
723        let expr_attr_values = parse_expression_attribute_values(&body);
724        let update_expression = body["UpdateExpression"].as_str();
725
726        let existing_idx = table.find_item_index(&key);
727
728        if let Some(cond) = condition {
729            let existing = existing_idx.map(|i| &table.items[i]);
730            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
731        }
732
733        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
734
735        let is_insert = existing_idx.is_none();
736        let idx = match existing_idx {
737            Some(i) => i,
738            None => {
739                let mut new_item = HashMap::new();
740                for (k, v) in &key {
741                    new_item.insert(k.clone(), v.clone());
742                }
743                table.items.push(new_item);
744                table.items.len() - 1
745            }
746        };
747
748        // Capture old item for stream/kinesis (before update)
749        let needs_change_capture = table.stream_enabled
750            || table
751                .kinesis_destinations
752                .iter()
753                .any(|d| d.destination_status == "ACTIVE");
754        let old_item_for_stream = if needs_change_capture {
755            Some(table.items[idx].clone())
756        } else {
757            None
758        };
759
760        let old_item = if return_values == "ALL_OLD" {
761            Some(table.items[idx].clone())
762        } else {
763            None
764        };
765
766        if let Some(expr) = update_expression {
767            apply_update_expression(
768                &mut table.items[idx],
769                expr,
770                &expr_attr_names,
771                &expr_attr_values,
772            )?;
773        }
774
775        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
776            Some(table.items[idx].clone())
777        } else {
778            None
779        };
780
781        let event_name = if is_insert { "INSERT" } else { "MODIFY" };
782        let new_item_for_stream = table.items[idx].clone();
783
784        // Generate stream record after update
785        if table.stream_enabled {
786            if let Some(record) = crate::streams::generate_stream_record(
787                table,
788                event_name,
789                key.clone(),
790                old_item_for_stream.clone(),
791                Some(new_item_for_stream.clone()),
792            ) {
793                crate::streams::add_stream_record(table, record);
794            }
795        }
796
797        // Capture kinesis delivery info
798        let kinesis_info = if table
799            .kinesis_destinations
800            .iter()
801            .any(|d| d.destination_status == "ACTIVE")
802        {
803            Some((
804                table.clone(),
805                event_name.to_string(),
806                key.clone(),
807                old_item_for_stream,
808                Some(new_item_for_stream),
809            ))
810        } else {
811            None
812        };
813
814        table.recalculate_stats();
815
816        // Release the write lock (drop `state`)
817        drop(state);
818
819        // Deliver to Kinesis destinations outside the lock
820        if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
821            self.deliver_to_kinesis_destinations(
822                &tbl,
823                &ev,
824                &keys,
825                old_image.as_ref(),
826                new_image.as_ref(),
827            );
828        }
829
830        let mut result = json!({});
831        if let Some(old) = old_item {
832            result["Attributes"] = json!(old);
833        } else if let Some(new) = new_item {
834            result["Attributes"] = json!(new);
835        }
836
837        Self::ok_json(result)
838    }
839
840    // ── Query & Scan ────────────────────────────────────────────────────
841
842    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843        let body = Self::parse_body(req)?;
844        let table_name = require_str(&body, "TableName")?;
845
846        let state = self.state.read();
847        let table = get_table(&state.tables, table_name)?;
848
849        let expr_attr_names = parse_expression_attribute_names(&body);
850        let expr_attr_values = parse_expression_attribute_values(&body);
851
852        let key_condition = body["KeyConditionExpression"].as_str();
853        let filter_expression = body["FilterExpression"].as_str();
854        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
855        let limit = body["Limit"].as_i64().map(|l| l as usize);
856        let index_name = body["IndexName"].as_str();
857        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
858            parse_key_map(&body["ExclusiveStartKey"]);
859
860        let (items_to_scan, hash_key_name, range_key_name): (
861            &[HashMap<String, AttributeValue>],
862            String,
863            Option<String>,
864        ) = if let Some(idx_name) = index_name {
865            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
866                let hk = gsi
867                    .key_schema
868                    .iter()
869                    .find(|k| k.key_type == "HASH")
870                    .map(|k| k.attribute_name.clone())
871                    .unwrap_or_default();
872                let rk = gsi
873                    .key_schema
874                    .iter()
875                    .find(|k| k.key_type == "RANGE")
876                    .map(|k| k.attribute_name.clone());
877                (&table.items, hk, rk)
878            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
879                let hk = lsi
880                    .key_schema
881                    .iter()
882                    .find(|k| k.key_type == "HASH")
883                    .map(|k| k.attribute_name.clone())
884                    .unwrap_or_default();
885                let rk = lsi
886                    .key_schema
887                    .iter()
888                    .find(|k| k.key_type == "RANGE")
889                    .map(|k| k.attribute_name.clone());
890                (&table.items, hk, rk)
891            } else {
892                return Err(AwsServiceError::aws_error(
893                    StatusCode::BAD_REQUEST,
894                    "ValidationException",
895                    format!("The table does not have the specified index: {idx_name}"),
896                ));
897            }
898        } else {
899            (
900                &table.items[..],
901                table.hash_key_name().to_string(),
902                table.range_key_name().map(|s| s.to_string()),
903            )
904        };
905
906        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
907            .iter()
908            .filter(|item| {
909                if let Some(kc) = key_condition {
910                    evaluate_key_condition(
911                        kc,
912                        item,
913                        &hash_key_name,
914                        range_key_name.as_deref(),
915                        &expr_attr_names,
916                        &expr_attr_values,
917                    )
918                } else {
919                    true
920                }
921            })
922            .collect();
923
924        if let Some(ref rk) = range_key_name {
925            matched.sort_by(|a, b| {
926                let av = a.get(rk.as_str());
927                let bv = b.get(rk.as_str());
928                compare_attribute_values(av, bv)
929            });
930            if !scan_forward {
931                matched.reverse();
932            }
933        }
934
935        // For GSI queries, we need the table's primary key attributes to uniquely
936        // identify items (GSI keys are not unique).
937        let table_pk_hash = table.hash_key_name().to_string();
938        let table_pk_range = table.range_key_name().map(|s| s.to_string());
939        let is_gsi_query = index_name.is_some()
940            && (hash_key_name != table_pk_hash
941                || range_key_name.as_deref() != table_pk_range.as_deref());
942
943        // Apply ExclusiveStartKey: skip items up to and including the start key.
944        // For GSI queries the start key contains both index keys and table PK, so
945        // we must match on ALL of them to find the exact item.
946        if let Some(ref start_key) = exclusive_start_key {
947            if let Some(pos) = matched.iter().position(|item| {
948                let index_match =
949                    item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
950                if is_gsi_query {
951                    index_match
952                        && item_matches_key(
953                            item,
954                            start_key,
955                            &table_pk_hash,
956                            table_pk_range.as_deref(),
957                        )
958                } else {
959                    index_match
960                }
961            }) {
962                matched = matched.split_off(pos + 1);
963            }
964        }
965
966        if let Some(filter) = filter_expression {
967            matched.retain(|item| {
968                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
969            });
970        }
971
972        let scanned_count = matched.len();
973
974        let has_more = if let Some(lim) = limit {
975            let more = matched.len() > lim;
976            matched.truncate(lim);
977            more
978        } else {
979            false
980        };
981
982        // Build LastEvaluatedKey from the last returned item if there are more results.
983        // For GSI queries, include both the index keys and the table's primary key
984        // so the item can be uniquely identified on resume.
985        let last_evaluated_key = if has_more {
986            matched.last().map(|item| {
987                let mut key =
988                    extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
989                if is_gsi_query {
990                    let table_key =
991                        extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
992                    key.extend(table_key);
993                }
994                key
995            })
996        } else {
997            None
998        };
999
1000        // Collect partition key values for contributor insights
1001        let insights_enabled = table.contributor_insights_status == "ENABLED";
1002        let pk_name = table.hash_key_name().to_string();
1003        let accessed_keys: Vec<String> = if insights_enabled {
1004            matched
1005                .iter()
1006                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1007                .collect()
1008        } else {
1009            Vec::new()
1010        };
1011
1012        let items: Vec<Value> = matched
1013            .iter()
1014            .map(|item| {
1015                let projected = project_item(item, &body);
1016                json!(projected)
1017            })
1018            .collect();
1019
1020        let mut result = json!({
1021            "Items": items,
1022            "Count": items.len(),
1023            "ScannedCount": scanned_count,
1024        });
1025
1026        if let Some(lek) = last_evaluated_key {
1027            result["LastEvaluatedKey"] = json!(lek);
1028        }
1029
1030        drop(state);
1031
1032        if !accessed_keys.is_empty() {
1033            let mut state = self.state.write();
1034            if let Some(table) = state.tables.get_mut(table_name) {
1035                // Re-check insights status after acquiring write lock in case it
1036                // was disabled between the read and write lock acquisitions.
1037                if table.contributor_insights_status == "ENABLED" {
1038                    for key_str in accessed_keys {
1039                        *table
1040                            .contributor_insights_counters
1041                            .entry(key_str)
1042                            .or_insert(0) += 1;
1043                    }
1044                }
1045            }
1046        }
1047
1048        Self::ok_json(result)
1049    }
1050
1051    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052        let body = Self::parse_body(req)?;
1053        let table_name = require_str(&body, "TableName")?;
1054
1055        let state = self.state.read();
1056        let table = get_table(&state.tables, table_name)?;
1057
1058        let expr_attr_names = parse_expression_attribute_names(&body);
1059        let expr_attr_values = parse_expression_attribute_values(&body);
1060        let filter_expression = body["FilterExpression"].as_str();
1061        let limit = body["Limit"].as_i64().map(|l| l as usize);
1062        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1063            parse_key_map(&body["ExclusiveStartKey"]);
1064
1065        let hash_key_name = table.hash_key_name().to_string();
1066        let range_key_name = table.range_key_name().map(|s| s.to_string());
1067
1068        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1069
1070        // Apply ExclusiveStartKey: skip items up to and including the start key
1071        if let Some(ref start_key) = exclusive_start_key {
1072            if let Some(pos) = matched.iter().position(|item| {
1073                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1074            }) {
1075                matched = matched.split_off(pos + 1);
1076            }
1077        }
1078
1079        let scanned_count = matched.len();
1080
1081        if let Some(filter) = filter_expression {
1082            matched.retain(|item| {
1083                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1084            });
1085        }
1086
1087        let has_more = if let Some(lim) = limit {
1088            let more = matched.len() > lim;
1089            matched.truncate(lim);
1090            more
1091        } else {
1092            false
1093        };
1094
1095        // Build LastEvaluatedKey from the last returned item if there are more results
1096        let last_evaluated_key = if has_more {
1097            matched
1098                .last()
1099                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1100        } else {
1101            None
1102        };
1103
1104        // Collect partition key values for contributor insights
1105        let insights_enabled = table.contributor_insights_status == "ENABLED";
1106        let pk_name = table.hash_key_name().to_string();
1107        let accessed_keys: Vec<String> = if insights_enabled {
1108            matched
1109                .iter()
1110                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1111                .collect()
1112        } else {
1113            Vec::new()
1114        };
1115
1116        let items: Vec<Value> = matched
1117            .iter()
1118            .map(|item| {
1119                let projected = project_item(item, &body);
1120                json!(projected)
1121            })
1122            .collect();
1123
1124        let mut result = json!({
1125            "Items": items,
1126            "Count": items.len(),
1127            "ScannedCount": scanned_count,
1128        });
1129
1130        if let Some(lek) = last_evaluated_key {
1131            result["LastEvaluatedKey"] = json!(lek);
1132        }
1133
1134        drop(state);
1135
1136        if !accessed_keys.is_empty() {
1137            let mut state = self.state.write();
1138            if let Some(table) = state.tables.get_mut(table_name) {
1139                // Re-check insights status after acquiring write lock in case it
1140                // was disabled between the read and write lock acquisitions.
1141                if table.contributor_insights_status == "ENABLED" {
1142                    for key_str in accessed_keys {
1143                        *table
1144                            .contributor_insights_counters
1145                            .entry(key_str)
1146                            .or_insert(0) += 1;
1147                    }
1148                }
1149            }
1150        }
1151
1152        Self::ok_json(result)
1153    }
1154
1155    // ── Batch Operations ────────────────────────────────────────────────
1156
1157    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158        let body = Self::parse_body(req)?;
1159
1160        validate_optional_enum_value(
1161            "returnConsumedCapacity",
1162            &body["ReturnConsumedCapacity"],
1163            &["INDEXES", "TOTAL", "NONE"],
1164        )?;
1165
1166        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1167
1168        let request_items = body["RequestItems"]
1169            .as_object()
1170            .ok_or_else(|| {
1171                AwsServiceError::aws_error(
1172                    StatusCode::BAD_REQUEST,
1173                    "ValidationException",
1174                    "RequestItems is required",
1175                )
1176            })?
1177            .clone();
1178
1179        let state = self.state.read();
1180        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1181        let mut consumed_capacity: Vec<Value> = Vec::new();
1182
1183        for (table_name, params) in &request_items {
1184            let table = get_table(&state.tables, table_name)?;
1185            let keys = params["Keys"].as_array().ok_or_else(|| {
1186                AwsServiceError::aws_error(
1187                    StatusCode::BAD_REQUEST,
1188                    "ValidationException",
1189                    "Keys is required",
1190                )
1191            })?;
1192
1193            let mut items = Vec::new();
1194            for key_val in keys {
1195                let key: HashMap<String, AttributeValue> =
1196                    serde_json::from_value(key_val.clone()).unwrap_or_default();
1197                if let Some(idx) = table.find_item_index(&key) {
1198                    items.push(json!(table.items[idx]));
1199                }
1200            }
1201            responses.insert(table_name.clone(), items);
1202
1203            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1204                consumed_capacity.push(json!({
1205                    "TableName": table_name,
1206                    "CapacityUnits": 1.0,
1207                }));
1208            }
1209        }
1210
1211        let mut result = json!({
1212            "Responses": responses,
1213            "UnprocessedKeys": {},
1214        });
1215
1216        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1217            result["ConsumedCapacity"] = json!(consumed_capacity);
1218        }
1219
1220        Self::ok_json(result)
1221    }
1222
1223    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1224        let body = Self::parse_body(req)?;
1225
1226        validate_optional_enum_value(
1227            "returnConsumedCapacity",
1228            &body["ReturnConsumedCapacity"],
1229            &["INDEXES", "TOTAL", "NONE"],
1230        )?;
1231        validate_optional_enum_value(
1232            "returnItemCollectionMetrics",
1233            &body["ReturnItemCollectionMetrics"],
1234            &["SIZE", "NONE"],
1235        )?;
1236
1237        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1238        let return_icm = body["ReturnItemCollectionMetrics"]
1239            .as_str()
1240            .unwrap_or("NONE");
1241
1242        let request_items = body["RequestItems"]
1243            .as_object()
1244            .ok_or_else(|| {
1245                AwsServiceError::aws_error(
1246                    StatusCode::BAD_REQUEST,
1247                    "ValidationException",
1248                    "RequestItems is required",
1249                )
1250            })?
1251            .clone();
1252
1253        let mut state = self.state.write();
1254        let mut consumed_capacity: Vec<Value> = Vec::new();
1255        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1256
1257        for (table_name, requests) in &request_items {
1258            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1259                AwsServiceError::aws_error(
1260                    StatusCode::BAD_REQUEST,
1261                    "ResourceNotFoundException",
1262                    format!("Requested resource not found: Table: {table_name} not found"),
1263                )
1264            })?;
1265
1266            let reqs = requests.as_array().ok_or_else(|| {
1267                AwsServiceError::aws_error(
1268                    StatusCode::BAD_REQUEST,
1269                    "ValidationException",
1270                    "Request list must be an array",
1271                )
1272            })?;
1273
1274            for request in reqs {
1275                if let Some(put_req) = request.get("PutRequest") {
1276                    let item: HashMap<String, AttributeValue> =
1277                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1278                    let key = extract_key(table, &item);
1279                    if let Some(idx) = table.find_item_index(&key) {
1280                        table.items[idx] = item;
1281                    } else {
1282                        table.items.push(item);
1283                    }
1284                } else if let Some(del_req) = request.get("DeleteRequest") {
1285                    let key: HashMap<String, AttributeValue> =
1286                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1287                    if let Some(idx) = table.find_item_index(&key) {
1288                        table.items.remove(idx);
1289                    }
1290                }
1291            }
1292
1293            table.recalculate_stats();
1294
1295            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1296                consumed_capacity.push(json!({
1297                    "TableName": table_name,
1298                    "CapacityUnits": 1.0,
1299                }));
1300            }
1301
1302            if return_icm == "SIZE" {
1303                item_collection_metrics.insert(table_name.clone(), vec![]);
1304            }
1305        }
1306
1307        let mut result = json!({
1308            "UnprocessedItems": {},
1309        });
1310
1311        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1312            result["ConsumedCapacity"] = json!(consumed_capacity);
1313        }
1314
1315        if return_icm == "SIZE" {
1316            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1317        }
1318
1319        Self::ok_json(result)
1320    }
1321
1322    // ── Tags ────────────────────────────────────────────────────────────
1323
1324    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1325        let body = Self::parse_body(req)?;
1326        let resource_arn = require_str(&body, "ResourceArn")?;
1327        validate_required("Tags", &body["Tags"])?;
1328
1329        let mut state = self.state.write();
1330        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1331
1332        fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1333            |f| {
1334                AwsServiceError::aws_error(
1335                    StatusCode::BAD_REQUEST,
1336                    "ValidationException",
1337                    format!("{f} must be a list"),
1338                )
1339            },
1340        )?;
1341
1342        Self::ok_json(json!({}))
1343    }
1344
1345    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1346        let body = Self::parse_body(req)?;
1347        let resource_arn = require_str(&body, "ResourceArn")?;
1348        validate_required("TagKeys", &body["TagKeys"])?;
1349
1350        let mut state = self.state.write();
1351        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1352
1353        fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1354            AwsServiceError::aws_error(
1355                StatusCode::BAD_REQUEST,
1356                "ValidationException",
1357                format!("{f} must be a list"),
1358            )
1359        })?;
1360
1361        Self::ok_json(json!({}))
1362    }
1363
1364    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1365        let body = Self::parse_body(req)?;
1366        let resource_arn = require_str(&body, "ResourceArn")?;
1367
1368        let state = self.state.read();
1369        let table = find_table_by_arn(&state.tables, resource_arn)?;
1370
1371        let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1372
1373        Self::ok_json(json!({ "Tags": tags }))
1374    }
1375
1376    // ── Transactions ────────────────────────────────────────────────────
1377
1378    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1379        let body = Self::parse_body(req)?;
1380        validate_optional_enum_value(
1381            "returnConsumedCapacity",
1382            &body["ReturnConsumedCapacity"],
1383            &["INDEXES", "TOTAL", "NONE"],
1384        )?;
1385        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1386            AwsServiceError::aws_error(
1387                StatusCode::BAD_REQUEST,
1388                "ValidationException",
1389                "TransactItems is required",
1390            )
1391        })?;
1392
1393        let state = self.state.read();
1394        let mut responses: Vec<Value> = Vec::new();
1395
1396        for ti in transact_items {
1397            let get = &ti["Get"];
1398            let table_name = get["TableName"].as_str().ok_or_else(|| {
1399                AwsServiceError::aws_error(
1400                    StatusCode::BAD_REQUEST,
1401                    "ValidationException",
1402                    "TableName is required in Get",
1403                )
1404            })?;
1405            let key: HashMap<String, AttributeValue> =
1406                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1407
1408            let table = get_table(&state.tables, table_name)?;
1409            match table.find_item_index(&key) {
1410                Some(idx) => {
1411                    responses.push(json!({ "Item": table.items[idx] }));
1412                }
1413                None => {
1414                    responses.push(json!({}));
1415                }
1416            }
1417        }
1418
1419        Self::ok_json(json!({ "Responses": responses }))
1420    }
1421
1422    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1423        let body = Self::parse_body(req)?;
1424        validate_optional_string_length(
1425            "clientRequestToken",
1426            body["ClientRequestToken"].as_str(),
1427            1,
1428            36,
1429        )?;
1430        validate_optional_enum_value(
1431            "returnConsumedCapacity",
1432            &body["ReturnConsumedCapacity"],
1433            &["INDEXES", "TOTAL", "NONE"],
1434        )?;
1435        validate_optional_enum_value(
1436            "returnItemCollectionMetrics",
1437            &body["ReturnItemCollectionMetrics"],
1438            &["SIZE", "NONE"],
1439        )?;
1440        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1441            AwsServiceError::aws_error(
1442                StatusCode::BAD_REQUEST,
1443                "ValidationException",
1444                "TransactItems is required",
1445            )
1446        })?;
1447
1448        let mut state = self.state.write();
1449
1450        // First pass: validate all conditions
1451        let mut cancellation_reasons: Vec<Value> = Vec::new();
1452        let mut any_failed = false;
1453
1454        for ti in transact_items {
1455            if let Some(put) = ti.get("Put") {
1456                let table_name = put["TableName"].as_str().unwrap_or_default();
1457                let item: HashMap<String, AttributeValue> =
1458                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1459                let condition = put["ConditionExpression"].as_str();
1460
1461                if let Some(cond) = condition {
1462                    let table = get_table(&state.tables, table_name)?;
1463                    let expr_attr_names = parse_expression_attribute_names(put);
1464                    let expr_attr_values = parse_expression_attribute_values(put);
1465                    let key = extract_key(table, &item);
1466                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1467                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1468                        .is_err()
1469                    {
1470                        cancellation_reasons.push(json!({
1471                            "Code": "ConditionalCheckFailed",
1472                            "Message": "The conditional request failed"
1473                        }));
1474                        any_failed = true;
1475                        continue;
1476                    }
1477                }
1478                cancellation_reasons.push(json!({ "Code": "None" }));
1479            } else if let Some(delete) = ti.get("Delete") {
1480                let table_name = delete["TableName"].as_str().unwrap_or_default();
1481                let key: HashMap<String, AttributeValue> =
1482                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1483                let condition = delete["ConditionExpression"].as_str();
1484
1485                if let Some(cond) = condition {
1486                    let table = get_table(&state.tables, table_name)?;
1487                    let expr_attr_names = parse_expression_attribute_names(delete);
1488                    let expr_attr_values = parse_expression_attribute_values(delete);
1489                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1490                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1491                        .is_err()
1492                    {
1493                        cancellation_reasons.push(json!({
1494                            "Code": "ConditionalCheckFailed",
1495                            "Message": "The conditional request failed"
1496                        }));
1497                        any_failed = true;
1498                        continue;
1499                    }
1500                }
1501                cancellation_reasons.push(json!({ "Code": "None" }));
1502            } else if let Some(update) = ti.get("Update") {
1503                let table_name = update["TableName"].as_str().unwrap_or_default();
1504                let key: HashMap<String, AttributeValue> =
1505                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1506                let condition = update["ConditionExpression"].as_str();
1507
1508                if let Some(cond) = condition {
1509                    let table = get_table(&state.tables, table_name)?;
1510                    let expr_attr_names = parse_expression_attribute_names(update);
1511                    let expr_attr_values = parse_expression_attribute_values(update);
1512                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1513                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1514                        .is_err()
1515                    {
1516                        cancellation_reasons.push(json!({
1517                            "Code": "ConditionalCheckFailed",
1518                            "Message": "The conditional request failed"
1519                        }));
1520                        any_failed = true;
1521                        continue;
1522                    }
1523                }
1524                cancellation_reasons.push(json!({ "Code": "None" }));
1525            } else if let Some(check) = ti.get("ConditionCheck") {
1526                let table_name = check["TableName"].as_str().unwrap_or_default();
1527                let key: HashMap<String, AttributeValue> =
1528                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1529                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1530
1531                let table = get_table(&state.tables, table_name)?;
1532                let expr_attr_names = parse_expression_attribute_names(check);
1533                let expr_attr_values = parse_expression_attribute_values(check);
1534                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1535                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1536                {
1537                    cancellation_reasons.push(json!({
1538                        "Code": "ConditionalCheckFailed",
1539                        "Message": "The conditional request failed"
1540                    }));
1541                    any_failed = true;
1542                    continue;
1543                }
1544                cancellation_reasons.push(json!({ "Code": "None" }));
1545            } else {
1546                cancellation_reasons.push(json!({ "Code": "None" }));
1547            }
1548        }
1549
1550        if any_failed {
1551            let error_body = json!({
1552                "__type": "TransactionCanceledException",
1553                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1554                "CancellationReasons": cancellation_reasons
1555            });
1556            return Ok(AwsResponse::json(
1557                StatusCode::BAD_REQUEST,
1558                serde_json::to_vec(&error_body).unwrap(),
1559            ));
1560        }
1561
1562        // Second pass: apply all writes
1563        for ti in transact_items {
1564            if let Some(put) = ti.get("Put") {
1565                let table_name = put["TableName"].as_str().unwrap_or_default();
1566                let item: HashMap<String, AttributeValue> =
1567                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1568                let table = get_table_mut(&mut state.tables, table_name)?;
1569                let key = extract_key(table, &item);
1570                if let Some(idx) = table.find_item_index(&key) {
1571                    table.items[idx] = item;
1572                } else {
1573                    table.items.push(item);
1574                }
1575                table.recalculate_stats();
1576            } else if let Some(delete) = ti.get("Delete") {
1577                let table_name = delete["TableName"].as_str().unwrap_or_default();
1578                let key: HashMap<String, AttributeValue> =
1579                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1580                let table = get_table_mut(&mut state.tables, table_name)?;
1581                if let Some(idx) = table.find_item_index(&key) {
1582                    table.items.remove(idx);
1583                }
1584                table.recalculate_stats();
1585            } else if let Some(update) = ti.get("Update") {
1586                let table_name = update["TableName"].as_str().unwrap_or_default();
1587                let key: HashMap<String, AttributeValue> =
1588                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1589                let update_expression = update["UpdateExpression"].as_str();
1590                let expr_attr_names = parse_expression_attribute_names(update);
1591                let expr_attr_values = parse_expression_attribute_values(update);
1592
1593                let table = get_table_mut(&mut state.tables, table_name)?;
1594                let idx = match table.find_item_index(&key) {
1595                    Some(i) => i,
1596                    None => {
1597                        let mut new_item = HashMap::new();
1598                        for (k, v) in &key {
1599                            new_item.insert(k.clone(), v.clone());
1600                        }
1601                        table.items.push(new_item);
1602                        table.items.len() - 1
1603                    }
1604                };
1605
1606                if let Some(expr) = update_expression {
1607                    apply_update_expression(
1608                        &mut table.items[idx],
1609                        expr,
1610                        &expr_attr_names,
1611                        &expr_attr_values,
1612                    )?;
1613                }
1614                table.recalculate_stats();
1615            }
1616            // ConditionCheck: no write needed
1617        }
1618
1619        Self::ok_json(json!({}))
1620    }
1621
1622    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623        let body = Self::parse_body(req)?;
1624        let statement = require_str(&body, "Statement")?;
1625        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1626
1627        execute_partiql_statement(&self.state, statement, &parameters)
1628    }
1629
1630    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1631        let body = Self::parse_body(req)?;
1632        validate_optional_enum_value(
1633            "returnConsumedCapacity",
1634            &body["ReturnConsumedCapacity"],
1635            &["INDEXES", "TOTAL", "NONE"],
1636        )?;
1637        let statements = body["Statements"].as_array().ok_or_else(|| {
1638            AwsServiceError::aws_error(
1639                StatusCode::BAD_REQUEST,
1640                "ValidationException",
1641                "Statements is required",
1642            )
1643        })?;
1644
1645        let mut responses: Vec<Value> = Vec::new();
1646        for stmt_obj in statements {
1647            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1648            let parameters = stmt_obj["Parameters"]
1649                .as_array()
1650                .cloned()
1651                .unwrap_or_default();
1652
1653            match execute_partiql_statement(&self.state, statement, &parameters) {
1654                Ok(resp) => {
1655                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1656                    responses.push(resp_body);
1657                }
1658                Err(e) => {
1659                    responses.push(json!({
1660                        "Error": {
1661                            "Code": "ValidationException",
1662                            "Message": e.to_string()
1663                        }
1664                    }));
1665                }
1666            }
1667        }
1668
1669        Self::ok_json(json!({ "Responses": responses }))
1670    }
1671
1672    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1673        let body = Self::parse_body(req)?;
1674        validate_optional_string_length(
1675            "clientRequestToken",
1676            body["ClientRequestToken"].as_str(),
1677            1,
1678            36,
1679        )?;
1680        validate_optional_enum_value(
1681            "returnConsumedCapacity",
1682            &body["ReturnConsumedCapacity"],
1683            &["INDEXES", "TOTAL", "NONE"],
1684        )?;
1685        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1686            AwsServiceError::aws_error(
1687                StatusCode::BAD_REQUEST,
1688                "ValidationException",
1689                "TransactStatements is required",
1690            )
1691        })?;
1692
1693        // Collect all results; if any fail, return TransactionCanceledException
1694        let mut results: Vec<Result<Value, String>> = Vec::new();
1695        for stmt_obj in transact_statements {
1696            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1697            let parameters = stmt_obj["Parameters"]
1698                .as_array()
1699                .cloned()
1700                .unwrap_or_default();
1701
1702            match execute_partiql_statement(&self.state, statement, &parameters) {
1703                Ok(resp) => {
1704                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1705                    results.push(Ok(resp_body));
1706                }
1707                Err(e) => {
1708                    results.push(Err(e.to_string()));
1709                }
1710            }
1711        }
1712
1713        let any_failed = results.iter().any(|r| r.is_err());
1714        if any_failed {
1715            let reasons: Vec<Value> = results
1716                .iter()
1717                .map(|r| match r {
1718                    Ok(_) => json!({ "Code": "None" }),
1719                    Err(msg) => json!({
1720                        "Code": "ValidationException",
1721                        "Message": msg
1722                    }),
1723                })
1724                .collect();
1725            let error_body = json!({
1726                "__type": "TransactionCanceledException",
1727                "message": "Transaction cancelled due to validation errors",
1728                "CancellationReasons": reasons
1729            });
1730            return Ok(AwsResponse::json(
1731                StatusCode::BAD_REQUEST,
1732                serde_json::to_vec(&error_body).unwrap(),
1733            ));
1734        }
1735
1736        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1737        Self::ok_json(json!({ "Responses": responses }))
1738    }
1739
1740    // ── TTL ─────────────────────────────────────────────────────────────
1741
1742    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1743        let body = Self::parse_body(req)?;
1744        let table_name = require_str(&body, "TableName")?;
1745        let spec = &body["TimeToLiveSpecification"];
1746        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1747            AwsServiceError::aws_error(
1748                StatusCode::BAD_REQUEST,
1749                "ValidationException",
1750                "TimeToLiveSpecification.AttributeName is required",
1751            )
1752        })?;
1753        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1754            AwsServiceError::aws_error(
1755                StatusCode::BAD_REQUEST,
1756                "ValidationException",
1757                "TimeToLiveSpecification.Enabled is required",
1758            )
1759        })?;
1760
1761        let mut state = self.state.write();
1762        let table = get_table_mut(&mut state.tables, table_name)?;
1763
1764        if enabled {
1765            table.ttl_attribute = Some(attr_name.to_string());
1766            table.ttl_enabled = true;
1767        } else {
1768            table.ttl_enabled = false;
1769        }
1770
1771        Self::ok_json(json!({
1772            "TimeToLiveSpecification": {
1773                "AttributeName": attr_name,
1774                "Enabled": enabled
1775            }
1776        }))
1777    }
1778
1779    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1780        let body = Self::parse_body(req)?;
1781        let table_name = require_str(&body, "TableName")?;
1782
1783        let state = self.state.read();
1784        let table = get_table(&state.tables, table_name)?;
1785
1786        let status = if table.ttl_enabled {
1787            "ENABLED"
1788        } else {
1789            "DISABLED"
1790        };
1791
1792        let mut desc = json!({
1793            "TimeToLiveDescription": {
1794                "TimeToLiveStatus": status
1795            }
1796        });
1797
1798        if let Some(ref attr) = table.ttl_attribute {
1799            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1800        }
1801
1802        Self::ok_json(desc)
1803    }
1804
1805    // ── Resource Policies ───────────────────────────────────────────────
1806
1807    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1808        let body = Self::parse_body(req)?;
1809        let resource_arn = require_str(&body, "ResourceArn")?;
1810        let policy = require_str(&body, "Policy")?;
1811
1812        let mut state = self.state.write();
1813        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1814        table.resource_policy = Some(policy.to_string());
1815
1816        let revision_id = uuid::Uuid::new_v4().to_string();
1817        Self::ok_json(json!({ "RevisionId": revision_id }))
1818    }
1819
1820    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821        let body = Self::parse_body(req)?;
1822        let resource_arn = require_str(&body, "ResourceArn")?;
1823
1824        let state = self.state.read();
1825        let table = find_table_by_arn(&state.tables, resource_arn)?;
1826
1827        match &table.resource_policy {
1828            Some(policy) => {
1829                let revision_id = uuid::Uuid::new_v4().to_string();
1830                Self::ok_json(json!({
1831                    "Policy": policy,
1832                    "RevisionId": revision_id
1833                }))
1834            }
1835            None => Self::ok_json(json!({ "Policy": null })),
1836        }
1837    }
1838
1839    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1840        let body = Self::parse_body(req)?;
1841        let resource_arn = require_str(&body, "ResourceArn")?;
1842
1843        let mut state = self.state.write();
1844        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1845        table.resource_policy = None;
1846
1847        Self::ok_json(json!({}))
1848    }
1849
1850    // ── Stubs ──────────────────────────────────────────────────────────
1851
1852    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1853        Self::ok_json(json!({
1854            "Endpoints": [{
1855                "Address": "dynamodb.us-east-1.amazonaws.com",
1856                "CachePeriodInMinutes": 1440
1857            }]
1858        }))
1859    }
1860
1861    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1862        Self::ok_json(json!({
1863            "AccountMaxReadCapacityUnits": 80000,
1864            "AccountMaxWriteCapacityUnits": 80000,
1865            "TableMaxReadCapacityUnits": 40000,
1866            "TableMaxWriteCapacityUnits": 40000
1867        }))
1868    }
1869
1870    // ── Backups ────────────────────────────────────────────────────────
1871
1872    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1873        let body = Self::parse_body(req)?;
1874        let table_name = require_str(&body, "TableName")?;
1875        let backup_name = require_str(&body, "BackupName")?;
1876
1877        let mut state = self.state.write();
1878        let table = get_table(&state.tables, table_name)?;
1879
1880        let backup_arn = format!(
1881            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1882            state.region,
1883            state.account_id,
1884            table_name,
1885            Utc::now().format("%Y%m%d%H%M%S")
1886        );
1887        let now = Utc::now();
1888
1889        let backup = BackupDescription {
1890            backup_arn: backup_arn.clone(),
1891            backup_name: backup_name.to_string(),
1892            table_name: table_name.to_string(),
1893            table_arn: table.arn.clone(),
1894            backup_status: "AVAILABLE".to_string(),
1895            backup_type: "USER".to_string(),
1896            backup_creation_date: now,
1897            key_schema: table.key_schema.clone(),
1898            attribute_definitions: table.attribute_definitions.clone(),
1899            provisioned_throughput: table.provisioned_throughput.clone(),
1900            billing_mode: table.billing_mode.clone(),
1901            item_count: table.item_count,
1902            size_bytes: table.size_bytes,
1903            items: table.items.clone(),
1904        };
1905
1906        state.backups.insert(backup_arn.clone(), backup);
1907
1908        Self::ok_json(json!({
1909            "BackupDetails": {
1910                "BackupArn": backup_arn,
1911                "BackupName": backup_name,
1912                "BackupStatus": "AVAILABLE",
1913                "BackupType": "USER",
1914                "BackupCreationDateTime": now.timestamp() as f64,
1915                "BackupSizeBytes": 0
1916            }
1917        }))
1918    }
1919
1920    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1921        let body = Self::parse_body(req)?;
1922        let backup_arn = require_str(&body, "BackupArn")?;
1923
1924        let mut state = self.state.write();
1925        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1926            AwsServiceError::aws_error(
1927                StatusCode::BAD_REQUEST,
1928                "BackupNotFoundException",
1929                format!("Backup not found: {backup_arn}"),
1930            )
1931        })?;
1932
1933        Self::ok_json(json!({
1934            "BackupDescription": {
1935                "BackupDetails": {
1936                    "BackupArn": backup.backup_arn,
1937                    "BackupName": backup.backup_name,
1938                    "BackupStatus": "DELETED",
1939                    "BackupType": backup.backup_type,
1940                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1941                    "BackupSizeBytes": backup.size_bytes
1942                },
1943                "SourceTableDetails": {
1944                    "TableName": backup.table_name,
1945                    "TableArn": backup.table_arn,
1946                    "TableId": uuid::Uuid::new_v4().to_string(),
1947                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1948                        "AttributeName": ks.attribute_name,
1949                        "KeyType": ks.key_type
1950                    })).collect::<Vec<_>>(),
1951                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1952                    "ProvisionedThroughput": {
1953                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1954                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1955                    },
1956                    "ItemCount": backup.item_count,
1957                    "BillingMode": backup.billing_mode,
1958                    "TableSizeBytes": backup.size_bytes
1959                },
1960                "SourceTableFeatureDetails": {}
1961            }
1962        }))
1963    }
1964
1965    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1966        let body = Self::parse_body(req)?;
1967        let backup_arn = require_str(&body, "BackupArn")?;
1968
1969        let state = self.state.read();
1970        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1971            AwsServiceError::aws_error(
1972                StatusCode::BAD_REQUEST,
1973                "BackupNotFoundException",
1974                format!("Backup not found: {backup_arn}"),
1975            )
1976        })?;
1977
1978        Self::ok_json(json!({
1979            "BackupDescription": {
1980                "BackupDetails": {
1981                    "BackupArn": backup.backup_arn,
1982                    "BackupName": backup.backup_name,
1983                    "BackupStatus": backup.backup_status,
1984                    "BackupType": backup.backup_type,
1985                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1986                    "BackupSizeBytes": backup.size_bytes
1987                },
1988                "SourceTableDetails": {
1989                    "TableName": backup.table_name,
1990                    "TableArn": backup.table_arn,
1991                    "TableId": uuid::Uuid::new_v4().to_string(),
1992                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1993                        "AttributeName": ks.attribute_name,
1994                        "KeyType": ks.key_type
1995                    })).collect::<Vec<_>>(),
1996                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1997                    "ProvisionedThroughput": {
1998                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1999                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2000                    },
2001                    "ItemCount": backup.item_count,
2002                    "BillingMode": backup.billing_mode,
2003                    "TableSizeBytes": backup.size_bytes
2004                },
2005                "SourceTableFeatureDetails": {}
2006            }
2007        }))
2008    }
2009
2010    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2011        let body = Self::parse_body(req)?;
2012        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2013        validate_optional_string_length(
2014            "exclusiveStartBackupArn",
2015            body["ExclusiveStartBackupArn"].as_str(),
2016            37,
2017            1024,
2018        )?;
2019        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2020        validate_optional_enum_value(
2021            "backupType",
2022            &body["BackupType"],
2023            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2024        )?;
2025        let table_name = body["TableName"].as_str();
2026
2027        let state = self.state.read();
2028        let summaries: Vec<Value> = state
2029            .backups
2030            .values()
2031            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2032            .map(|b| {
2033                json!({
2034                    "TableName": b.table_name,
2035                    "TableArn": b.table_arn,
2036                    "BackupArn": b.backup_arn,
2037                    "BackupName": b.backup_name,
2038                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2039                    "BackupStatus": b.backup_status,
2040                    "BackupType": b.backup_type,
2041                    "BackupSizeBytes": b.size_bytes
2042                })
2043            })
2044            .collect();
2045
2046        Self::ok_json(json!({
2047            "BackupSummaries": summaries
2048        }))
2049    }
2050
2051    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2052        let body = Self::parse_body(req)?;
2053        let backup_arn = require_str(&body, "BackupArn")?;
2054        let target_table_name = require_str(&body, "TargetTableName")?;
2055
2056        let mut state = self.state.write();
2057        let backup = state.backups.get(backup_arn).ok_or_else(|| {
2058            AwsServiceError::aws_error(
2059                StatusCode::BAD_REQUEST,
2060                "BackupNotFoundException",
2061                format!("Backup not found: {backup_arn}"),
2062            )
2063        })?;
2064
2065        if state.tables.contains_key(target_table_name) {
2066            return Err(AwsServiceError::aws_error(
2067                StatusCode::BAD_REQUEST,
2068                "TableAlreadyExistsException",
2069                format!("Table already exists: {target_table_name}"),
2070            ));
2071        }
2072
2073        let now = Utc::now();
2074        let arn = format!(
2075            "arn:aws:dynamodb:{}:{}:table/{}",
2076            state.region, state.account_id, target_table_name
2077        );
2078
2079        let restored_items = backup.items.clone();
2080        let mut table = DynamoTable {
2081            name: target_table_name.to_string(),
2082            arn: arn.clone(),
2083            key_schema: backup.key_schema.clone(),
2084            attribute_definitions: backup.attribute_definitions.clone(),
2085            provisioned_throughput: backup.provisioned_throughput.clone(),
2086            items: restored_items,
2087            gsi: Vec::new(),
2088            lsi: Vec::new(),
2089            tags: HashMap::new(),
2090            created_at: now,
2091            status: "ACTIVE".to_string(),
2092            item_count: 0,
2093            size_bytes: 0,
2094            billing_mode: backup.billing_mode.clone(),
2095            ttl_attribute: None,
2096            ttl_enabled: false,
2097            resource_policy: None,
2098            pitr_enabled: false,
2099            kinesis_destinations: Vec::new(),
2100            contributor_insights_status: "DISABLED".to_string(),
2101            contributor_insights_counters: HashMap::new(),
2102            stream_enabled: false,
2103            stream_view_type: None,
2104            stream_arn: None,
2105            stream_records: Arc::new(RwLock::new(Vec::new())),
2106            sse_type: None,
2107            sse_kms_key_arn: None,
2108        };
2109        table.recalculate_stats();
2110
2111        let desc = build_table_description(&table);
2112        state.tables.insert(target_table_name.to_string(), table);
2113
2114        Self::ok_json(json!({
2115            "TableDescription": desc
2116        }))
2117    }
2118
2119    fn restore_table_to_point_in_time(
2120        &self,
2121        req: &AwsRequest,
2122    ) -> Result<AwsResponse, AwsServiceError> {
2123        let body = Self::parse_body(req)?;
2124        let target_table_name = require_str(&body, "TargetTableName")?;
2125        let source_table_name = body["SourceTableName"].as_str();
2126        let source_table_arn = body["SourceTableArn"].as_str();
2127
2128        let mut state = self.state.write();
2129
2130        // Resolve source table
2131        let source = if let Some(name) = source_table_name {
2132            get_table(&state.tables, name)?.clone()
2133        } else if let Some(arn) = source_table_arn {
2134            find_table_by_arn(&state.tables, arn)?.clone()
2135        } else {
2136            return Err(AwsServiceError::aws_error(
2137                StatusCode::BAD_REQUEST,
2138                "ValidationException",
2139                "SourceTableName or SourceTableArn is required",
2140            ));
2141        };
2142
2143        if state.tables.contains_key(target_table_name) {
2144            return Err(AwsServiceError::aws_error(
2145                StatusCode::BAD_REQUEST,
2146                "TableAlreadyExistsException",
2147                format!("Table already exists: {target_table_name}"),
2148            ));
2149        }
2150
2151        let now = Utc::now();
2152        let arn = format!(
2153            "arn:aws:dynamodb:{}:{}:table/{}",
2154            state.region, state.account_id, target_table_name
2155        );
2156
2157        let mut table = DynamoTable {
2158            name: target_table_name.to_string(),
2159            arn: arn.clone(),
2160            key_schema: source.key_schema.clone(),
2161            attribute_definitions: source.attribute_definitions.clone(),
2162            provisioned_throughput: source.provisioned_throughput.clone(),
2163            items: source.items.clone(),
2164            gsi: Vec::new(),
2165            lsi: Vec::new(),
2166            tags: HashMap::new(),
2167            created_at: now,
2168            status: "ACTIVE".to_string(),
2169            item_count: 0,
2170            size_bytes: 0,
2171            billing_mode: source.billing_mode.clone(),
2172            ttl_attribute: None,
2173            ttl_enabled: false,
2174            resource_policy: None,
2175            pitr_enabled: false,
2176            kinesis_destinations: Vec::new(),
2177            contributor_insights_status: "DISABLED".to_string(),
2178            contributor_insights_counters: HashMap::new(),
2179            stream_enabled: false,
2180            stream_view_type: None,
2181            stream_arn: None,
2182            stream_records: Arc::new(RwLock::new(Vec::new())),
2183            sse_type: None,
2184            sse_kms_key_arn: None,
2185        };
2186        table.recalculate_stats();
2187
2188        let desc = build_table_description(&table);
2189        state.tables.insert(target_table_name.to_string(), table);
2190
2191        Self::ok_json(json!({
2192            "TableDescription": desc
2193        }))
2194    }
2195
2196    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2197        let body = Self::parse_body(req)?;
2198        let table_name = require_str(&body, "TableName")?;
2199
2200        let pitr_spec = body["PointInTimeRecoverySpecification"]
2201            .as_object()
2202            .ok_or_else(|| {
2203                AwsServiceError::aws_error(
2204                    StatusCode::BAD_REQUEST,
2205                    "ValidationException",
2206                    "PointInTimeRecoverySpecification is required",
2207                )
2208            })?;
2209        let enabled = pitr_spec
2210            .get("PointInTimeRecoveryEnabled")
2211            .and_then(|v| v.as_bool())
2212            .unwrap_or(false);
2213
2214        let mut state = self.state.write();
2215        let table = get_table_mut(&mut state.tables, table_name)?;
2216        table.pitr_enabled = enabled;
2217
2218        let status = if enabled { "ENABLED" } else { "DISABLED" };
2219        Self::ok_json(json!({
2220            "ContinuousBackupsDescription": {
2221                "ContinuousBackupsStatus": status,
2222                "PointInTimeRecoveryDescription": {
2223                    "PointInTimeRecoveryStatus": status,
2224                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2225                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2226                }
2227            }
2228        }))
2229    }
2230
2231    fn describe_continuous_backups(
2232        &self,
2233        req: &AwsRequest,
2234    ) -> Result<AwsResponse, AwsServiceError> {
2235        let body = Self::parse_body(req)?;
2236        let table_name = require_str(&body, "TableName")?;
2237
2238        let state = self.state.read();
2239        let table = get_table(&state.tables, table_name)?;
2240
2241        let status = if table.pitr_enabled {
2242            "ENABLED"
2243        } else {
2244            "DISABLED"
2245        };
2246        Self::ok_json(json!({
2247            "ContinuousBackupsDescription": {
2248                "ContinuousBackupsStatus": status,
2249                "PointInTimeRecoveryDescription": {
2250                    "PointInTimeRecoveryStatus": status,
2251                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2252                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2253                }
2254            }
2255        }))
2256    }
2257
2258    // ── Global Tables ──────────────────────────────────────────────────
2259
2260    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2261        let body = Self::parse_body(req)?;
2262        let global_table_name = require_str(&body, "GlobalTableName")?;
2263        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2264
2265        let replication_group = body["ReplicationGroup"]
2266            .as_array()
2267            .ok_or_else(|| {
2268                AwsServiceError::aws_error(
2269                    StatusCode::BAD_REQUEST,
2270                    "ValidationException",
2271                    "ReplicationGroup is required",
2272                )
2273            })?
2274            .iter()
2275            .filter_map(|r| {
2276                r["RegionName"].as_str().map(|rn| ReplicaDescription {
2277                    region_name: rn.to_string(),
2278                    replica_status: "ACTIVE".to_string(),
2279                })
2280            })
2281            .collect::<Vec<_>>();
2282
2283        let mut state = self.state.write();
2284
2285        if state.global_tables.contains_key(global_table_name) {
2286            return Err(AwsServiceError::aws_error(
2287                StatusCode::BAD_REQUEST,
2288                "GlobalTableAlreadyExistsException",
2289                format!("Global table already exists: {global_table_name}"),
2290            ));
2291        }
2292
2293        let arn = format!(
2294            "arn:aws:dynamodb::{}:global-table/{}",
2295            state.account_id, global_table_name
2296        );
2297        let now = Utc::now();
2298
2299        let gt = GlobalTableDescription {
2300            global_table_name: global_table_name.to_string(),
2301            global_table_arn: arn.clone(),
2302            global_table_status: "ACTIVE".to_string(),
2303            creation_date: now,
2304            replication_group: replication_group.clone(),
2305        };
2306
2307        state
2308            .global_tables
2309            .insert(global_table_name.to_string(), gt);
2310
2311        Self::ok_json(json!({
2312            "GlobalTableDescription": {
2313                "GlobalTableName": global_table_name,
2314                "GlobalTableArn": arn,
2315                "GlobalTableStatus": "ACTIVE",
2316                "CreationDateTime": now.timestamp() as f64,
2317                "ReplicationGroup": replication_group.iter().map(|r| json!({
2318                    "RegionName": r.region_name,
2319                    "ReplicaStatus": r.replica_status
2320                })).collect::<Vec<_>>()
2321            }
2322        }))
2323    }
2324
2325    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2326        let body = Self::parse_body(req)?;
2327        let global_table_name = require_str(&body, "GlobalTableName")?;
2328        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2329
2330        let state = self.state.read();
2331        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2332            AwsServiceError::aws_error(
2333                StatusCode::BAD_REQUEST,
2334                "GlobalTableNotFoundException",
2335                format!("Global table not found: {global_table_name}"),
2336            )
2337        })?;
2338
2339        Self::ok_json(json!({
2340            "GlobalTableDescription": {
2341                "GlobalTableName": gt.global_table_name,
2342                "GlobalTableArn": gt.global_table_arn,
2343                "GlobalTableStatus": gt.global_table_status,
2344                "CreationDateTime": gt.creation_date.timestamp() as f64,
2345                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2346                    "RegionName": r.region_name,
2347                    "ReplicaStatus": r.replica_status
2348                })).collect::<Vec<_>>()
2349            }
2350        }))
2351    }
2352
2353    fn describe_global_table_settings(
2354        &self,
2355        req: &AwsRequest,
2356    ) -> Result<AwsResponse, AwsServiceError> {
2357        let body = Self::parse_body(req)?;
2358        let global_table_name = require_str(&body, "GlobalTableName")?;
2359        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2360
2361        let state = self.state.read();
2362        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2363            AwsServiceError::aws_error(
2364                StatusCode::BAD_REQUEST,
2365                "GlobalTableNotFoundException",
2366                format!("Global table not found: {global_table_name}"),
2367            )
2368        })?;
2369
2370        let replica_settings: Vec<Value> = gt
2371            .replication_group
2372            .iter()
2373            .map(|r| {
2374                json!({
2375                    "RegionName": r.region_name,
2376                    "ReplicaStatus": r.replica_status,
2377                    "ReplicaProvisionedReadCapacityUnits": 0,
2378                    "ReplicaProvisionedWriteCapacityUnits": 0
2379                })
2380            })
2381            .collect();
2382
2383        Self::ok_json(json!({
2384            "GlobalTableName": gt.global_table_name,
2385            "ReplicaSettings": replica_settings
2386        }))
2387    }
2388
2389    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2390        let body = Self::parse_body(req)?;
2391        validate_optional_string_length(
2392            "exclusiveStartGlobalTableName",
2393            body["ExclusiveStartGlobalTableName"].as_str(),
2394            3,
2395            255,
2396        )?;
2397        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2398        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2399
2400        let state = self.state.read();
2401        let tables: Vec<Value> = state
2402            .global_tables
2403            .values()
2404            .take(limit)
2405            .map(|gt| {
2406                json!({
2407                    "GlobalTableName": gt.global_table_name,
2408                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2409                        "RegionName": r.region_name
2410                    })).collect::<Vec<_>>()
2411                })
2412            })
2413            .collect();
2414
2415        Self::ok_json(json!({
2416            "GlobalTables": tables
2417        }))
2418    }
2419
2420    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2421        let body = Self::parse_body(req)?;
2422        let global_table_name = require_str(&body, "GlobalTableName")?;
2423        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2424        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2425
2426        let mut state = self.state.write();
2427        let gt = state
2428            .global_tables
2429            .get_mut(global_table_name)
2430            .ok_or_else(|| {
2431                AwsServiceError::aws_error(
2432                    StatusCode::BAD_REQUEST,
2433                    "GlobalTableNotFoundException",
2434                    format!("Global table not found: {global_table_name}"),
2435                )
2436            })?;
2437
2438        if let Some(updates) = body["ReplicaUpdates"].as_array() {
2439            for update in updates {
2440                if let Some(create) = update["Create"].as_object() {
2441                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2442                        gt.replication_group.push(ReplicaDescription {
2443                            region_name: region.to_string(),
2444                            replica_status: "ACTIVE".to_string(),
2445                        });
2446                    }
2447                }
2448                if let Some(delete) = update["Delete"].as_object() {
2449                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2450                        gt.replication_group.retain(|r| r.region_name != region);
2451                    }
2452                }
2453            }
2454        }
2455
2456        Self::ok_json(json!({
2457            "GlobalTableDescription": {
2458                "GlobalTableName": gt.global_table_name,
2459                "GlobalTableArn": gt.global_table_arn,
2460                "GlobalTableStatus": gt.global_table_status,
2461                "CreationDateTime": gt.creation_date.timestamp() as f64,
2462                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2463                    "RegionName": r.region_name,
2464                    "ReplicaStatus": r.replica_status
2465                })).collect::<Vec<_>>()
2466            }
2467        }))
2468    }
2469
2470    fn update_global_table_settings(
2471        &self,
2472        req: &AwsRequest,
2473    ) -> Result<AwsResponse, AwsServiceError> {
2474        let body = Self::parse_body(req)?;
2475        let global_table_name = require_str(&body, "GlobalTableName")?;
2476        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2477        validate_optional_enum_value(
2478            "globalTableBillingMode",
2479            &body["GlobalTableBillingMode"],
2480            &["PROVISIONED", "PAY_PER_REQUEST"],
2481        )?;
2482        validate_optional_range_i64(
2483            "globalTableProvisionedWriteCapacityUnits",
2484            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2485            1,
2486            i64::MAX,
2487        )?;
2488
2489        let state = self.state.read();
2490        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2491            AwsServiceError::aws_error(
2492                StatusCode::BAD_REQUEST,
2493                "GlobalTableNotFoundException",
2494                format!("Global table not found: {global_table_name}"),
2495            )
2496        })?;
2497
2498        let replica_settings: Vec<Value> = gt
2499            .replication_group
2500            .iter()
2501            .map(|r| {
2502                json!({
2503                    "RegionName": r.region_name,
2504                    "ReplicaStatus": r.replica_status,
2505                    "ReplicaProvisionedReadCapacityUnits": 0,
2506                    "ReplicaProvisionedWriteCapacityUnits": 0
2507                })
2508            })
2509            .collect();
2510
2511        Self::ok_json(json!({
2512            "GlobalTableName": gt.global_table_name,
2513            "ReplicaSettings": replica_settings
2514        }))
2515    }
2516
2517    fn describe_table_replica_auto_scaling(
2518        &self,
2519        req: &AwsRequest,
2520    ) -> Result<AwsResponse, AwsServiceError> {
2521        let body = Self::parse_body(req)?;
2522        let table_name = require_str(&body, "TableName")?;
2523
2524        let state = self.state.read();
2525        let table = get_table(&state.tables, table_name)?;
2526
2527        Self::ok_json(json!({
2528            "TableAutoScalingDescription": {
2529                "TableName": table.name,
2530                "TableStatus": table.status,
2531                "Replicas": []
2532            }
2533        }))
2534    }
2535
2536    fn update_table_replica_auto_scaling(
2537        &self,
2538        req: &AwsRequest,
2539    ) -> Result<AwsResponse, AwsServiceError> {
2540        let body = Self::parse_body(req)?;
2541        let table_name = require_str(&body, "TableName")?;
2542
2543        let state = self.state.read();
2544        let table = get_table(&state.tables, table_name)?;
2545
2546        Self::ok_json(json!({
2547            "TableAutoScalingDescription": {
2548                "TableName": table.name,
2549                "TableStatus": table.status,
2550                "Replicas": []
2551            }
2552        }))
2553    }
2554
2555    // ── Kinesis Streaming ──────────────────────────────────────────────
2556
2557    fn enable_kinesis_streaming_destination(
2558        &self,
2559        req: &AwsRequest,
2560    ) -> Result<AwsResponse, AwsServiceError> {
2561        let body = Self::parse_body(req)?;
2562        let table_name = require_str(&body, "TableName")?;
2563        let stream_arn = require_str(&body, "StreamArn")?;
2564        let precision = body["EnableKinesisStreamingConfiguration"]
2565            ["ApproximateCreationDateTimePrecision"]
2566            .as_str()
2567            .unwrap_or("MILLISECOND");
2568
2569        let mut state = self.state.write();
2570        let table = get_table_mut(&mut state.tables, table_name)?;
2571
2572        table.kinesis_destinations.push(KinesisDestination {
2573            stream_arn: stream_arn.to_string(),
2574            destination_status: "ACTIVE".to_string(),
2575            approximate_creation_date_time_precision: precision.to_string(),
2576        });
2577
2578        Self::ok_json(json!({
2579            "TableName": table_name,
2580            "StreamArn": stream_arn,
2581            "DestinationStatus": "ACTIVE",
2582            "EnableKinesisStreamingConfiguration": {
2583                "ApproximateCreationDateTimePrecision": precision
2584            }
2585        }))
2586    }
2587
2588    fn disable_kinesis_streaming_destination(
2589        &self,
2590        req: &AwsRequest,
2591    ) -> Result<AwsResponse, AwsServiceError> {
2592        let body = Self::parse_body(req)?;
2593        let table_name = require_str(&body, "TableName")?;
2594        let stream_arn = require_str(&body, "StreamArn")?;
2595
2596        let mut state = self.state.write();
2597        let table = get_table_mut(&mut state.tables, table_name)?;
2598
2599        if let Some(dest) = table
2600            .kinesis_destinations
2601            .iter_mut()
2602            .find(|d| d.stream_arn == stream_arn)
2603        {
2604            dest.destination_status = "DISABLED".to_string();
2605        }
2606
2607        Self::ok_json(json!({
2608            "TableName": table_name,
2609            "StreamArn": stream_arn,
2610            "DestinationStatus": "DISABLED"
2611        }))
2612    }
2613
2614    fn describe_kinesis_streaming_destination(
2615        &self,
2616        req: &AwsRequest,
2617    ) -> Result<AwsResponse, AwsServiceError> {
2618        let body = Self::parse_body(req)?;
2619        let table_name = require_str(&body, "TableName")?;
2620
2621        let state = self.state.read();
2622        let table = get_table(&state.tables, table_name)?;
2623
2624        let destinations: Vec<Value> = table
2625            .kinesis_destinations
2626            .iter()
2627            .map(|d| {
2628                json!({
2629                    "StreamArn": d.stream_arn,
2630                    "DestinationStatus": d.destination_status,
2631                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2632                })
2633            })
2634            .collect();
2635
2636        Self::ok_json(json!({
2637            "TableName": table_name,
2638            "KinesisDataStreamDestinations": destinations
2639        }))
2640    }
2641
2642    fn update_kinesis_streaming_destination(
2643        &self,
2644        req: &AwsRequest,
2645    ) -> Result<AwsResponse, AwsServiceError> {
2646        let body = Self::parse_body(req)?;
2647        let table_name = require_str(&body, "TableName")?;
2648        let stream_arn = require_str(&body, "StreamArn")?;
2649        let precision = body["UpdateKinesisStreamingConfiguration"]
2650            ["ApproximateCreationDateTimePrecision"]
2651            .as_str()
2652            .unwrap_or("MILLISECOND");
2653
2654        let mut state = self.state.write();
2655        let table = get_table_mut(&mut state.tables, table_name)?;
2656
2657        if let Some(dest) = table
2658            .kinesis_destinations
2659            .iter_mut()
2660            .find(|d| d.stream_arn == stream_arn)
2661        {
2662            dest.approximate_creation_date_time_precision = precision.to_string();
2663        }
2664
2665        Self::ok_json(json!({
2666            "TableName": table_name,
2667            "StreamArn": stream_arn,
2668            "DestinationStatus": "ACTIVE",
2669            "UpdateKinesisStreamingConfiguration": {
2670                "ApproximateCreationDateTimePrecision": precision
2671            }
2672        }))
2673    }
2674
2675    // ── Contributor Insights ───────────────────────────────────────────
2676
2677    fn describe_contributor_insights(
2678        &self,
2679        req: &AwsRequest,
2680    ) -> Result<AwsResponse, AwsServiceError> {
2681        let body = Self::parse_body(req)?;
2682        let table_name = require_str(&body, "TableName")?;
2683        let index_name = body["IndexName"].as_str();
2684
2685        let state = self.state.read();
2686        let table = get_table(&state.tables, table_name)?;
2687
2688        let top = table.top_contributors(10);
2689        let contributors: Vec<Value> = top
2690            .iter()
2691            .map(|(key, count)| {
2692                json!({
2693                    "Key": key,
2694                    "Count": count
2695                })
2696            })
2697            .collect();
2698
2699        let mut result = json!({
2700            "TableName": table_name,
2701            "ContributorInsightsStatus": table.contributor_insights_status,
2702            "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2703            "TopContributors": contributors
2704        });
2705        if let Some(idx) = index_name {
2706            result["IndexName"] = json!(idx);
2707        }
2708
2709        Self::ok_json(result)
2710    }
2711
2712    fn update_contributor_insights(
2713        &self,
2714        req: &AwsRequest,
2715    ) -> Result<AwsResponse, AwsServiceError> {
2716        let body = Self::parse_body(req)?;
2717        let table_name = require_str(&body, "TableName")?;
2718        let action = require_str(&body, "ContributorInsightsAction")?;
2719        let index_name = body["IndexName"].as_str();
2720
2721        let mut state = self.state.write();
2722        let table = get_table_mut(&mut state.tables, table_name)?;
2723
2724        let status = match action {
2725            "ENABLE" => "ENABLED",
2726            "DISABLE" => "DISABLED",
2727            _ => {
2728                return Err(AwsServiceError::aws_error(
2729                    StatusCode::BAD_REQUEST,
2730                    "ValidationException",
2731                    format!("Invalid ContributorInsightsAction: {action}"),
2732                ))
2733            }
2734        };
2735        table.contributor_insights_status = status.to_string();
2736        if status == "DISABLED" {
2737            table.contributor_insights_counters.clear();
2738        }
2739
2740        let mut result = json!({
2741            "TableName": table_name,
2742            "ContributorInsightsStatus": status
2743        });
2744        if let Some(idx) = index_name {
2745            result["IndexName"] = json!(idx);
2746        }
2747
2748        Self::ok_json(result)
2749    }
2750
2751    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2752        let body = Self::parse_body(req)?;
2753        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2754        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2755        let table_name = body["TableName"].as_str();
2756
2757        let state = self.state.read();
2758        let summaries: Vec<Value> = state
2759            .tables
2760            .values()
2761            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2762            .map(|t| {
2763                json!({
2764                    "TableName": t.name,
2765                    "ContributorInsightsStatus": t.contributor_insights_status
2766                })
2767            })
2768            .collect();
2769
2770        Self::ok_json(json!({
2771            "ContributorInsightsSummaries": summaries
2772        }))
2773    }
2774
2775    // ── Import/Export ──────────────────────────────────────────────────
2776
2777    fn export_table_to_point_in_time(
2778        &self,
2779        req: &AwsRequest,
2780    ) -> Result<AwsResponse, AwsServiceError> {
2781        let body = Self::parse_body(req)?;
2782        let table_arn = require_str(&body, "TableArn")?;
2783        let s3_bucket = require_str(&body, "S3Bucket")?;
2784        let s3_prefix = body["S3Prefix"].as_str();
2785        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2786
2787        let state = self.state.read();
2788        // Verify table exists and get items
2789        let table = find_table_by_arn(&state.tables, table_arn)?;
2790        let items = table.items.clone();
2791        let item_count = items.len() as i64;
2792
2793        let now = Utc::now();
2794        let export_arn = format!(
2795            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2796            state.region,
2797            state.account_id,
2798            table_arn.rsplit('/').next().unwrap_or("unknown"),
2799            uuid::Uuid::new_v4()
2800        );
2801
2802        drop(state);
2803
2804        // Serialize items as JSON Lines and write to S3
2805        let mut json_lines = String::new();
2806        for item in &items {
2807            let item_json = if export_format == "DYNAMODB_JSON" {
2808                json!({ "Item": item })
2809            } else {
2810                json!(item)
2811            };
2812            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2813            json_lines.push('\n');
2814        }
2815        let data_size = json_lines.len() as i64;
2816
2817        // Build S3 key for the export data
2818        let s3_key = if let Some(prefix) = s3_prefix {
2819            format!("{prefix}/data/manifest-files.json")
2820        } else {
2821            "data/manifest-files.json".to_string()
2822        };
2823
2824        // Write to S3 if we have access to S3 state
2825        let mut export_failed = false;
2826        let mut failure_reason = String::new();
2827        if let Some(ref s3_state) = self.s3_state {
2828            let mut s3 = s3_state.write();
2829            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2830                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2831                let obj = fakecloud_s3::state::S3Object {
2832                    key: s3_key.clone(),
2833                    data: bytes::Bytes::from(json_lines),
2834                    content_type: "application/json".to_string(),
2835                    etag,
2836                    size: data_size as u64,
2837                    last_modified: now,
2838                    metadata: HashMap::new(),
2839                    storage_class: "STANDARD".to_string(),
2840                    tags: HashMap::new(),
2841                    acl_grants: Vec::new(),
2842                    acl_owner_id: None,
2843                    parts_count: None,
2844                    part_sizes: None,
2845                    sse_algorithm: None,
2846                    sse_kms_key_id: None,
2847                    bucket_key_enabled: None,
2848                    version_id: None,
2849                    is_delete_marker: false,
2850                    content_encoding: None,
2851                    website_redirect_location: None,
2852                    restore_ongoing: None,
2853                    restore_expiry: None,
2854                    checksum_algorithm: None,
2855                    checksum_value: None,
2856                    lock_mode: None,
2857                    lock_retain_until: None,
2858                    lock_legal_hold: None,
2859                };
2860                bucket.objects.insert(s3_key, obj);
2861            } else {
2862                export_failed = true;
2863                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2864            }
2865        }
2866
2867        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2868
2869        let export = ExportDescription {
2870            export_arn: export_arn.clone(),
2871            export_status: export_status.to_string(),
2872            table_arn: table_arn.to_string(),
2873            s3_bucket: s3_bucket.to_string(),
2874            s3_prefix: s3_prefix.map(|s| s.to_string()),
2875            export_format: export_format.to_string(),
2876            start_time: now,
2877            end_time: now,
2878            export_time: now,
2879            item_count,
2880            billed_size_bytes: data_size,
2881        };
2882
2883        let mut state = self.state.write();
2884        state.exports.insert(export_arn.clone(), export);
2885
2886        let mut response = json!({
2887            "ExportDescription": {
2888                "ExportArn": export_arn,
2889                "ExportStatus": export_status,
2890                "TableArn": table_arn,
2891                "S3Bucket": s3_bucket,
2892                "S3Prefix": s3_prefix,
2893                "ExportFormat": export_format,
2894                "StartTime": now.timestamp() as f64,
2895                "EndTime": now.timestamp() as f64,
2896                "ExportTime": now.timestamp() as f64,
2897                "ItemCount": item_count,
2898                "BilledSizeBytes": data_size
2899            }
2900        });
2901        if export_failed {
2902            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2903            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2904        }
2905        Self::ok_json(response)
2906    }
2907
2908    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2909        let body = Self::parse_body(req)?;
2910        let export_arn = require_str(&body, "ExportArn")?;
2911
2912        let state = self.state.read();
2913        let export = state.exports.get(export_arn).ok_or_else(|| {
2914            AwsServiceError::aws_error(
2915                StatusCode::BAD_REQUEST,
2916                "ExportNotFoundException",
2917                format!("Export not found: {export_arn}"),
2918            )
2919        })?;
2920
2921        Self::ok_json(json!({
2922            "ExportDescription": {
2923                "ExportArn": export.export_arn,
2924                "ExportStatus": export.export_status,
2925                "TableArn": export.table_arn,
2926                "S3Bucket": export.s3_bucket,
2927                "S3Prefix": export.s3_prefix,
2928                "ExportFormat": export.export_format,
2929                "StartTime": export.start_time.timestamp() as f64,
2930                "EndTime": export.end_time.timestamp() as f64,
2931                "ExportTime": export.export_time.timestamp() as f64,
2932                "ItemCount": export.item_count,
2933                "BilledSizeBytes": export.billed_size_bytes
2934            }
2935        }))
2936    }
2937
2938    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2939        let body = Self::parse_body(req)?;
2940        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2941        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2942        let table_arn = body["TableArn"].as_str();
2943
2944        let state = self.state.read();
2945        let summaries: Vec<Value> = state
2946            .exports
2947            .values()
2948            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2949            .map(|e| {
2950                json!({
2951                    "ExportArn": e.export_arn,
2952                    "ExportStatus": e.export_status,
2953                    "TableArn": e.table_arn
2954                })
2955            })
2956            .collect();
2957
2958        Self::ok_json(json!({
2959            "ExportSummaries": summaries
2960        }))
2961    }
2962
2963    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2964        let body = Self::parse_body(req)?;
2965        let input_format = require_str(&body, "InputFormat")?;
2966        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2967            AwsServiceError::aws_error(
2968                StatusCode::BAD_REQUEST,
2969                "ValidationException",
2970                "S3BucketSource is required",
2971            )
2972        })?;
2973        let s3_bucket = s3_source
2974            .get("S3Bucket")
2975            .and_then(|v| v.as_str())
2976            .unwrap_or("");
2977        let s3_key_prefix = s3_source
2978            .get("S3KeyPrefix")
2979            .and_then(|v| v.as_str())
2980            .unwrap_or("");
2981
2982        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2983            AwsServiceError::aws_error(
2984                StatusCode::BAD_REQUEST,
2985                "ValidationException",
2986                "TableCreationParameters is required",
2987            )
2988        })?;
2989        let table_name = table_params
2990            .get("TableName")
2991            .and_then(|v| v.as_str())
2992            .ok_or_else(|| {
2993                AwsServiceError::aws_error(
2994                    StatusCode::BAD_REQUEST,
2995                    "ValidationException",
2996                    "TableCreationParameters.TableName is required",
2997                )
2998            })?;
2999
3000        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3001        let attribute_definitions = parse_attribute_definitions(
3002            table_params
3003                .get("AttributeDefinitions")
3004                .unwrap_or(&Value::Null),
3005        )?;
3006
3007        // Read items from S3 if we have access
3008        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3009        let mut processed_size_bytes: i64 = 0;
3010        if let Some(ref s3_state) = self.s3_state {
3011            let s3 = s3_state.read();
3012            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3013                AwsServiceError::aws_error(
3014                    StatusCode::BAD_REQUEST,
3015                    "ImportConflictException",
3016                    format!("S3 bucket does not exist: {s3_bucket}"),
3017                )
3018            })?;
3019            // Find all objects under the prefix and try to parse JSON Lines from each
3020            let prefix = if s3_key_prefix.is_empty() {
3021                String::new()
3022            } else {
3023                s3_key_prefix.to_string()
3024            };
3025            for (key, obj) in &bucket.objects {
3026                if !prefix.is_empty() && !key.starts_with(&prefix) {
3027                    continue;
3028                }
3029                let data = std::str::from_utf8(&obj.data).unwrap_or("");
3030                processed_size_bytes += obj.size as i64;
3031                for line in data.lines() {
3032                    let line = line.trim();
3033                    if line.is_empty() {
3034                        continue;
3035                    }
3036                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3037                        // DYNAMODB_JSON format wraps items in {"Item": {...}}
3038                        let item = if input_format == "DYNAMODB_JSON" {
3039                            if let Some(item_obj) = parsed.get("Item") {
3040                                item_obj.as_object().cloned().unwrap_or_default()
3041                            } else {
3042                                parsed.as_object().cloned().unwrap_or_default()
3043                            }
3044                        } else {
3045                            parsed.as_object().cloned().unwrap_or_default()
3046                        };
3047                        if !item.is_empty() {
3048                            imported_items
3049                                .push(item.into_iter().collect::<HashMap<String, Value>>());
3050                        }
3051                    }
3052                }
3053            }
3054        }
3055
3056        let mut state = self.state.write();
3057
3058        if state.tables.contains_key(table_name) {
3059            return Err(AwsServiceError::aws_error(
3060                StatusCode::BAD_REQUEST,
3061                "ResourceInUseException",
3062                format!("Table already exists: {table_name}"),
3063            ));
3064        }
3065
3066        let now = Utc::now();
3067        let table_arn = format!(
3068            "arn:aws:dynamodb:{}:{}:table/{}",
3069            state.region, state.account_id, table_name
3070        );
3071        let import_arn = format!(
3072            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3073            state.region,
3074            state.account_id,
3075            table_name,
3076            uuid::Uuid::new_v4()
3077        );
3078
3079        let processed_item_count = imported_items.len() as i64;
3080
3081        let mut table = DynamoTable {
3082            name: table_name.to_string(),
3083            arn: table_arn.clone(),
3084            key_schema,
3085            attribute_definitions,
3086            provisioned_throughput: ProvisionedThroughput {
3087                read_capacity_units: 0,
3088                write_capacity_units: 0,
3089            },
3090            items: imported_items,
3091            gsi: Vec::new(),
3092            lsi: Vec::new(),
3093            tags: HashMap::new(),
3094            created_at: now,
3095            status: "ACTIVE".to_string(),
3096            item_count: 0,
3097            size_bytes: 0,
3098            billing_mode: "PAY_PER_REQUEST".to_string(),
3099            ttl_attribute: None,
3100            ttl_enabled: false,
3101            resource_policy: None,
3102            pitr_enabled: false,
3103            kinesis_destinations: Vec::new(),
3104            contributor_insights_status: "DISABLED".to_string(),
3105            contributor_insights_counters: HashMap::new(),
3106            stream_enabled: false,
3107            stream_view_type: None,
3108            stream_arn: None,
3109            stream_records: Arc::new(RwLock::new(Vec::new())),
3110            sse_type: None,
3111            sse_kms_key_arn: None,
3112        };
3113        table.recalculate_stats();
3114        state.tables.insert(table_name.to_string(), table);
3115
3116        let import_desc = ImportDescription {
3117            import_arn: import_arn.clone(),
3118            import_status: "COMPLETED".to_string(),
3119            table_arn: table_arn.clone(),
3120            table_name: table_name.to_string(),
3121            s3_bucket_source: s3_bucket.to_string(),
3122            input_format: input_format.to_string(),
3123            start_time: now,
3124            end_time: now,
3125            processed_item_count,
3126            processed_size_bytes,
3127        };
3128        state.imports.insert(import_arn.clone(), import_desc);
3129
3130        let table_ref = state.tables.get(table_name).unwrap();
3131        let ks: Vec<Value> = table_ref
3132            .key_schema
3133            .iter()
3134            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3135            .collect();
3136        let ad: Vec<Value> = table_ref
3137            .attribute_definitions
3138            .iter()
3139            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3140            .collect();
3141
3142        Self::ok_json(json!({
3143            "ImportTableDescription": {
3144                "ImportArn": import_arn,
3145                "ImportStatus": "COMPLETED",
3146                "TableArn": table_arn,
3147                "TableId": uuid::Uuid::new_v4().to_string(),
3148                "S3BucketSource": {
3149                    "S3Bucket": s3_bucket
3150                },
3151                "InputFormat": input_format,
3152                "TableCreationParameters": {
3153                    "TableName": table_name,
3154                    "KeySchema": ks,
3155                    "AttributeDefinitions": ad
3156                },
3157                "StartTime": now.timestamp() as f64,
3158                "EndTime": now.timestamp() as f64,
3159                "ProcessedItemCount": processed_item_count,
3160                "ProcessedSizeBytes": processed_size_bytes
3161            }
3162        }))
3163    }
3164
3165    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3166        let body = Self::parse_body(req)?;
3167        let import_arn = require_str(&body, "ImportArn")?;
3168
3169        let state = self.state.read();
3170        let import = state.imports.get(import_arn).ok_or_else(|| {
3171            AwsServiceError::aws_error(
3172                StatusCode::BAD_REQUEST,
3173                "ImportNotFoundException",
3174                format!("Import not found: {import_arn}"),
3175            )
3176        })?;
3177
3178        Self::ok_json(json!({
3179            "ImportTableDescription": {
3180                "ImportArn": import.import_arn,
3181                "ImportStatus": import.import_status,
3182                "TableArn": import.table_arn,
3183                "S3BucketSource": {
3184                    "S3Bucket": import.s3_bucket_source
3185                },
3186                "InputFormat": import.input_format,
3187                "StartTime": import.start_time.timestamp() as f64,
3188                "EndTime": import.end_time.timestamp() as f64,
3189                "ProcessedItemCount": import.processed_item_count,
3190                "ProcessedSizeBytes": import.processed_size_bytes
3191            }
3192        }))
3193    }
3194
3195    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3196        let body = Self::parse_body(req)?;
3197        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3198        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3199        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3200        let table_arn = body["TableArn"].as_str();
3201
3202        let state = self.state.read();
3203        let summaries: Vec<Value> = state
3204            .imports
3205            .values()
3206            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3207            .map(|i| {
3208                json!({
3209                    "ImportArn": i.import_arn,
3210                    "ImportStatus": i.import_status,
3211                    "TableArn": i.table_arn
3212                })
3213            })
3214            .collect();
3215
3216        Self::ok_json(json!({
3217            "ImportSummaryList": summaries
3218        }))
3219    }
3220}
3221
3222#[async_trait]
3223impl AwsService for DynamoDbService {
3224    fn service_name(&self) -> &str {
3225        "dynamodb"
3226    }
3227
3228    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3229        match req.action.as_str() {
3230            "CreateTable" => self.create_table(&req),
3231            "DeleteTable" => self.delete_table(&req),
3232            "DescribeTable" => self.describe_table(&req),
3233            "ListTables" => self.list_tables(&req),
3234            "UpdateTable" => self.update_table(&req),
3235            "PutItem" => self.put_item(&req),
3236            "GetItem" => self.get_item(&req),
3237            "DeleteItem" => self.delete_item(&req),
3238            "UpdateItem" => self.update_item(&req),
3239            "Query" => self.query(&req),
3240            "Scan" => self.scan(&req),
3241            "BatchGetItem" => self.batch_get_item(&req),
3242            "BatchWriteItem" => self.batch_write_item(&req),
3243            "TagResource" => self.tag_resource(&req),
3244            "UntagResource" => self.untag_resource(&req),
3245            "ListTagsOfResource" => self.list_tags_of_resource(&req),
3246            "TransactGetItems" => self.transact_get_items(&req),
3247            "TransactWriteItems" => self.transact_write_items(&req),
3248            "ExecuteStatement" => self.execute_statement(&req),
3249            "BatchExecuteStatement" => self.batch_execute_statement(&req),
3250            "ExecuteTransaction" => self.execute_transaction(&req),
3251            "UpdateTimeToLive" => self.update_time_to_live(&req),
3252            "DescribeTimeToLive" => self.describe_time_to_live(&req),
3253            "PutResourcePolicy" => self.put_resource_policy(&req),
3254            "GetResourcePolicy" => self.get_resource_policy(&req),
3255            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3256            // Stubs
3257            "DescribeEndpoints" => self.describe_endpoints(&req),
3258            "DescribeLimits" => self.describe_limits(&req),
3259            // Backups
3260            "CreateBackup" => self.create_backup(&req),
3261            "DeleteBackup" => self.delete_backup(&req),
3262            "DescribeBackup" => self.describe_backup(&req),
3263            "ListBackups" => self.list_backups(&req),
3264            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3265            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3266            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3267            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3268            // Global tables
3269            "CreateGlobalTable" => self.create_global_table(&req),
3270            "DescribeGlobalTable" => self.describe_global_table(&req),
3271            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3272            "ListGlobalTables" => self.list_global_tables(&req),
3273            "UpdateGlobalTable" => self.update_global_table(&req),
3274            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3275            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3276            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3277            // Kinesis streaming
3278            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3279            "DisableKinesisStreamingDestination" => {
3280                self.disable_kinesis_streaming_destination(&req)
3281            }
3282            "DescribeKinesisStreamingDestination" => {
3283                self.describe_kinesis_streaming_destination(&req)
3284            }
3285            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3286            // Contributor insights
3287            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3288            "UpdateContributorInsights" => self.update_contributor_insights(&req),
3289            "ListContributorInsights" => self.list_contributor_insights(&req),
3290            // Import/Export
3291            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3292            "DescribeExport" => self.describe_export(&req),
3293            "ListExports" => self.list_exports(&req),
3294            "ImportTable" => self.import_table(&req),
3295            "DescribeImport" => self.describe_import(&req),
3296            "ListImports" => self.list_imports(&req),
3297            _ => Err(AwsServiceError::action_not_implemented(
3298                "dynamodb",
3299                &req.action,
3300            )),
3301        }
3302    }
3303
3304    fn supported_actions(&self) -> &[&str] {
3305        &[
3306            "CreateTable",
3307            "DeleteTable",
3308            "DescribeTable",
3309            "ListTables",
3310            "UpdateTable",
3311            "PutItem",
3312            "GetItem",
3313            "DeleteItem",
3314            "UpdateItem",
3315            "Query",
3316            "Scan",
3317            "BatchGetItem",
3318            "BatchWriteItem",
3319            "TagResource",
3320            "UntagResource",
3321            "ListTagsOfResource",
3322            "TransactGetItems",
3323            "TransactWriteItems",
3324            "ExecuteStatement",
3325            "BatchExecuteStatement",
3326            "ExecuteTransaction",
3327            "UpdateTimeToLive",
3328            "DescribeTimeToLive",
3329            "PutResourcePolicy",
3330            "GetResourcePolicy",
3331            "DeleteResourcePolicy",
3332            "DescribeEndpoints",
3333            "DescribeLimits",
3334            "CreateBackup",
3335            "DeleteBackup",
3336            "DescribeBackup",
3337            "ListBackups",
3338            "RestoreTableFromBackup",
3339            "RestoreTableToPointInTime",
3340            "UpdateContinuousBackups",
3341            "DescribeContinuousBackups",
3342            "CreateGlobalTable",
3343            "DescribeGlobalTable",
3344            "DescribeGlobalTableSettings",
3345            "ListGlobalTables",
3346            "UpdateGlobalTable",
3347            "UpdateGlobalTableSettings",
3348            "DescribeTableReplicaAutoScaling",
3349            "UpdateTableReplicaAutoScaling",
3350            "EnableKinesisStreamingDestination",
3351            "DisableKinesisStreamingDestination",
3352            "DescribeKinesisStreamingDestination",
3353            "UpdateKinesisStreamingDestination",
3354            "DescribeContributorInsights",
3355            "UpdateContributorInsights",
3356            "ListContributorInsights",
3357            "ExportTableToPointInTime",
3358            "DescribeExport",
3359            "ListExports",
3360            "ImportTable",
3361            "DescribeImport",
3362            "ListImports",
3363        ]
3364    }
3365}
3366
3367// ── Helper functions ────────────────────────────────────────────────────
3368
3369fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3370    body[field].as_str().ok_or_else(|| {
3371        AwsServiceError::aws_error(
3372            StatusCode::BAD_REQUEST,
3373            "ValidationException",
3374            format!("{field} is required"),
3375        )
3376    })
3377}
3378
3379fn require_object(
3380    body: &Value,
3381    field: &str,
3382) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3383    let obj = body[field].as_object().ok_or_else(|| {
3384        AwsServiceError::aws_error(
3385            StatusCode::BAD_REQUEST,
3386            "ValidationException",
3387            format!("{field} is required"),
3388        )
3389    })?;
3390    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3391}
3392
3393fn get_table<'a>(
3394    tables: &'a HashMap<String, DynamoTable>,
3395    name: &str,
3396) -> Result<&'a DynamoTable, AwsServiceError> {
3397    tables.get(name).ok_or_else(|| {
3398        AwsServiceError::aws_error(
3399            StatusCode::BAD_REQUEST,
3400            "ResourceNotFoundException",
3401            format!("Requested resource not found: Table: {name} not found"),
3402        )
3403    })
3404}
3405
3406fn get_table_mut<'a>(
3407    tables: &'a mut HashMap<String, DynamoTable>,
3408    name: &str,
3409) -> Result<&'a mut DynamoTable, AwsServiceError> {
3410    tables.get_mut(name).ok_or_else(|| {
3411        AwsServiceError::aws_error(
3412            StatusCode::BAD_REQUEST,
3413            "ResourceNotFoundException",
3414            format!("Requested resource not found: Table: {name} not found"),
3415        )
3416    })
3417}
3418
3419fn find_table_by_arn<'a>(
3420    tables: &'a HashMap<String, DynamoTable>,
3421    arn: &str,
3422) -> Result<&'a DynamoTable, AwsServiceError> {
3423    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3424        AwsServiceError::aws_error(
3425            StatusCode::BAD_REQUEST,
3426            "ResourceNotFoundException",
3427            format!("Requested resource not found: {arn}"),
3428        )
3429    })
3430}
3431
3432fn find_table_by_arn_mut<'a>(
3433    tables: &'a mut HashMap<String, DynamoTable>,
3434    arn: &str,
3435) -> Result<&'a mut DynamoTable, AwsServiceError> {
3436    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3437        AwsServiceError::aws_error(
3438            StatusCode::BAD_REQUEST,
3439            "ResourceNotFoundException",
3440            format!("Requested resource not found: {arn}"),
3441        )
3442    })
3443}
3444
3445fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3446    let arr = val.as_array().ok_or_else(|| {
3447        AwsServiceError::aws_error(
3448            StatusCode::BAD_REQUEST,
3449            "ValidationException",
3450            "KeySchema is required",
3451        )
3452    })?;
3453    Ok(arr
3454        .iter()
3455        .map(|elem| KeySchemaElement {
3456            attribute_name: elem["AttributeName"]
3457                .as_str()
3458                .unwrap_or_default()
3459                .to_string(),
3460            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3461        })
3462        .collect())
3463}
3464
3465fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3466    let arr = val.as_array().ok_or_else(|| {
3467        AwsServiceError::aws_error(
3468            StatusCode::BAD_REQUEST,
3469            "ValidationException",
3470            "AttributeDefinitions is required",
3471        )
3472    })?;
3473    Ok(arr
3474        .iter()
3475        .map(|elem| AttributeDefinition {
3476            attribute_name: elem["AttributeName"]
3477                .as_str()
3478                .unwrap_or_default()
3479                .to_string(),
3480            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3481        })
3482        .collect())
3483}
3484
3485fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3486    Ok(ProvisionedThroughput {
3487        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3488        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3489    })
3490}
3491
3492fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3493    let Some(arr) = val.as_array() else {
3494        return Vec::new();
3495    };
3496    arr.iter()
3497        .filter_map(|g| {
3498            Some(GlobalSecondaryIndex {
3499                index_name: g["IndexName"].as_str()?.to_string(),
3500                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3501                projection: parse_projection(&g["Projection"]),
3502                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3503                    .ok(),
3504            })
3505        })
3506        .collect()
3507}
3508
3509fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3510    let Some(arr) = val.as_array() else {
3511        return Vec::new();
3512    };
3513    arr.iter()
3514        .filter_map(|l| {
3515            Some(LocalSecondaryIndex {
3516                index_name: l["IndexName"].as_str()?.to_string(),
3517                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3518                projection: parse_projection(&l["Projection"]),
3519            })
3520        })
3521        .collect()
3522}
3523
3524fn parse_projection(val: &Value) -> Projection {
3525    Projection {
3526        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3527        non_key_attributes: val["NonKeyAttributes"]
3528            .as_array()
3529            .map(|arr| {
3530                arr.iter()
3531                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3532                    .collect()
3533            })
3534            .unwrap_or_default(),
3535    }
3536}
3537
3538fn parse_tags(val: &Value) -> HashMap<String, String> {
3539    let mut tags = HashMap::new();
3540    if let Some(arr) = val.as_array() {
3541        for tag in arr {
3542            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3543                tags.insert(k.to_string(), v.to_string());
3544            }
3545        }
3546    }
3547    tags
3548}
3549
3550fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3551    let mut names = HashMap::new();
3552    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3553        for (k, v) in obj {
3554            if let Some(s) = v.as_str() {
3555                names.insert(k.clone(), s.to_string());
3556            }
3557        }
3558    }
3559    names
3560}
3561
3562fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3563    let mut values = HashMap::new();
3564    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3565        for (k, v) in obj {
3566            values.insert(k.clone(), v.clone());
3567        }
3568    }
3569    values
3570}
3571
3572fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
3573    if name.starts_with('#') {
3574        expr_attr_names
3575            .get(name)
3576            .cloned()
3577            .unwrap_or_else(|| name.to_string())
3578    } else {
3579        name.to_string()
3580    }
3581}
3582
3583fn extract_key(
3584    table: &DynamoTable,
3585    item: &HashMap<String, AttributeValue>,
3586) -> HashMap<String, AttributeValue> {
3587    let mut key = HashMap::new();
3588    let hash_key = table.hash_key_name();
3589    if let Some(v) = item.get(hash_key) {
3590        key.insert(hash_key.to_string(), v.clone());
3591    }
3592    if let Some(range_key) = table.range_key_name() {
3593        if let Some(v) = item.get(range_key) {
3594            key.insert(range_key.to_string(), v.clone());
3595        }
3596    }
3597    key
3598}
3599
3600/// Parse a JSON object into a key map (used for ExclusiveStartKey).
3601fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3602    let obj = value.as_object()?;
3603    if obj.is_empty() {
3604        return None;
3605    }
3606    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3607}
3608
3609/// Check whether an item's key attributes match the given key map.
3610fn item_matches_key(
3611    item: &HashMap<String, AttributeValue>,
3612    key: &HashMap<String, AttributeValue>,
3613    hash_key_name: &str,
3614    range_key_name: Option<&str>,
3615) -> bool {
3616    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3617        (Some(a), Some(b)) => a == b,
3618        _ => false,
3619    };
3620    if !hash_match {
3621        return false;
3622    }
3623    match range_key_name {
3624        Some(rk) => match (item.get(rk), key.get(rk)) {
3625            (Some(a), Some(b)) => a == b,
3626            (None, None) => true,
3627            _ => false,
3628        },
3629        None => true,
3630    }
3631}
3632
3633/// Extract the primary key from an item given explicit key attribute names.
3634fn extract_key_for_schema(
3635    item: &HashMap<String, AttributeValue>,
3636    hash_key_name: &str,
3637    range_key_name: Option<&str>,
3638) -> HashMap<String, AttributeValue> {
3639    let mut key = HashMap::new();
3640    if let Some(v) = item.get(hash_key_name) {
3641        key.insert(hash_key_name.to_string(), v.clone());
3642    }
3643    if let Some(rk) = range_key_name {
3644        if let Some(v) = item.get(rk) {
3645            key.insert(rk.to_string(), v.clone());
3646        }
3647    }
3648    key
3649}
3650
3651fn validate_key_in_item(
3652    table: &DynamoTable,
3653    item: &HashMap<String, AttributeValue>,
3654) -> Result<(), AwsServiceError> {
3655    let hash_key = table.hash_key_name();
3656    if !item.contains_key(hash_key) {
3657        return Err(AwsServiceError::aws_error(
3658            StatusCode::BAD_REQUEST,
3659            "ValidationException",
3660            format!("Missing the key {hash_key} in the item"),
3661        ));
3662    }
3663    if let Some(range_key) = table.range_key_name() {
3664        if !item.contains_key(range_key) {
3665            return Err(AwsServiceError::aws_error(
3666                StatusCode::BAD_REQUEST,
3667                "ValidationException",
3668                format!("Missing the key {range_key} in the item"),
3669            ));
3670        }
3671    }
3672    Ok(())
3673}
3674
3675fn validate_key_attributes_in_key(
3676    table: &DynamoTable,
3677    key: &HashMap<String, AttributeValue>,
3678) -> Result<(), AwsServiceError> {
3679    let hash_key = table.hash_key_name();
3680    if !key.contains_key(hash_key) {
3681        return Err(AwsServiceError::aws_error(
3682            StatusCode::BAD_REQUEST,
3683            "ValidationException",
3684            format!("Missing the key {hash_key} in the item"),
3685        ));
3686    }
3687    Ok(())
3688}
3689
3690fn project_item(
3691    item: &HashMap<String, AttributeValue>,
3692    body: &Value,
3693) -> HashMap<String, AttributeValue> {
3694    let projection = body["ProjectionExpression"].as_str();
3695    match projection {
3696        Some(proj) if !proj.is_empty() => {
3697            let expr_attr_names = parse_expression_attribute_names(body);
3698            let attrs: Vec<String> = proj
3699                .split(',')
3700                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3701                .collect();
3702            let mut result = HashMap::new();
3703            for attr in &attrs {
3704                if let Some(v) = resolve_nested_path(item, attr) {
3705                    insert_nested_value(&mut result, attr, v);
3706                }
3707            }
3708            result
3709        }
3710        _ => item.clone(),
3711    }
3712}
3713
3714/// Resolve expression attribute names within each segment of a projection path.
3715/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3716fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3717    // Split on dots, resolve each part, rejoin
3718    let mut result = String::new();
3719    for (i, segment) in path.split('.').enumerate() {
3720        if i > 0 {
3721            result.push('.');
3722        }
3723        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3724        if let Some(bracket_pos) = segment.find('[') {
3725            let key_part = &segment[..bracket_pos];
3726            let index_part = &segment[bracket_pos..];
3727            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3728            result.push_str(index_part);
3729        } else {
3730            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3731        }
3732    }
3733    result
3734}
3735
3736/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3737fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3738    let segments = parse_path_segments(path);
3739    if segments.is_empty() {
3740        return None;
3741    }
3742
3743    let first = &segments[0];
3744    let top_key = match first {
3745        PathSegment::Key(k) => k.as_str(),
3746        _ => return None,
3747    };
3748
3749    let mut current = item.get(top_key)?.clone();
3750
3751    for segment in &segments[1..] {
3752        match segment {
3753            PathSegment::Key(k) => {
3754                // Navigate into a Map: {"M": {"key": ...}}
3755                current = current.get("M")?.get(k)?.clone();
3756            }
3757            PathSegment::Index(idx) => {
3758                // Navigate into a List: {"L": [...]}
3759                current = current.get("L")?.get(*idx)?.clone();
3760            }
3761        }
3762    }
3763
3764    Some(current)
3765}
3766
3767#[derive(Debug)]
3768enum PathSegment {
3769    Key(String),
3770    Index(usize),
3771}
3772
3773/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
3774fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3775    let mut segments = Vec::new();
3776    let mut current = String::new();
3777
3778    let chars: Vec<char> = path.chars().collect();
3779    let mut i = 0;
3780    while i < chars.len() {
3781        match chars[i] {
3782            '.' => {
3783                if !current.is_empty() {
3784                    segments.push(PathSegment::Key(current.clone()));
3785                    current.clear();
3786                }
3787            }
3788            '[' => {
3789                if !current.is_empty() {
3790                    segments.push(PathSegment::Key(current.clone()));
3791                    current.clear();
3792                }
3793                i += 1;
3794                let mut num = String::new();
3795                while i < chars.len() && chars[i] != ']' {
3796                    num.push(chars[i]);
3797                    i += 1;
3798                }
3799                if let Ok(idx) = num.parse::<usize>() {
3800                    segments.push(PathSegment::Index(idx));
3801                }
3802                // skip ']'
3803            }
3804            c => {
3805                current.push(c);
3806            }
3807        }
3808        i += 1;
3809    }
3810    if !current.is_empty() {
3811        segments.push(PathSegment::Key(current));
3812    }
3813    segments
3814}
3815
3816/// Insert a value at a nested path in the result HashMap.
3817/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3818fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3819    // Simple case: no nesting
3820    if !path.contains('.') && !path.contains('[') {
3821        result.insert(path.to_string(), value);
3822        return;
3823    }
3824
3825    let segments = parse_path_segments(path);
3826    if segments.is_empty() {
3827        return;
3828    }
3829
3830    let top_key = match &segments[0] {
3831        PathSegment::Key(k) => k.clone(),
3832        _ => return,
3833    };
3834
3835    if segments.len() == 1 {
3836        result.insert(top_key, value);
3837        return;
3838    }
3839
3840    // For nested paths, wrap the value back into the nested structure
3841    let wrapped = wrap_value_in_path(&segments[1..], value);
3842    // Merge into existing value if present
3843    let existing = result.remove(&top_key);
3844    let merged = match existing {
3845        Some(existing) => merge_attribute_values(existing, wrapped),
3846        None => wrapped,
3847    };
3848    result.insert(top_key, merged);
3849}
3850
3851/// Wrap a value in the nested path structure.
3852fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3853    if segments.is_empty() {
3854        return value;
3855    }
3856    let inner = wrap_value_in_path(&segments[1..], value);
3857    match &segments[0] {
3858        PathSegment::Key(k) => {
3859            json!({"M": {k.clone(): inner}})
3860        }
3861        PathSegment::Index(idx) => {
3862            let mut arr = vec![Value::Null; idx + 1];
3863            arr[*idx] = inner;
3864            json!({"L": arr})
3865        }
3866    }
3867}
3868
3869/// Merge two attribute values (for overlapping projections).
3870fn merge_attribute_values(a: Value, b: Value) -> Value {
3871    if let (Some(a_map), Some(b_map)) = (
3872        a.get("M").and_then(|v| v.as_object()),
3873        b.get("M").and_then(|v| v.as_object()),
3874    ) {
3875        let mut merged = a_map.clone();
3876        for (k, v) in b_map {
3877            if let Some(existing) = merged.get(k) {
3878                merged.insert(
3879                    k.clone(),
3880                    merge_attribute_values(existing.clone(), v.clone()),
3881                );
3882            } else {
3883                merged.insert(k.clone(), v.clone());
3884            }
3885        }
3886        json!({"M": merged})
3887    } else {
3888        b
3889    }
3890}
3891
3892fn evaluate_condition(
3893    condition: &str,
3894    existing: Option<&HashMap<String, AttributeValue>>,
3895    expr_attr_names: &HashMap<String, String>,
3896    expr_attr_values: &HashMap<String, Value>,
3897) -> Result<(), AwsServiceError> {
3898    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
3899    // so we delegate to evaluate_filter_expression. An empty map models "item
3900    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
3901    // → true, comparisons against missing attributes → None vs Some(val).
3902    let empty = HashMap::new();
3903    let item = existing.unwrap_or(&empty);
3904    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3905        Ok(())
3906    } else {
3907        Err(AwsServiceError::aws_error(
3908            StatusCode::BAD_REQUEST,
3909            "ConditionalCheckFailedException",
3910            "The conditional request failed",
3911        ))
3912    }
3913}
3914
3915fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
3916    let prefix = format!("{func_name}(");
3917    if let Some(rest) = expr.strip_prefix(&prefix) {
3918        if let Some(inner) = rest.strip_suffix(')') {
3919            return Some(inner.trim());
3920        }
3921    }
3922    None
3923}
3924
3925fn evaluate_key_condition(
3926    expr: &str,
3927    item: &HashMap<String, AttributeValue>,
3928    hash_key_name: &str,
3929    _range_key_name: Option<&str>,
3930    expr_attr_names: &HashMap<String, String>,
3931    expr_attr_values: &HashMap<String, Value>,
3932) -> bool {
3933    let parts: Vec<&str> = split_on_and(expr);
3934    for part in &parts {
3935        let part = part.trim();
3936        if !evaluate_single_key_condition(
3937            part,
3938            item,
3939            hash_key_name,
3940            expr_attr_names,
3941            expr_attr_values,
3942        ) {
3943            return false;
3944        }
3945    }
3946    true
3947}
3948
3949fn split_on_and(expr: &str) -> Vec<&str> {
3950    let mut parts = Vec::new();
3951    let mut start = 0;
3952    let len = expr.len();
3953    let mut i = 0;
3954    let mut depth = 0;
3955    while i < len {
3956        let ch = expr.as_bytes()[i];
3957        if ch == b'(' {
3958            depth += 1;
3959        } else if ch == b')' {
3960            if depth > 0 {
3961                depth -= 1;
3962            }
3963        } else if depth == 0 && i + 5 <= len && expr[i..i + 5].eq_ignore_ascii_case(" AND ") {
3964            parts.push(&expr[start..i]);
3965            start = i + 5;
3966            i = start;
3967            continue;
3968        }
3969        i += 1;
3970    }
3971    parts.push(&expr[start..]);
3972    parts
3973}
3974
3975fn split_on_or(expr: &str) -> Vec<&str> {
3976    let mut parts = Vec::new();
3977    let mut start = 0;
3978    let len = expr.len();
3979    let mut i = 0;
3980    let mut depth = 0;
3981    while i < len {
3982        let ch = expr.as_bytes()[i];
3983        if ch == b'(' {
3984            depth += 1;
3985        } else if ch == b')' {
3986            if depth > 0 {
3987                depth -= 1;
3988            }
3989        } else if depth == 0 && i + 4 <= len && expr[i..i + 4].eq_ignore_ascii_case(" OR ") {
3990            parts.push(&expr[start..i]);
3991            start = i + 4;
3992            i = start;
3993            continue;
3994        }
3995        i += 1;
3996    }
3997    parts.push(&expr[start..]);
3998    parts
3999}
4000
4001fn evaluate_single_key_condition(
4002    part: &str,
4003    item: &HashMap<String, AttributeValue>,
4004    _hash_key_name: &str,
4005    expr_attr_names: &HashMap<String, String>,
4006    expr_attr_values: &HashMap<String, Value>,
4007) -> bool {
4008    let part = part.trim();
4009
4010    // begins_with(attr, :val)
4011    if let Some(rest) = part
4012        .strip_prefix("begins_with(")
4013        .or_else(|| part.strip_prefix("begins_with ("))
4014    {
4015        if let Some(inner) = rest.strip_suffix(')') {
4016            let mut split = inner.splitn(2, ',');
4017            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4018                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4019                let val_ref = val_ref.trim();
4020                let expected = expr_attr_values.get(val_ref);
4021                let actual = item.get(&attr_name);
4022                return match (actual, expected) {
4023                    (Some(a), Some(e)) => {
4024                        let a_str = extract_string_value(a);
4025                        let e_str = extract_string_value(e);
4026                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4027                    }
4028                    _ => false,
4029                };
4030            }
4031        }
4032        return false;
4033    }
4034
4035    // BETWEEN
4036    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4037        let attr_part = part[..between_pos].trim();
4038        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4039        let range_part = &part[between_pos + 7..];
4040        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4041            let lo_ref = range_part[..and_pos].trim();
4042            let hi_ref = range_part[and_pos + 5..].trim();
4043            let lo = expr_attr_values.get(lo_ref);
4044            let hi = expr_attr_values.get(hi_ref);
4045            let actual = item.get(&attr_name);
4046            return match (actual, lo, hi) {
4047                (Some(a), Some(l), Some(h)) => {
4048                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4049                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4050                }
4051                _ => false,
4052            };
4053        }
4054    }
4055
4056    // Simple comparison: attr <op> :val
4057    for op in &["<=", ">=", "<>", "=", "<", ">"] {
4058        if let Some(pos) = part.find(op) {
4059            let left = part[..pos].trim();
4060            let right = part[pos + op.len()..].trim();
4061            let attr_name = resolve_attr_name(left, expr_attr_names);
4062            let expected = expr_attr_values.get(right);
4063            let actual = item.get(&attr_name);
4064
4065            return match *op {
4066                "=" => actual == expected,
4067                "<>" => actual != expected,
4068                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4069                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4070                "<=" => {
4071                    let cmp = compare_attribute_values(actual, expected);
4072                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4073                }
4074                ">=" => {
4075                    let cmp = compare_attribute_values(actual, expected);
4076                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4077                }
4078                _ => true,
4079            };
4080        }
4081    }
4082
4083    true
4084}
4085
4086fn extract_string_value(val: &Value) -> Option<String> {
4087    val.get("S")
4088        .and_then(|v| v.as_str())
4089        .map(|s| s.to_string())
4090        .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
4091}
4092
4093fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4094    match (a, b) {
4095        (None, None) => std::cmp::Ordering::Equal,
4096        (None, Some(_)) => std::cmp::Ordering::Less,
4097        (Some(_), None) => std::cmp::Ordering::Greater,
4098        (Some(a), Some(b)) => {
4099            let a_type = attribute_type_and_value(a);
4100            let b_type = attribute_type_and_value(b);
4101            match (a_type, b_type) {
4102                (Some(("S", a_val)), Some(("S", b_val))) => {
4103                    let a_str = a_val.as_str().unwrap_or("");
4104                    let b_str = b_val.as_str().unwrap_or("");
4105                    a_str.cmp(b_str)
4106                }
4107                (Some(("N", a_val)), Some(("N", b_val))) => {
4108                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4109                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4110                    a_num
4111                        .partial_cmp(&b_num)
4112                        .unwrap_or(std::cmp::Ordering::Equal)
4113                }
4114                (Some(("B", a_val)), Some(("B", b_val))) => {
4115                    let a_str = a_val.as_str().unwrap_or("");
4116                    let b_str = b_val.as_str().unwrap_or("");
4117                    a_str.cmp(b_str)
4118                }
4119                _ => std::cmp::Ordering::Equal,
4120            }
4121        }
4122    }
4123}
4124
4125fn evaluate_filter_expression(
4126    expr: &str,
4127    item: &HashMap<String, AttributeValue>,
4128    expr_attr_names: &HashMap<String, String>,
4129    expr_attr_values: &HashMap<String, Value>,
4130) -> bool {
4131    let trimmed = expr.trim();
4132
4133    // Split on OR first (lower precedence), respecting parentheses
4134    let or_parts = split_on_or(trimmed);
4135    if or_parts.len() > 1 {
4136        return or_parts.iter().any(|part| {
4137            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4138        });
4139    }
4140
4141    // Then split on AND (higher precedence), respecting parentheses
4142    let and_parts = split_on_and(trimmed);
4143    if and_parts.len() > 1 {
4144        return and_parts.iter().all(|part| {
4145            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4146        });
4147    }
4148
4149    // Strip outer parentheses if present
4150    let stripped = strip_outer_parens(trimmed);
4151    if stripped != trimmed {
4152        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4153    }
4154
4155    // Handle NOT prefix (case-insensitive)
4156    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4157        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4158    }
4159
4160    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4161}
4162
4163/// Strip matching outer parentheses from an expression.
4164fn strip_outer_parens(expr: &str) -> &str {
4165    let trimmed = expr.trim();
4166    if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
4167        return trimmed;
4168    }
4169    // Verify the outer parens actually match each other
4170    let inner = &trimmed[1..trimmed.len() - 1];
4171    let mut depth = 0;
4172    for ch in inner.bytes() {
4173        match ch {
4174            b'(' => depth += 1,
4175            b')' => {
4176                if depth == 0 {
4177                    return trimmed; // closing paren matches something inside, not the outer one
4178                }
4179                depth -= 1;
4180            }
4181            _ => {}
4182        }
4183    }
4184    if depth == 0 {
4185        inner
4186    } else {
4187        trimmed
4188    }
4189}
4190
4191fn evaluate_single_filter_condition(
4192    part: &str,
4193    item: &HashMap<String, AttributeValue>,
4194    expr_attr_names: &HashMap<String, String>,
4195    expr_attr_values: &HashMap<String, Value>,
4196) -> bool {
4197    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4198        let attr = resolve_attr_name(inner, expr_attr_names);
4199        return item.contains_key(&attr);
4200    }
4201
4202    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4203        let attr = resolve_attr_name(inner, expr_attr_names);
4204        return !item.contains_key(&attr);
4205    }
4206
4207    if let Some(rest) = part
4208        .strip_prefix("begins_with(")
4209        .or_else(|| part.strip_prefix("begins_with ("))
4210    {
4211        if let Some(inner) = rest.strip_suffix(')') {
4212            let mut split = inner.splitn(2, ',');
4213            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4214                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4215                let expected = expr_attr_values.get(val_ref.trim());
4216                let actual = item.get(&attr_name);
4217                return match (actual, expected) {
4218                    (Some(a), Some(e)) => {
4219                        let a_str = extract_string_value(a);
4220                        let e_str = extract_string_value(e);
4221                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4222                    }
4223                    _ => false,
4224                };
4225            }
4226        }
4227    }
4228
4229    if let Some(rest) = part
4230        .strip_prefix("contains(")
4231        .or_else(|| part.strip_prefix("contains ("))
4232    {
4233        if let Some(inner) = rest.strip_suffix(')') {
4234            let mut split = inner.splitn(2, ',');
4235            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4236                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4237                let expected = expr_attr_values.get(val_ref.trim());
4238                let actual = item.get(&attr_name);
4239                return match (actual, expected) {
4240                    (Some(a), Some(e)) => {
4241                        let a_str = extract_string_value(a);
4242                        let e_str = extract_string_value(e);
4243                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
4244                    }
4245                    _ => false,
4246                };
4247            }
4248        }
4249    }
4250
4251    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4252}
4253
4254fn apply_update_expression(
4255    item: &mut HashMap<String, AttributeValue>,
4256    expr: &str,
4257    expr_attr_names: &HashMap<String, String>,
4258    expr_attr_values: &HashMap<String, Value>,
4259) -> Result<(), AwsServiceError> {
4260    let clauses = parse_update_clauses(expr);
4261    for (action, assignments) in &clauses {
4262        match action.to_ascii_uppercase().as_str() {
4263            "SET" => {
4264                for assignment in assignments {
4265                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4266                }
4267            }
4268            "REMOVE" => {
4269                for attr_ref in assignments {
4270                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4271                    item.remove(&attr);
4272                }
4273            }
4274            "ADD" => {
4275                for assignment in assignments {
4276                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4277                }
4278            }
4279            "DELETE" => {
4280                for assignment in assignments {
4281                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4282                }
4283            }
4284            _ => {}
4285        }
4286    }
4287    Ok(())
4288}
4289
4290fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
4291    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
4292    let upper = expr.to_ascii_uppercase();
4293    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
4294    let mut positions: Vec<(usize, &str)> = Vec::new();
4295
4296    for kw in &keywords {
4297        let mut search_from = 0;
4298        while let Some(pos) = upper[search_from..].find(kw) {
4299            let abs_pos = search_from + pos;
4300            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
4301            let after_pos = abs_pos + kw.len();
4302            let after_ok =
4303                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
4304            if before_ok && after_ok {
4305                positions.push((abs_pos, kw));
4306            }
4307            search_from = abs_pos + kw.len();
4308        }
4309    }
4310
4311    positions.sort_by_key(|(pos, _)| *pos);
4312
4313    for (i, &(pos, kw)) in positions.iter().enumerate() {
4314        let start = pos + kw.len();
4315        let end = if i + 1 < positions.len() {
4316            positions[i + 1].0
4317        } else {
4318            expr.len()
4319        };
4320        let content = expr[start..end].trim();
4321        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
4322        clauses.push((kw.to_string(), assignments));
4323    }
4324
4325    clauses
4326}
4327
4328fn apply_set_assignment(
4329    item: &mut HashMap<String, AttributeValue>,
4330    assignment: &str,
4331    expr_attr_names: &HashMap<String, String>,
4332    expr_attr_values: &HashMap<String, Value>,
4333) -> Result<(), AwsServiceError> {
4334    let Some((left, right)) = assignment.split_once('=') else {
4335        return Ok(());
4336    };
4337
4338    let attr = resolve_attr_name(left.trim(), expr_attr_names);
4339    let right = right.trim();
4340
4341    // if_not_exists(attr, :val)
4342    if let Some(rest) = right
4343        .strip_prefix("if_not_exists(")
4344        .or_else(|| right.strip_prefix("if_not_exists ("))
4345    {
4346        if let Some(inner) = rest.strip_suffix(')') {
4347            let mut split = inner.splitn(2, ',');
4348            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4349                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4350                if !item.contains_key(&check_name) {
4351                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4352                        item.insert(attr, val.clone());
4353                    }
4354                }
4355                return Ok(());
4356            }
4357        }
4358    }
4359
4360    // list_append(a, b)
4361    if let Some(rest) = right
4362        .strip_prefix("list_append(")
4363        .or_else(|| right.strip_prefix("list_append ("))
4364    {
4365        if let Some(inner) = rest.strip_suffix(')') {
4366            let mut split = inner.splitn(2, ',');
4367            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4368                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4369                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4370
4371                let mut merged = Vec::new();
4372                if let Some(Value::Object(obj)) = &a_val {
4373                    if let Some(Value::Array(arr)) = obj.get("L") {
4374                        merged.extend(arr.clone());
4375                    }
4376                }
4377                if let Some(Value::Object(obj)) = &b_val {
4378                    if let Some(Value::Array(arr)) = obj.get("L") {
4379                        merged.extend(arr.clone());
4380                    }
4381                }
4382
4383                item.insert(attr, json!({"L": merged}));
4384                return Ok(());
4385            }
4386        }
4387    }
4388
4389    // Arithmetic: attr + :val or attr - :val
4390    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4391        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4392        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4393
4394        let left_num = extract_number(&left_val).unwrap_or(0.0);
4395        let right_num = extract_number(&right_val).unwrap_or(0.0);
4396
4397        let result = if is_add {
4398            left_num + right_num
4399        } else {
4400            left_num - right_num
4401        };
4402
4403        let num_str = if result == result.trunc() {
4404            format!("{}", result as i64)
4405        } else {
4406            format!("{result}")
4407        };
4408
4409        item.insert(attr, json!({"N": num_str}));
4410        return Ok(());
4411    }
4412
4413    // Simple assignment
4414    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4415    if let Some(v) = val {
4416        item.insert(attr, v);
4417    }
4418
4419    Ok(())
4420}
4421
4422fn resolve_value(
4423    reference: &str,
4424    item: &HashMap<String, AttributeValue>,
4425    expr_attr_names: &HashMap<String, String>,
4426    expr_attr_values: &HashMap<String, Value>,
4427) -> Option<Value> {
4428    let reference = reference.trim();
4429    if reference.starts_with(':') {
4430        expr_attr_values.get(reference).cloned()
4431    } else {
4432        let attr_name = resolve_attr_name(reference, expr_attr_names);
4433        item.get(&attr_name).cloned()
4434    }
4435}
4436
4437fn extract_number(val: &Option<Value>) -> Option<f64> {
4438    val.as_ref()
4439        .and_then(|v| v.get("N"))
4440        .and_then(|n| n.as_str())
4441        .and_then(|s| s.parse().ok())
4442}
4443
4444fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
4445    let mut depth = 0;
4446    for (i, c) in expr.char_indices() {
4447        match c {
4448            '(' => depth += 1,
4449            ')' => depth -= 1,
4450            '+' if depth == 0 && i > 0 => {
4451                return Some((&expr[..i], &expr[i + 1..], true));
4452            }
4453            '-' if depth == 0 && i > 0 => {
4454                return Some((&expr[..i], &expr[i + 1..], false));
4455            }
4456            _ => {}
4457        }
4458    }
4459    None
4460}
4461
4462fn apply_add_assignment(
4463    item: &mut HashMap<String, AttributeValue>,
4464    assignment: &str,
4465    expr_attr_names: &HashMap<String, String>,
4466    expr_attr_values: &HashMap<String, Value>,
4467) -> Result<(), AwsServiceError> {
4468    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4469    if parts.len() != 2 {
4470        return Ok(());
4471    }
4472
4473    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4474    let val_ref = parts[1].trim();
4475    let add_val = expr_attr_values.get(val_ref);
4476
4477    if let Some(add_val) = add_val {
4478        if let Some(existing) = item.get(&attr) {
4479            if let (Some(existing_num), Some(add_num)) = (
4480                extract_number(&Some(existing.clone())),
4481                extract_number(&Some(add_val.clone())),
4482            ) {
4483                let result = existing_num + add_num;
4484                let num_str = if result == result.trunc() {
4485                    format!("{}", result as i64)
4486                } else {
4487                    format!("{result}")
4488                };
4489                item.insert(attr, json!({"N": num_str}));
4490            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4491                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4492                    let mut merged: Vec<Value> = existing_set.clone();
4493                    for v in add_set {
4494                        if !merged.contains(v) {
4495                            merged.push(v.clone());
4496                        }
4497                    }
4498                    item.insert(attr, json!({"SS": merged}));
4499                }
4500            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4501                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4502                    let mut merged: Vec<Value> = existing_set.clone();
4503                    for v in add_set {
4504                        if !merged.contains(v) {
4505                            merged.push(v.clone());
4506                        }
4507                    }
4508                    item.insert(attr, json!({"NS": merged}));
4509                }
4510            }
4511        } else {
4512            item.insert(attr, add_val.clone());
4513        }
4514    }
4515
4516    Ok(())
4517}
4518
4519fn apply_delete_assignment(
4520    item: &mut HashMap<String, AttributeValue>,
4521    assignment: &str,
4522    expr_attr_names: &HashMap<String, String>,
4523    expr_attr_values: &HashMap<String, Value>,
4524) -> Result<(), AwsServiceError> {
4525    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4526    if parts.len() != 2 {
4527        return Ok(());
4528    }
4529
4530    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4531    let val_ref = parts[1].trim();
4532    let del_val = expr_attr_values.get(val_ref);
4533
4534    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4535        if let (Some(existing_set), Some(del_set)) = (
4536            existing.get("SS").and_then(|v| v.as_array()),
4537            del_val.get("SS").and_then(|v| v.as_array()),
4538        ) {
4539            let filtered: Vec<Value> = existing_set
4540                .iter()
4541                .filter(|v| !del_set.contains(v))
4542                .cloned()
4543                .collect();
4544            if filtered.is_empty() {
4545                item.remove(&attr);
4546            } else {
4547                item.insert(attr, json!({"SS": filtered}));
4548            }
4549        } else if let (Some(existing_set), Some(del_set)) = (
4550            existing.get("NS").and_then(|v| v.as_array()),
4551            del_val.get("NS").and_then(|v| v.as_array()),
4552        ) {
4553            let filtered: Vec<Value> = existing_set
4554                .iter()
4555                .filter(|v| !del_set.contains(v))
4556                .cloned()
4557                .collect();
4558            if filtered.is_empty() {
4559                item.remove(&attr);
4560            } else {
4561                item.insert(attr, json!({"NS": filtered}));
4562            }
4563        }
4564    }
4565
4566    Ok(())
4567}
4568
4569#[allow(clippy::too_many_arguments)]
4570fn build_table_description_json(
4571    arn: &str,
4572    key_schema: &[KeySchemaElement],
4573    attribute_definitions: &[AttributeDefinition],
4574    provisioned_throughput: &ProvisionedThroughput,
4575    gsi: &[GlobalSecondaryIndex],
4576    lsi: &[LocalSecondaryIndex],
4577    billing_mode: &str,
4578    created_at: chrono::DateTime<chrono::Utc>,
4579    item_count: i64,
4580    size_bytes: i64,
4581    status: &str,
4582) -> Value {
4583    let table_name = arn.rsplit('/').next().unwrap_or("");
4584    let creation_timestamp =
4585        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4586
4587    let ks: Vec<Value> = key_schema
4588        .iter()
4589        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4590        .collect();
4591
4592    let ad: Vec<Value> = attribute_definitions
4593        .iter()
4594        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4595        .collect();
4596
4597    let mut desc = json!({
4598        "TableName": table_name,
4599        "TableArn": arn,
4600        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4601        "TableStatus": status,
4602        "KeySchema": ks,
4603        "AttributeDefinitions": ad,
4604        "CreationDateTime": creation_timestamp,
4605        "ItemCount": item_count,
4606        "TableSizeBytes": size_bytes,
4607        "BillingModeSummary": { "BillingMode": billing_mode },
4608    });
4609
4610    if billing_mode != "PAY_PER_REQUEST" {
4611        desc["ProvisionedThroughput"] = json!({
4612            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4613            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4614            "NumberOfDecreasesToday": 0,
4615        });
4616    } else {
4617        desc["ProvisionedThroughput"] = json!({
4618            "ReadCapacityUnits": 0,
4619            "WriteCapacityUnits": 0,
4620            "NumberOfDecreasesToday": 0,
4621        });
4622    }
4623
4624    if !gsi.is_empty() {
4625        let gsi_json: Vec<Value> = gsi
4626            .iter()
4627            .map(|g| {
4628                let gks: Vec<Value> = g
4629                    .key_schema
4630                    .iter()
4631                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4632                    .collect();
4633                let mut idx = json!({
4634                    "IndexName": g.index_name,
4635                    "KeySchema": gks,
4636                    "Projection": { "ProjectionType": g.projection.projection_type },
4637                    "IndexStatus": "ACTIVE",
4638                    "IndexArn": format!("{arn}/index/{}", g.index_name),
4639                    "ItemCount": 0,
4640                    "IndexSizeBytes": 0,
4641                });
4642                if !g.projection.non_key_attributes.is_empty() {
4643                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4644                }
4645                if let Some(ref pt) = g.provisioned_throughput {
4646                    idx["ProvisionedThroughput"] = json!({
4647                        "ReadCapacityUnits": pt.read_capacity_units,
4648                        "WriteCapacityUnits": pt.write_capacity_units,
4649                        "NumberOfDecreasesToday": 0,
4650                    });
4651                }
4652                idx
4653            })
4654            .collect();
4655        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4656    }
4657
4658    if !lsi.is_empty() {
4659        let lsi_json: Vec<Value> = lsi
4660            .iter()
4661            .map(|l| {
4662                let lks: Vec<Value> = l
4663                    .key_schema
4664                    .iter()
4665                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4666                    .collect();
4667                let mut idx = json!({
4668                    "IndexName": l.index_name,
4669                    "KeySchema": lks,
4670                    "Projection": { "ProjectionType": l.projection.projection_type },
4671                    "IndexArn": format!("{arn}/index/{}", l.index_name),
4672                    "ItemCount": 0,
4673                    "IndexSizeBytes": 0,
4674                });
4675                if !l.projection.non_key_attributes.is_empty() {
4676                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4677                }
4678                idx
4679            })
4680            .collect();
4681        desc["LocalSecondaryIndexes"] = json!(lsi_json);
4682    }
4683
4684    desc
4685}
4686
4687fn build_table_description(table: &DynamoTable) -> Value {
4688    let mut desc = build_table_description_json(
4689        &table.arn,
4690        &table.key_schema,
4691        &table.attribute_definitions,
4692        &table.provisioned_throughput,
4693        &table.gsi,
4694        &table.lsi,
4695        &table.billing_mode,
4696        table.created_at,
4697        table.item_count,
4698        table.size_bytes,
4699        &table.status,
4700    );
4701
4702    // Add stream specification if streams are enabled
4703    if table.stream_enabled {
4704        if let Some(ref stream_arn) = table.stream_arn {
4705            desc["LatestStreamArn"] = json!(stream_arn);
4706            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
4707        }
4708        if let Some(ref view_type) = table.stream_view_type {
4709            desc["StreamSpecification"] = json!({
4710                "StreamEnabled": true,
4711                "StreamViewType": view_type,
4712            });
4713        }
4714    }
4715
4716    // Add SSE description
4717    if let Some(ref sse_type) = table.sse_type {
4718        let mut sse_desc = json!({
4719            "Status": "ENABLED",
4720            "SSEType": sse_type,
4721        });
4722        if let Some(ref key_arn) = table.sse_kms_key_arn {
4723            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
4724        }
4725        desc["SSEDescription"] = sse_desc;
4726    } else {
4727        // Default: AWS owned key encryption (always enabled in real AWS)
4728        desc["SSEDescription"] = json!({
4729            "Status": "ENABLED",
4730            "SSEType": "AES256",
4731        });
4732    }
4733
4734    desc
4735}
4736
4737fn execute_partiql_statement(
4738    state: &SharedDynamoDbState,
4739    statement: &str,
4740    parameters: &[Value],
4741) -> Result<AwsResponse, AwsServiceError> {
4742    let trimmed = statement.trim();
4743    let upper = trimmed.to_ascii_uppercase();
4744
4745    if upper.starts_with("SELECT") {
4746        execute_partiql_select(state, trimmed, parameters)
4747    } else if upper.starts_with("INSERT") {
4748        execute_partiql_insert(state, trimmed, parameters)
4749    } else if upper.starts_with("UPDATE") {
4750        execute_partiql_update(state, trimmed, parameters)
4751    } else if upper.starts_with("DELETE") {
4752        execute_partiql_delete(state, trimmed, parameters)
4753    } else {
4754        Err(AwsServiceError::aws_error(
4755            StatusCode::BAD_REQUEST,
4756            "ValidationException",
4757            format!("Unsupported PartiQL statement: {trimmed}"),
4758        ))
4759    }
4760}
4761
4762/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
4763fn execute_partiql_select(
4764    state: &SharedDynamoDbState,
4765    statement: &str,
4766    parameters: &[Value],
4767) -> Result<AwsResponse, AwsServiceError> {
4768    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
4769    let upper = statement.to_ascii_uppercase();
4770    let from_pos = upper.find("FROM").ok_or_else(|| {
4771        AwsServiceError::aws_error(
4772            StatusCode::BAD_REQUEST,
4773            "ValidationException",
4774            "Invalid SELECT statement: missing FROM",
4775        )
4776    })?;
4777
4778    let after_from = statement[from_pos + 4..].trim();
4779    let (table_name, rest) = parse_partiql_table_name(after_from);
4780
4781    let state = state.read();
4782    let table = get_table(&state.tables, &table_name)?;
4783
4784    let rest_upper = rest.trim().to_ascii_uppercase();
4785    if rest_upper.starts_with("WHERE") {
4786        let where_clause = rest.trim()[5..].trim();
4787        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4788        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4789        DynamoDbService::ok_json(json!({ "Items": items }))
4790    } else {
4791        // No WHERE, return all items
4792        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4793        DynamoDbService::ok_json(json!({ "Items": items }))
4794    }
4795}
4796
4797fn execute_partiql_insert(
4798    state: &SharedDynamoDbState,
4799    statement: &str,
4800    parameters: &[Value],
4801) -> Result<AwsResponse, AwsServiceError> {
4802    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
4803    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
4804    let upper = statement.to_ascii_uppercase();
4805    let into_pos = upper.find("INTO").ok_or_else(|| {
4806        AwsServiceError::aws_error(
4807            StatusCode::BAD_REQUEST,
4808            "ValidationException",
4809            "Invalid INSERT statement: missing INTO",
4810        )
4811    })?;
4812
4813    let after_into = statement[into_pos + 4..].trim();
4814    let (table_name, rest) = parse_partiql_table_name(after_into);
4815
4816    let rest_upper = rest.trim().to_ascii_uppercase();
4817    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4818        AwsServiceError::aws_error(
4819            StatusCode::BAD_REQUEST,
4820            "ValidationException",
4821            "Invalid INSERT statement: missing VALUE",
4822        )
4823    })?;
4824
4825    let value_str = rest.trim()[value_pos + 5..].trim();
4826    let item = parse_partiql_value_object(value_str, parameters)?;
4827
4828    let mut state = state.write();
4829    let table = get_table_mut(&mut state.tables, &table_name)?;
4830    let key = extract_key(table, &item);
4831    if table.find_item_index(&key).is_some() {
4832        // DynamoDB PartiQL INSERT fails if item exists
4833        return Err(AwsServiceError::aws_error(
4834            StatusCode::BAD_REQUEST,
4835            "DuplicateItemException",
4836            "Duplicate primary key exists in table",
4837        ));
4838    } else {
4839        table.items.push(item);
4840    }
4841    table.recalculate_stats();
4842
4843    DynamoDbService::ok_json(json!({}))
4844}
4845
4846fn execute_partiql_update(
4847    state: &SharedDynamoDbState,
4848    statement: &str,
4849    parameters: &[Value],
4850) -> Result<AwsResponse, AwsServiceError> {
4851    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
4852    // or: UPDATE "tablename" SET attr=? WHERE pk=?
4853    let after_update = statement[6..].trim(); // skip "UPDATE"
4854    let (table_name, rest) = parse_partiql_table_name(after_update);
4855
4856    let rest_upper = rest.trim().to_ascii_uppercase();
4857    let set_pos = rest_upper.find("SET").ok_or_else(|| {
4858        AwsServiceError::aws_error(
4859            StatusCode::BAD_REQUEST,
4860            "ValidationException",
4861            "Invalid UPDATE statement: missing SET",
4862        )
4863    })?;
4864
4865    let after_set = rest.trim()[set_pos + 3..].trim();
4866
4867    // Split on WHERE
4868    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4869    let (set_clause, where_clause) = if let Some(wp) = where_pos {
4870        (&after_set[..wp], after_set[wp + 5..].trim())
4871    } else {
4872        (after_set, "")
4873    };
4874
4875    let mut state = state.write();
4876    let table = get_table_mut(&mut state.tables, &table_name)?;
4877
4878    let matched_indices = if !where_clause.is_empty() {
4879        find_partiql_where_indices(table, where_clause, parameters)?
4880    } else {
4881        (0..table.items.len()).collect()
4882    };
4883
4884    // Parse SET assignments: attr=value, attr2=value2
4885    let param_offset = count_params_in_str(where_clause);
4886    let assignments: Vec<&str> = set_clause.split(',').collect();
4887    for idx in &matched_indices {
4888        let mut local_offset = param_offset;
4889        for assignment in &assignments {
4890            let assignment = assignment.trim();
4891            if let Some((attr, val_str)) = assignment.split_once('=') {
4892                let attr = attr.trim().trim_matches('"');
4893                let val_str = val_str.trim();
4894                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4895                if let Some(v) = value {
4896                    table.items[*idx].insert(attr.to_string(), v);
4897                }
4898            }
4899        }
4900    }
4901    table.recalculate_stats();
4902
4903    DynamoDbService::ok_json(json!({}))
4904}
4905
4906fn execute_partiql_delete(
4907    state: &SharedDynamoDbState,
4908    statement: &str,
4909    parameters: &[Value],
4910) -> Result<AwsResponse, AwsServiceError> {
4911    // Pattern: DELETE FROM "tablename" WHERE pk='val'
4912    let upper = statement.to_ascii_uppercase();
4913    let from_pos = upper.find("FROM").ok_or_else(|| {
4914        AwsServiceError::aws_error(
4915            StatusCode::BAD_REQUEST,
4916            "ValidationException",
4917            "Invalid DELETE statement: missing FROM",
4918        )
4919    })?;
4920
4921    let after_from = statement[from_pos + 4..].trim();
4922    let (table_name, rest) = parse_partiql_table_name(after_from);
4923
4924    let rest_upper = rest.trim().to_ascii_uppercase();
4925    if !rest_upper.starts_with("WHERE") {
4926        return Err(AwsServiceError::aws_error(
4927            StatusCode::BAD_REQUEST,
4928            "ValidationException",
4929            "DELETE requires a WHERE clause",
4930        ));
4931    }
4932    let where_clause = rest.trim()[5..].trim();
4933
4934    let mut state = state.write();
4935    let table = get_table_mut(&mut state.tables, &table_name)?;
4936
4937    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4938    // Remove from highest index first to avoid invalidating lower indices
4939    indices.sort_unstable();
4940    indices.reverse();
4941    for idx in indices {
4942        table.items.remove(idx);
4943    }
4944    table.recalculate_stats();
4945
4946    DynamoDbService::ok_json(json!({}))
4947}
4948
4949/// Parse a table name that may be quoted with double quotes.
4950/// Returns (table_name, rest_of_string).
4951fn parse_partiql_table_name(s: &str) -> (String, &str) {
4952    let s = s.trim();
4953    if let Some(stripped) = s.strip_prefix('"') {
4954        // Quoted name
4955        if let Some(end) = stripped.find('"') {
4956            let name = &stripped[..end];
4957            let rest = &stripped[end + 1..];
4958            (name.to_string(), rest)
4959        } else {
4960            let end = s.find(' ').unwrap_or(s.len());
4961            (s[..end].trim_matches('"').to_string(), &s[end..])
4962        }
4963    } else {
4964        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
4965        (s[..end].to_string(), &s[end..])
4966    }
4967}
4968
4969/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
4970/// Returns matching items.
4971fn evaluate_partiql_where<'a>(
4972    table: &'a DynamoTable,
4973    where_clause: &str,
4974    parameters: &[Value],
4975) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4976    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4977    Ok(indices.iter().map(|i| &table.items[*i]).collect())
4978}
4979
4980fn find_partiql_where_indices(
4981    table: &DynamoTable,
4982    where_clause: &str,
4983    parameters: &[Value],
4984) -> Result<Vec<usize>, AwsServiceError> {
4985    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
4986    // Case-insensitive AND splitting
4987    let upper = where_clause.to_uppercase();
4988    let conditions = if upper.contains(" AND ") {
4989        // Find positions of " AND " case-insensitively and split
4990        let mut parts = Vec::new();
4991        let mut last = 0;
4992        for (i, _) in upper.match_indices(" AND ") {
4993            parts.push(where_clause[last..i].trim());
4994            last = i + 5;
4995        }
4996        parts.push(where_clause[last..].trim());
4997        parts
4998    } else {
4999        vec![where_clause.trim()]
5000    };
5001
5002    let mut param_idx = 0usize;
5003    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5004
5005    for cond in &conditions {
5006        let cond = cond.trim();
5007        if let Some((left, right)) = cond.split_once('=') {
5008            let attr = left.trim().trim_matches('"').to_string();
5009            let val_str = right.trim();
5010            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5011            if let Some(v) = value {
5012                parsed_conditions.push((attr, v));
5013            }
5014        }
5015    }
5016
5017    let mut indices = Vec::new();
5018    for (i, item) in table.items.iter().enumerate() {
5019        let all_match = parsed_conditions
5020            .iter()
5021            .all(|(attr, expected)| item.get(attr) == Some(expected));
5022        if all_match {
5023            indices.push(i);
5024        }
5025    }
5026
5027    Ok(indices)
5028}
5029
5030/// Parse a PartiQL literal value. Supports:
5031/// - 'string' -> {"S": "string"}
5032/// - 123 -> {"N": "123"}
5033/// - ? -> parameter from list
5034fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5035    let s = s.trim();
5036    if s == "?" {
5037        let idx = *param_idx;
5038        *param_idx += 1;
5039        parameters.get(idx).cloned()
5040    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5041        let inner = &s[1..s.len() - 1];
5042        Some(json!({"S": inner}))
5043    } else if let Ok(n) = s.parse::<f64>() {
5044        let num_str = if n == n.trunc() {
5045            format!("{}", n as i64)
5046        } else {
5047            format!("{n}")
5048        };
5049        Some(json!({"N": num_str}))
5050    } else {
5051        None
5052    }
5053}
5054
5055/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
5056fn parse_partiql_value_object(
5057    s: &str,
5058    parameters: &[Value],
5059) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5060    let s = s.trim();
5061    let inner = s
5062        .strip_prefix('{')
5063        .and_then(|s| s.strip_suffix('}'))
5064        .ok_or_else(|| {
5065            AwsServiceError::aws_error(
5066                StatusCode::BAD_REQUEST,
5067                "ValidationException",
5068                "Invalid VALUE: expected object literal",
5069            )
5070        })?;
5071
5072    let mut item = HashMap::new();
5073    let mut param_idx = 0usize;
5074
5075    // Simple comma-separated key:value parsing
5076    for pair in split_partiql_pairs(inner) {
5077        let pair = pair.trim();
5078        if pair.is_empty() {
5079            continue;
5080        }
5081        if let Some((key_part, val_part)) = pair.split_once(':') {
5082            let key = key_part
5083                .trim()
5084                .trim_matches('\'')
5085                .trim_matches('"')
5086                .to_string();
5087            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5088                item.insert(key, val);
5089            }
5090        }
5091    }
5092
5093    Ok(item)
5094}
5095
5096/// Split PartiQL object pairs on commas, respecting nested braces and quotes.
5097fn split_partiql_pairs(s: &str) -> Vec<&str> {
5098    let mut parts = Vec::new();
5099    let mut start = 0;
5100    let mut depth = 0;
5101    let mut in_quote = false;
5102
5103    for (i, c) in s.char_indices() {
5104        match c {
5105            '\'' if !in_quote => in_quote = true,
5106            '\'' if in_quote => in_quote = false,
5107            '{' if !in_quote => depth += 1,
5108            '}' if !in_quote => depth -= 1,
5109            ',' if !in_quote && depth == 0 => {
5110                parts.push(&s[start..i]);
5111                start = i + 1;
5112            }
5113            _ => {}
5114        }
5115    }
5116    parts.push(&s[start..]);
5117    parts
5118}
5119
5120/// Count ? parameters in a string.
5121fn count_params_in_str(s: &str) -> usize {
5122    s.chars().filter(|c| *c == '?').count()
5123}
5124
5125#[cfg(test)]
5126mod tests {
5127    use super::*;
5128    use serde_json::json;
5129
5130    #[test]
5131    fn test_parse_update_clauses_set() {
5132        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5133        assert_eq!(clauses.len(), 1);
5134        assert_eq!(clauses[0].0, "SET");
5135        assert_eq!(clauses[0].1.len(), 2);
5136    }
5137
5138    #[test]
5139    fn test_parse_update_clauses_set_and_remove() {
5140        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
5141        assert_eq!(clauses.len(), 2);
5142        assert_eq!(clauses[0].0, "SET");
5143        assert_eq!(clauses[1].0, "REMOVE");
5144    }
5145
5146    #[test]
5147    fn test_evaluate_key_condition_simple() {
5148        let mut item = HashMap::new();
5149        item.insert("pk".to_string(), json!({"S": "user1"}));
5150        item.insert("sk".to_string(), json!({"S": "order1"}));
5151
5152        let mut expr_values = HashMap::new();
5153        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
5154
5155        assert!(evaluate_key_condition(
5156            "pk = :pk",
5157            &item,
5158            "pk",
5159            Some("sk"),
5160            &HashMap::new(),
5161            &expr_values,
5162        ));
5163    }
5164
5165    #[test]
5166    fn test_compare_attribute_values_numbers() {
5167        let a = json!({"N": "10"});
5168        let b = json!({"N": "20"});
5169        assert_eq!(
5170            compare_attribute_values(Some(&a), Some(&b)),
5171            std::cmp::Ordering::Less
5172        );
5173    }
5174
5175    #[test]
5176    fn test_compare_attribute_values_strings() {
5177        let a = json!({"S": "apple"});
5178        let b = json!({"S": "banana"});
5179        assert_eq!(
5180            compare_attribute_values(Some(&a), Some(&b)),
5181            std::cmp::Ordering::Less
5182        );
5183    }
5184
5185    #[test]
5186    fn test_split_on_and() {
5187        let parts = split_on_and("pk = :pk AND sk > :sk");
5188        assert_eq!(parts.len(), 2);
5189        assert_eq!(parts[0].trim(), "pk = :pk");
5190        assert_eq!(parts[1].trim(), "sk > :sk");
5191    }
5192
5193    #[test]
5194    fn test_split_on_and_respects_parentheses() {
5195        // Before fix: split_on_and would split inside the parens
5196        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5197        // Should NOT split on the AND inside parentheses
5198        assert_eq!(parts.len(), 1);
5199        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5200    }
5201
5202    #[test]
5203    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
5204        // (a AND b) OR c — should match when c is true but a is false
5205        let mut item = HashMap::new();
5206        item.insert("x".to_string(), json!({"S": "no"}));
5207        item.insert("y".to_string(), json!({"S": "no"}));
5208        item.insert("z".to_string(), json!({"S": "yes"}));
5209
5210        let mut expr_values = HashMap::new();
5211        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
5212
5213        // x=yes AND y=yes => false, but z=yes => true => overall true
5214        let result = evaluate_filter_expression(
5215            "(x = :yes AND y = :yes) OR z = :yes",
5216            &item,
5217            &HashMap::new(),
5218            &expr_values,
5219        );
5220        assert!(result, "should match because z = :yes is true");
5221
5222        // x=yes AND y=yes => false, z=yes => false => overall false
5223        let mut item2 = HashMap::new();
5224        item2.insert("x".to_string(), json!({"S": "no"}));
5225        item2.insert("y".to_string(), json!({"S": "no"}));
5226        item2.insert("z".to_string(), json!({"S": "no"}));
5227
5228        let result2 = evaluate_filter_expression(
5229            "(x = :yes AND y = :yes) OR z = :yes",
5230            &item2,
5231            &HashMap::new(),
5232            &expr_values,
5233        );
5234        assert!(!result2, "should not match because nothing is true");
5235    }
5236
5237    #[test]
5238    fn test_project_item_nested_path() {
5239        // Item with a list attribute containing maps
5240        let mut item = HashMap::new();
5241        item.insert("pk".to_string(), json!({"S": "key1"}));
5242        item.insert(
5243            "data".to_string(),
5244            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
5245        );
5246
5247        let body = json!({
5248            "ProjectionExpression": "data[0].name"
5249        });
5250
5251        let projected = project_item(&item, &body);
5252        // Should contain data[0].name = "Alice", not the entire data[0] element
5253        let name = projected
5254            .get("data")
5255            .and_then(|v| v.get("L"))
5256            .and_then(|v| v.get(0))
5257            .and_then(|v| v.get("M"))
5258            .and_then(|v| v.get("name"))
5259            .and_then(|v| v.get("S"))
5260            .and_then(|v| v.as_str());
5261        assert_eq!(name, Some("Alice"));
5262
5263        // Should NOT contain the "age" field
5264        let age = projected
5265            .get("data")
5266            .and_then(|v| v.get("L"))
5267            .and_then(|v| v.get(0))
5268            .and_then(|v| v.get("M"))
5269            .and_then(|v| v.get("age"));
5270        assert!(age.is_none(), "age should not be present in projection");
5271    }
5272
5273    #[test]
5274    fn test_resolve_nested_path_map() {
5275        let mut item = HashMap::new();
5276        item.insert(
5277            "info".to_string(),
5278            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
5279        );
5280
5281        let result = resolve_nested_path(&item, "info.address.city");
5282        assert_eq!(result, Some(json!({"S": "NYC"})));
5283    }
5284
5285    #[test]
5286    fn test_resolve_nested_path_list_then_map() {
5287        let mut item = HashMap::new();
5288        item.insert(
5289            "items".to_string(),
5290            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
5291        );
5292
5293        let result = resolve_nested_path(&item, "items[0].sku");
5294        assert_eq!(result, Some(json!({"S": "ABC"})));
5295    }
5296
5297    // -- Integration-style tests using DynamoDbService --
5298
5299    use crate::state::SharedDynamoDbState;
5300    use parking_lot::RwLock;
5301    use std::sync::Arc;
5302
5303    fn make_service() -> DynamoDbService {
5304        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5305            "123456789012",
5306            "us-east-1",
5307        )));
5308        DynamoDbService::new(state)
5309    }
5310
5311    fn make_request(action: &str, body: Value) -> AwsRequest {
5312        AwsRequest {
5313            service: "dynamodb".to_string(),
5314            action: action.to_string(),
5315            region: "us-east-1".to_string(),
5316            account_id: "123456789012".to_string(),
5317            request_id: "test-id".to_string(),
5318            headers: http::HeaderMap::new(),
5319            query_params: HashMap::new(),
5320            body: serde_json::to_vec(&body).unwrap().into(),
5321            path_segments: vec![],
5322            raw_path: "/".to_string(),
5323            raw_query: String::new(),
5324            method: http::Method::POST,
5325            is_query_protocol: false,
5326            access_key_id: None,
5327        }
5328    }
5329
5330    fn create_test_table(svc: &DynamoDbService) {
5331        let req = make_request(
5332            "CreateTable",
5333            json!({
5334                "TableName": "test-table",
5335                "KeySchema": [
5336                    { "AttributeName": "pk", "KeyType": "HASH" }
5337                ],
5338                "AttributeDefinitions": [
5339                    { "AttributeName": "pk", "AttributeType": "S" }
5340                ],
5341                "BillingMode": "PAY_PER_REQUEST"
5342            }),
5343        );
5344        svc.create_table(&req).unwrap();
5345    }
5346
5347    #[test]
5348    fn delete_item_return_values_all_old() {
5349        let svc = make_service();
5350        create_test_table(&svc);
5351
5352        // Put an item
5353        let req = make_request(
5354            "PutItem",
5355            json!({
5356                "TableName": "test-table",
5357                "Item": {
5358                    "pk": { "S": "key1" },
5359                    "name": { "S": "Alice" },
5360                    "age": { "N": "30" }
5361                }
5362            }),
5363        );
5364        svc.put_item(&req).unwrap();
5365
5366        // Delete with ReturnValues=ALL_OLD
5367        let req = make_request(
5368            "DeleteItem",
5369            json!({
5370                "TableName": "test-table",
5371                "Key": { "pk": { "S": "key1" } },
5372                "ReturnValues": "ALL_OLD"
5373            }),
5374        );
5375        let resp = svc.delete_item(&req).unwrap();
5376        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5377
5378        // Verify the old item is returned
5379        let attrs = &body["Attributes"];
5380        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5381        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5382        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5383
5384        // Verify the item is actually deleted
5385        let req = make_request(
5386            "GetItem",
5387            json!({
5388                "TableName": "test-table",
5389                "Key": { "pk": { "S": "key1" } }
5390            }),
5391        );
5392        let resp = svc.get_item(&req).unwrap();
5393        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5394        assert!(body.get("Item").is_none(), "item should be deleted");
5395    }
5396
5397    #[test]
5398    fn transact_get_items_returns_existing_and_missing() {
5399        let svc = make_service();
5400        create_test_table(&svc);
5401
5402        // Put one item
5403        let req = make_request(
5404            "PutItem",
5405            json!({
5406                "TableName": "test-table",
5407                "Item": {
5408                    "pk": { "S": "exists" },
5409                    "val": { "S": "hello" }
5410                }
5411            }),
5412        );
5413        svc.put_item(&req).unwrap();
5414
5415        let req = make_request(
5416            "TransactGetItems",
5417            json!({
5418                "TransactItems": [
5419                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5420                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5421                ]
5422            }),
5423        );
5424        let resp = svc.transact_get_items(&req).unwrap();
5425        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5426        let responses = body["Responses"].as_array().unwrap();
5427        assert_eq!(responses.len(), 2);
5428        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5429        assert!(responses[1].get("Item").is_none());
5430    }
5431
5432    #[test]
5433    fn transact_write_items_put_and_delete() {
5434        let svc = make_service();
5435        create_test_table(&svc);
5436
5437        // Put initial item
5438        let req = make_request(
5439            "PutItem",
5440            json!({
5441                "TableName": "test-table",
5442                "Item": {
5443                    "pk": { "S": "to-delete" },
5444                    "val": { "S": "bye" }
5445                }
5446            }),
5447        );
5448        svc.put_item(&req).unwrap();
5449
5450        // TransactWrite: put new + delete existing
5451        let req = make_request(
5452            "TransactWriteItems",
5453            json!({
5454                "TransactItems": [
5455                    {
5456                        "Put": {
5457                            "TableName": "test-table",
5458                            "Item": {
5459                                "pk": { "S": "new-item" },
5460                                "val": { "S": "hi" }
5461                            }
5462                        }
5463                    },
5464                    {
5465                        "Delete": {
5466                            "TableName": "test-table",
5467                            "Key": { "pk": { "S": "to-delete" } }
5468                        }
5469                    }
5470                ]
5471            }),
5472        );
5473        let resp = svc.transact_write_items(&req).unwrap();
5474        assert_eq!(resp.status, StatusCode::OK);
5475
5476        // Verify new item exists
5477        let req = make_request(
5478            "GetItem",
5479            json!({
5480                "TableName": "test-table",
5481                "Key": { "pk": { "S": "new-item" } }
5482            }),
5483        );
5484        let resp = svc.get_item(&req).unwrap();
5485        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5486        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5487
5488        // Verify deleted item is gone
5489        let req = make_request(
5490            "GetItem",
5491            json!({
5492                "TableName": "test-table",
5493                "Key": { "pk": { "S": "to-delete" } }
5494            }),
5495        );
5496        let resp = svc.get_item(&req).unwrap();
5497        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5498        assert!(body.get("Item").is_none());
5499    }
5500
5501    #[test]
5502    fn transact_write_items_condition_check_failure() {
5503        let svc = make_service();
5504        create_test_table(&svc);
5505
5506        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
5507        let req = make_request(
5508            "TransactWriteItems",
5509            json!({
5510                "TransactItems": [
5511                    {
5512                        "ConditionCheck": {
5513                            "TableName": "test-table",
5514                            "Key": { "pk": { "S": "nonexistent" } },
5515                            "ConditionExpression": "attribute_exists(pk)"
5516                        }
5517                    }
5518                ]
5519            }),
5520        );
5521        let resp = svc.transact_write_items(&req).unwrap();
5522        // Should be a 400 error response
5523        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5524        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5525        assert_eq!(
5526            body["__type"].as_str().unwrap(),
5527            "TransactionCanceledException"
5528        );
5529        assert!(body["CancellationReasons"].as_array().is_some());
5530    }
5531
5532    #[test]
5533    fn update_and_describe_time_to_live() {
5534        let svc = make_service();
5535        create_test_table(&svc);
5536
5537        // Enable TTL
5538        let req = make_request(
5539            "UpdateTimeToLive",
5540            json!({
5541                "TableName": "test-table",
5542                "TimeToLiveSpecification": {
5543                    "AttributeName": "ttl",
5544                    "Enabled": true
5545                }
5546            }),
5547        );
5548        let resp = svc.update_time_to_live(&req).unwrap();
5549        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5550        assert_eq!(
5551            body["TimeToLiveSpecification"]["AttributeName"]
5552                .as_str()
5553                .unwrap(),
5554            "ttl"
5555        );
5556        assert!(body["TimeToLiveSpecification"]["Enabled"]
5557            .as_bool()
5558            .unwrap());
5559
5560        // Describe TTL
5561        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5562        let resp = svc.describe_time_to_live(&req).unwrap();
5563        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5564        assert_eq!(
5565            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5566                .as_str()
5567                .unwrap(),
5568            "ENABLED"
5569        );
5570        assert_eq!(
5571            body["TimeToLiveDescription"]["AttributeName"]
5572                .as_str()
5573                .unwrap(),
5574            "ttl"
5575        );
5576
5577        // Disable TTL
5578        let req = make_request(
5579            "UpdateTimeToLive",
5580            json!({
5581                "TableName": "test-table",
5582                "TimeToLiveSpecification": {
5583                    "AttributeName": "ttl",
5584                    "Enabled": false
5585                }
5586            }),
5587        );
5588        svc.update_time_to_live(&req).unwrap();
5589
5590        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5591        let resp = svc.describe_time_to_live(&req).unwrap();
5592        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5593        assert_eq!(
5594            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5595                .as_str()
5596                .unwrap(),
5597            "DISABLED"
5598        );
5599    }
5600
5601    #[test]
5602    fn resource_policy_lifecycle() {
5603        let svc = make_service();
5604        create_test_table(&svc);
5605
5606        let table_arn = {
5607            let state = svc.state.read();
5608            state.tables.get("test-table").unwrap().arn.clone()
5609        };
5610
5611        // Put policy
5612        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5613        let req = make_request(
5614            "PutResourcePolicy",
5615            json!({
5616                "ResourceArn": table_arn,
5617                "Policy": policy_doc
5618            }),
5619        );
5620        let resp = svc.put_resource_policy(&req).unwrap();
5621        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5622        assert!(body["RevisionId"].as_str().is_some());
5623
5624        // Get policy
5625        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5626        let resp = svc.get_resource_policy(&req).unwrap();
5627        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5628        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5629
5630        // Delete policy
5631        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5632        svc.delete_resource_policy(&req).unwrap();
5633
5634        // Get should return null now
5635        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5636        let resp = svc.get_resource_policy(&req).unwrap();
5637        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5638        assert!(body["Policy"].is_null());
5639    }
5640
5641    #[test]
5642    fn describe_endpoints() {
5643        let svc = make_service();
5644        let req = make_request("DescribeEndpoints", json!({}));
5645        let resp = svc.describe_endpoints(&req).unwrap();
5646        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5647        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5648    }
5649
5650    #[test]
5651    fn describe_limits() {
5652        let svc = make_service();
5653        let req = make_request("DescribeLimits", json!({}));
5654        let resp = svc.describe_limits(&req).unwrap();
5655        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5656        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5657    }
5658
5659    #[test]
5660    fn backup_lifecycle() {
5661        let svc = make_service();
5662        create_test_table(&svc);
5663
5664        // Create backup
5665        let req = make_request(
5666            "CreateBackup",
5667            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5668        );
5669        let resp = svc.create_backup(&req).unwrap();
5670        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5671        let backup_arn = body["BackupDetails"]["BackupArn"]
5672            .as_str()
5673            .unwrap()
5674            .to_string();
5675        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5676
5677        // Describe backup
5678        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5679        let resp = svc.describe_backup(&req).unwrap();
5680        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5681        assert_eq!(
5682            body["BackupDescription"]["BackupDetails"]["BackupName"],
5683            "my-backup"
5684        );
5685
5686        // List backups
5687        let req = make_request("ListBackups", json!({}));
5688        let resp = svc.list_backups(&req).unwrap();
5689        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5690        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5691
5692        // Restore from backup
5693        let req = make_request(
5694            "RestoreTableFromBackup",
5695            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5696        );
5697        svc.restore_table_from_backup(&req).unwrap();
5698
5699        // Verify restored table exists
5700        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5701        let resp = svc.describe_table(&req).unwrap();
5702        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5703        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5704
5705        // Delete backup
5706        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5707        svc.delete_backup(&req).unwrap();
5708
5709        // List should be empty
5710        let req = make_request("ListBackups", json!({}));
5711        let resp = svc.list_backups(&req).unwrap();
5712        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5713        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5714    }
5715
5716    #[test]
5717    fn continuous_backups() {
5718        let svc = make_service();
5719        create_test_table(&svc);
5720
5721        // Initially disabled
5722        let req = make_request(
5723            "DescribeContinuousBackups",
5724            json!({ "TableName": "test-table" }),
5725        );
5726        let resp = svc.describe_continuous_backups(&req).unwrap();
5727        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5728        assert_eq!(
5729            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5730                ["PointInTimeRecoveryStatus"],
5731            "DISABLED"
5732        );
5733
5734        // Enable
5735        let req = make_request(
5736            "UpdateContinuousBackups",
5737            json!({
5738                "TableName": "test-table",
5739                "PointInTimeRecoverySpecification": {
5740                    "PointInTimeRecoveryEnabled": true
5741                }
5742            }),
5743        );
5744        svc.update_continuous_backups(&req).unwrap();
5745
5746        // Verify
5747        let req = make_request(
5748            "DescribeContinuousBackups",
5749            json!({ "TableName": "test-table" }),
5750        );
5751        let resp = svc.describe_continuous_backups(&req).unwrap();
5752        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5753        assert_eq!(
5754            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5755                ["PointInTimeRecoveryStatus"],
5756            "ENABLED"
5757        );
5758    }
5759
5760    #[test]
5761    fn restore_table_to_point_in_time() {
5762        let svc = make_service();
5763        create_test_table(&svc);
5764
5765        let req = make_request(
5766            "RestoreTableToPointInTime",
5767            json!({
5768                "SourceTableName": "test-table",
5769                "TargetTableName": "pitr-restored"
5770            }),
5771        );
5772        svc.restore_table_to_point_in_time(&req).unwrap();
5773
5774        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5775        let resp = svc.describe_table(&req).unwrap();
5776        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5777        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5778    }
5779
5780    #[test]
5781    fn global_table_lifecycle() {
5782        let svc = make_service();
5783
5784        // Create global table
5785        let req = make_request(
5786            "CreateGlobalTable",
5787            json!({
5788                "GlobalTableName": "my-global",
5789                "ReplicationGroup": [
5790                    { "RegionName": "us-east-1" },
5791                    { "RegionName": "eu-west-1" }
5792                ]
5793            }),
5794        );
5795        let resp = svc.create_global_table(&req).unwrap();
5796        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5797        assert_eq!(
5798            body["GlobalTableDescription"]["GlobalTableStatus"],
5799            "ACTIVE"
5800        );
5801
5802        // Describe
5803        let req = make_request(
5804            "DescribeGlobalTable",
5805            json!({ "GlobalTableName": "my-global" }),
5806        );
5807        let resp = svc.describe_global_table(&req).unwrap();
5808        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5809        assert_eq!(
5810            body["GlobalTableDescription"]["ReplicationGroup"]
5811                .as_array()
5812                .unwrap()
5813                .len(),
5814            2
5815        );
5816
5817        // List
5818        let req = make_request("ListGlobalTables", json!({}));
5819        let resp = svc.list_global_tables(&req).unwrap();
5820        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5821        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5822
5823        // Update - add a region
5824        let req = make_request(
5825            "UpdateGlobalTable",
5826            json!({
5827                "GlobalTableName": "my-global",
5828                "ReplicaUpdates": [
5829                    { "Create": { "RegionName": "ap-southeast-1" } }
5830                ]
5831            }),
5832        );
5833        let resp = svc.update_global_table(&req).unwrap();
5834        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5835        assert_eq!(
5836            body["GlobalTableDescription"]["ReplicationGroup"]
5837                .as_array()
5838                .unwrap()
5839                .len(),
5840            3
5841        );
5842
5843        // Describe settings
5844        let req = make_request(
5845            "DescribeGlobalTableSettings",
5846            json!({ "GlobalTableName": "my-global" }),
5847        );
5848        let resp = svc.describe_global_table_settings(&req).unwrap();
5849        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5850        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5851
5852        // Update settings (no-op, just verify no error)
5853        let req = make_request(
5854            "UpdateGlobalTableSettings",
5855            json!({ "GlobalTableName": "my-global" }),
5856        );
5857        svc.update_global_table_settings(&req).unwrap();
5858    }
5859
5860    #[test]
5861    fn table_replica_auto_scaling() {
5862        let svc = make_service();
5863        create_test_table(&svc);
5864
5865        let req = make_request(
5866            "DescribeTableReplicaAutoScaling",
5867            json!({ "TableName": "test-table" }),
5868        );
5869        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5870        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5871        assert_eq!(
5872            body["TableAutoScalingDescription"]["TableName"],
5873            "test-table"
5874        );
5875
5876        let req = make_request(
5877            "UpdateTableReplicaAutoScaling",
5878            json!({ "TableName": "test-table" }),
5879        );
5880        svc.update_table_replica_auto_scaling(&req).unwrap();
5881    }
5882
5883    #[test]
5884    fn kinesis_streaming_lifecycle() {
5885        let svc = make_service();
5886        create_test_table(&svc);
5887
5888        // Enable
5889        let req = make_request(
5890            "EnableKinesisStreamingDestination",
5891            json!({
5892                "TableName": "test-table",
5893                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5894            }),
5895        );
5896        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5897        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5898        assert_eq!(body["DestinationStatus"], "ACTIVE");
5899
5900        // Describe
5901        let req = make_request(
5902            "DescribeKinesisStreamingDestination",
5903            json!({ "TableName": "test-table" }),
5904        );
5905        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5906        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5907        assert_eq!(
5908            body["KinesisDataStreamDestinations"]
5909                .as_array()
5910                .unwrap()
5911                .len(),
5912            1
5913        );
5914
5915        // Update
5916        let req = make_request(
5917            "UpdateKinesisStreamingDestination",
5918            json!({
5919                "TableName": "test-table",
5920                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5921                "UpdateKinesisStreamingConfiguration": {
5922                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
5923                }
5924            }),
5925        );
5926        svc.update_kinesis_streaming_destination(&req).unwrap();
5927
5928        // Disable
5929        let req = make_request(
5930            "DisableKinesisStreamingDestination",
5931            json!({
5932                "TableName": "test-table",
5933                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5934            }),
5935        );
5936        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5937        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5938        assert_eq!(body["DestinationStatus"], "DISABLED");
5939    }
5940
5941    #[test]
5942    fn contributor_insights_lifecycle() {
5943        let svc = make_service();
5944        create_test_table(&svc);
5945
5946        // Initially disabled
5947        let req = make_request(
5948            "DescribeContributorInsights",
5949            json!({ "TableName": "test-table" }),
5950        );
5951        let resp = svc.describe_contributor_insights(&req).unwrap();
5952        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5953        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5954
5955        // Enable
5956        let req = make_request(
5957            "UpdateContributorInsights",
5958            json!({
5959                "TableName": "test-table",
5960                "ContributorInsightsAction": "ENABLE"
5961            }),
5962        );
5963        let resp = svc.update_contributor_insights(&req).unwrap();
5964        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5965        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5966
5967        // List
5968        let req = make_request("ListContributorInsights", json!({}));
5969        let resp = svc.list_contributor_insights(&req).unwrap();
5970        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5971        assert_eq!(
5972            body["ContributorInsightsSummaries"]
5973                .as_array()
5974                .unwrap()
5975                .len(),
5976            1
5977        );
5978    }
5979
5980    #[test]
5981    fn export_lifecycle() {
5982        let svc = make_service();
5983        create_test_table(&svc);
5984
5985        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5986
5987        // Export
5988        let req = make_request(
5989            "ExportTableToPointInTime",
5990            json!({
5991                "TableArn": table_arn,
5992                "S3Bucket": "my-bucket"
5993            }),
5994        );
5995        let resp = svc.export_table_to_point_in_time(&req).unwrap();
5996        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5997        let export_arn = body["ExportDescription"]["ExportArn"]
5998            .as_str()
5999            .unwrap()
6000            .to_string();
6001        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6002
6003        // Describe
6004        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6005        let resp = svc.describe_export(&req).unwrap();
6006        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6007        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6008
6009        // List
6010        let req = make_request("ListExports", json!({}));
6011        let resp = svc.list_exports(&req).unwrap();
6012        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6014    }
6015
6016    #[test]
6017    fn import_lifecycle() {
6018        let svc = make_service();
6019
6020        let req = make_request(
6021            "ImportTable",
6022            json!({
6023                "InputFormat": "DYNAMODB_JSON",
6024                "S3BucketSource": { "S3Bucket": "import-bucket" },
6025                "TableCreationParameters": {
6026                    "TableName": "imported-table",
6027                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6028                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6029                }
6030            }),
6031        );
6032        let resp = svc.import_table(&req).unwrap();
6033        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6034        let import_arn = body["ImportTableDescription"]["ImportArn"]
6035            .as_str()
6036            .unwrap()
6037            .to_string();
6038        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6039
6040        // Describe import
6041        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6042        let resp = svc.describe_import(&req).unwrap();
6043        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6044        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6045
6046        // List imports
6047        let req = make_request("ListImports", json!({}));
6048        let resp = svc.list_imports(&req).unwrap();
6049        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6050        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6051
6052        // Verify the table was created
6053        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6054        let resp = svc.describe_table(&req).unwrap();
6055        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6056        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6057    }
6058
6059    #[test]
6060    fn backup_restore_preserves_items() {
6061        let svc = make_service();
6062        create_test_table(&svc);
6063
6064        // Put 3 items
6065        for i in 1..=3 {
6066            let req = make_request(
6067                "PutItem",
6068                json!({
6069                    "TableName": "test-table",
6070                    "Item": {
6071                        "pk": { "S": format!("key{i}") },
6072                        "data": { "S": format!("value{i}") }
6073                    }
6074                }),
6075            );
6076            svc.put_item(&req).unwrap();
6077        }
6078
6079        // Create backup
6080        let req = make_request(
6081            "CreateBackup",
6082            json!({
6083                "TableName": "test-table",
6084                "BackupName": "my-backup"
6085            }),
6086        );
6087        let resp = svc.create_backup(&req).unwrap();
6088        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6089        let backup_arn = body["BackupDetails"]["BackupArn"]
6090            .as_str()
6091            .unwrap()
6092            .to_string();
6093
6094        // Delete all items from the original table
6095        for i in 1..=3 {
6096            let req = make_request(
6097                "DeleteItem",
6098                json!({
6099                    "TableName": "test-table",
6100                    "Key": { "pk": { "S": format!("key{i}") } }
6101                }),
6102            );
6103            svc.delete_item(&req).unwrap();
6104        }
6105
6106        // Verify original table is empty
6107        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6108        let resp = svc.scan(&req).unwrap();
6109        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6110        assert_eq!(body["Count"], 0);
6111
6112        // Restore from backup
6113        let req = make_request(
6114            "RestoreTableFromBackup",
6115            json!({
6116                "BackupArn": backup_arn,
6117                "TargetTableName": "restored-table"
6118            }),
6119        );
6120        svc.restore_table_from_backup(&req).unwrap();
6121
6122        // Scan restored table — should have 3 items
6123        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6124        let resp = svc.scan(&req).unwrap();
6125        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6126        assert_eq!(body["Count"], 3);
6127        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6128    }
6129
6130    #[test]
6131    fn global_table_replicates_writes() {
6132        let svc = make_service();
6133        create_test_table(&svc);
6134
6135        // Create global table with replicas
6136        let req = make_request(
6137            "CreateGlobalTable",
6138            json!({
6139                "GlobalTableName": "test-table",
6140                "ReplicationGroup": [
6141                    { "RegionName": "us-east-1" },
6142                    { "RegionName": "eu-west-1" }
6143                ]
6144            }),
6145        );
6146        let resp = svc.create_global_table(&req).unwrap();
6147        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6148        assert_eq!(
6149            body["GlobalTableDescription"]["GlobalTableStatus"],
6150            "ACTIVE"
6151        );
6152
6153        // Put an item
6154        let req = make_request(
6155            "PutItem",
6156            json!({
6157                "TableName": "test-table",
6158                "Item": {
6159                    "pk": { "S": "replicated-key" },
6160                    "data": { "S": "replicated-value" }
6161                }
6162            }),
6163        );
6164        svc.put_item(&req).unwrap();
6165
6166        // Verify the item is readable (since all replicas share the same table)
6167        let req = make_request(
6168            "GetItem",
6169            json!({
6170                "TableName": "test-table",
6171                "Key": { "pk": { "S": "replicated-key" } }
6172            }),
6173        );
6174        let resp = svc.get_item(&req).unwrap();
6175        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6176        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6177        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6178    }
6179
6180    #[test]
6181    fn contributor_insights_tracks_access() {
6182        let svc = make_service();
6183        create_test_table(&svc);
6184
6185        // Enable contributor insights
6186        let req = make_request(
6187            "UpdateContributorInsights",
6188            json!({
6189                "TableName": "test-table",
6190                "ContributorInsightsAction": "ENABLE"
6191            }),
6192        );
6193        svc.update_contributor_insights(&req).unwrap();
6194
6195        // Put items with different partition keys
6196        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6197            let req = make_request(
6198                "PutItem",
6199                json!({
6200                    "TableName": "test-table",
6201                    "Item": {
6202                        "pk": { "S": key },
6203                        "data": { "S": "value" }
6204                    }
6205                }),
6206            );
6207            svc.put_item(&req).unwrap();
6208        }
6209
6210        // Get items (to also track read access)
6211        for _ in 0..3 {
6212            let req = make_request(
6213                "GetItem",
6214                json!({
6215                    "TableName": "test-table",
6216                    "Key": { "pk": { "S": "alpha" } }
6217                }),
6218            );
6219            svc.get_item(&req).unwrap();
6220        }
6221
6222        // Describe contributor insights — should show top contributors
6223        let req = make_request(
6224            "DescribeContributorInsights",
6225            json!({ "TableName": "test-table" }),
6226        );
6227        let resp = svc.describe_contributor_insights(&req).unwrap();
6228        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6229        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6230
6231        let contributors = body["TopContributors"].as_array().unwrap();
6232        assert!(
6233            !contributors.is_empty(),
6234            "TopContributors should not be empty"
6235        );
6236
6237        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
6238        // alpha should be the top contributor
6239        let top = &contributors[0];
6240        assert!(top["Count"].as_u64().unwrap() > 0);
6241
6242        // Verify the rule list is populated
6243        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6244        assert!(!rules.is_empty());
6245    }
6246
6247    #[test]
6248    fn contributor_insights_not_tracked_when_disabled() {
6249        let svc = make_service();
6250        create_test_table(&svc);
6251
6252        // Put items without enabling insights
6253        let req = make_request(
6254            "PutItem",
6255            json!({
6256                "TableName": "test-table",
6257                "Item": {
6258                    "pk": { "S": "key1" },
6259                    "data": { "S": "value" }
6260                }
6261            }),
6262        );
6263        svc.put_item(&req).unwrap();
6264
6265        // Describe — should show empty contributors
6266        let req = make_request(
6267            "DescribeContributorInsights",
6268            json!({ "TableName": "test-table" }),
6269        );
6270        let resp = svc.describe_contributor_insights(&req).unwrap();
6271        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6272        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6273
6274        let contributors = body["TopContributors"].as_array().unwrap();
6275        assert!(contributors.is_empty());
6276    }
6277
6278    #[test]
6279    fn contributor_insights_disabled_table_no_counters_after_scan() {
6280        let svc = make_service();
6281        create_test_table(&svc);
6282
6283        // Put items
6284        for key in &["alpha", "beta"] {
6285            let req = make_request(
6286                "PutItem",
6287                json!({
6288                    "TableName": "test-table",
6289                    "Item": { "pk": { "S": key } }
6290                }),
6291            );
6292            svc.put_item(&req).unwrap();
6293        }
6294
6295        // Enable insights, then scan, then disable, then check counters are cleared
6296        let req = make_request(
6297            "UpdateContributorInsights",
6298            json!({
6299                "TableName": "test-table",
6300                "ContributorInsightsAction": "ENABLE"
6301            }),
6302        );
6303        svc.update_contributor_insights(&req).unwrap();
6304
6305        // Scan to trigger counter collection
6306        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6307        svc.scan(&req).unwrap();
6308
6309        // Verify counters were collected
6310        let req = make_request(
6311            "DescribeContributorInsights",
6312            json!({ "TableName": "test-table" }),
6313        );
6314        let resp = svc.describe_contributor_insights(&req).unwrap();
6315        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6316        let contributors = body["TopContributors"].as_array().unwrap();
6317        assert!(
6318            !contributors.is_empty(),
6319            "counters should be non-empty while enabled"
6320        );
6321
6322        // Disable insights (this clears counters)
6323        let req = make_request(
6324            "UpdateContributorInsights",
6325            json!({
6326                "TableName": "test-table",
6327                "ContributorInsightsAction": "DISABLE"
6328            }),
6329        );
6330        svc.update_contributor_insights(&req).unwrap();
6331
6332        // Scan again -- should NOT accumulate counters since insights is disabled
6333        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6334        svc.scan(&req).unwrap();
6335
6336        // Verify counters are still empty
6337        let req = make_request(
6338            "DescribeContributorInsights",
6339            json!({ "TableName": "test-table" }),
6340        );
6341        let resp = svc.describe_contributor_insights(&req).unwrap();
6342        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6343        let contributors = body["TopContributors"].as_array().unwrap();
6344        assert!(
6345            contributors.is_empty(),
6346            "counters should be empty after disabling insights"
6347        );
6348    }
6349
6350    #[test]
6351    fn scan_pagination_with_limit() {
6352        let svc = make_service();
6353        create_test_table(&svc);
6354
6355        // Insert 5 items
6356        for i in 0..5 {
6357            let req = make_request(
6358                "PutItem",
6359                json!({
6360                    "TableName": "test-table",
6361                    "Item": {
6362                        "pk": { "S": format!("item{i}") },
6363                        "data": { "S": format!("value{i}") }
6364                    }
6365                }),
6366            );
6367            svc.put_item(&req).unwrap();
6368        }
6369
6370        // Scan with limit=2
6371        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6372        let resp = svc.scan(&req).unwrap();
6373        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6374        assert_eq!(body["Count"], 2);
6375        assert!(
6376            body["LastEvaluatedKey"].is_object(),
6377            "should have LastEvaluatedKey when limit < total items"
6378        );
6379        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6380
6381        // Page through all items
6382        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6383        let mut lek = body["LastEvaluatedKey"].clone();
6384
6385        while lek.is_object() {
6386            let req = make_request(
6387                "Scan",
6388                json!({
6389                    "TableName": "test-table",
6390                    "Limit": 2,
6391                    "ExclusiveStartKey": lek
6392                }),
6393            );
6394            let resp = svc.scan(&req).unwrap();
6395            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6396            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6397            lek = body["LastEvaluatedKey"].clone();
6398        }
6399
6400        assert_eq!(
6401            all_items.len(),
6402            5,
6403            "should retrieve all 5 items via pagination"
6404        );
6405    }
6406
6407    #[test]
6408    fn scan_no_pagination_when_all_fit() {
6409        let svc = make_service();
6410        create_test_table(&svc);
6411
6412        for i in 0..3 {
6413            let req = make_request(
6414                "PutItem",
6415                json!({
6416                    "TableName": "test-table",
6417                    "Item": {
6418                        "pk": { "S": format!("item{i}") }
6419                    }
6420                }),
6421            );
6422            svc.put_item(&req).unwrap();
6423        }
6424
6425        // Scan with limit > item count
6426        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6427        let resp = svc.scan(&req).unwrap();
6428        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6429        assert_eq!(body["Count"], 3);
6430        assert!(
6431            body["LastEvaluatedKey"].is_null(),
6432            "should not have LastEvaluatedKey when all items fit"
6433        );
6434
6435        // Scan without limit
6436        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6437        let resp = svc.scan(&req).unwrap();
6438        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6439        assert_eq!(body["Count"], 3);
6440        assert!(body["LastEvaluatedKey"].is_null());
6441    }
6442
6443    fn create_composite_table(svc: &DynamoDbService) {
6444        let req = make_request(
6445            "CreateTable",
6446            json!({
6447                "TableName": "composite-table",
6448                "KeySchema": [
6449                    { "AttributeName": "pk", "KeyType": "HASH" },
6450                    { "AttributeName": "sk", "KeyType": "RANGE" }
6451                ],
6452                "AttributeDefinitions": [
6453                    { "AttributeName": "pk", "AttributeType": "S" },
6454                    { "AttributeName": "sk", "AttributeType": "S" }
6455                ],
6456                "BillingMode": "PAY_PER_REQUEST"
6457            }),
6458        );
6459        svc.create_table(&req).unwrap();
6460    }
6461
6462    #[test]
6463    fn query_pagination_with_composite_key() {
6464        let svc = make_service();
6465        create_composite_table(&svc);
6466
6467        // Insert 5 items under the same partition key
6468        for i in 0..5 {
6469            let req = make_request(
6470                "PutItem",
6471                json!({
6472                    "TableName": "composite-table",
6473                    "Item": {
6474                        "pk": { "S": "user1" },
6475                        "sk": { "S": format!("item{i:03}") },
6476                        "data": { "S": format!("value{i}") }
6477                    }
6478                }),
6479            );
6480            svc.put_item(&req).unwrap();
6481        }
6482
6483        // Query with limit=2
6484        let req = make_request(
6485            "Query",
6486            json!({
6487                "TableName": "composite-table",
6488                "KeyConditionExpression": "pk = :pk",
6489                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6490                "Limit": 2
6491            }),
6492        );
6493        let resp = svc.query(&req).unwrap();
6494        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6495        assert_eq!(body["Count"], 2);
6496        assert!(body["LastEvaluatedKey"].is_object());
6497        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6498        assert!(body["LastEvaluatedKey"]["sk"].is_object());
6499
6500        // Page through all items
6501        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6502        let mut lek = body["LastEvaluatedKey"].clone();
6503
6504        while lek.is_object() {
6505            let req = make_request(
6506                "Query",
6507                json!({
6508                    "TableName": "composite-table",
6509                    "KeyConditionExpression": "pk = :pk",
6510                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6511                    "Limit": 2,
6512                    "ExclusiveStartKey": lek
6513                }),
6514            );
6515            let resp = svc.query(&req).unwrap();
6516            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6517            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6518            lek = body["LastEvaluatedKey"].clone();
6519        }
6520
6521        assert_eq!(
6522            all_items.len(),
6523            5,
6524            "should retrieve all 5 items via pagination"
6525        );
6526
6527        // Verify items came back sorted by sort key
6528        let sks: Vec<String> = all_items
6529            .iter()
6530            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6531            .collect();
6532        let mut sorted = sks.clone();
6533        sorted.sort();
6534        assert_eq!(sks, sorted, "items should be sorted by sort key");
6535    }
6536
6537    #[test]
6538    fn query_no_pagination_when_all_fit() {
6539        let svc = make_service();
6540        create_composite_table(&svc);
6541
6542        for i in 0..2 {
6543            let req = make_request(
6544                "PutItem",
6545                json!({
6546                    "TableName": "composite-table",
6547                    "Item": {
6548                        "pk": { "S": "user1" },
6549                        "sk": { "S": format!("item{i}") }
6550                    }
6551                }),
6552            );
6553            svc.put_item(&req).unwrap();
6554        }
6555
6556        let req = make_request(
6557            "Query",
6558            json!({
6559                "TableName": "composite-table",
6560                "KeyConditionExpression": "pk = :pk",
6561                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6562                "Limit": 10
6563            }),
6564        );
6565        let resp = svc.query(&req).unwrap();
6566        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6567        assert_eq!(body["Count"], 2);
6568        assert!(
6569            body["LastEvaluatedKey"].is_null(),
6570            "should not have LastEvaluatedKey when all items fit"
6571        );
6572    }
6573
6574    fn create_gsi_table(svc: &DynamoDbService) {
6575        let req = make_request(
6576            "CreateTable",
6577            json!({
6578                "TableName": "gsi-table",
6579                "KeySchema": [
6580                    { "AttributeName": "pk", "KeyType": "HASH" }
6581                ],
6582                "AttributeDefinitions": [
6583                    { "AttributeName": "pk", "AttributeType": "S" },
6584                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
6585                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
6586                ],
6587                "BillingMode": "PAY_PER_REQUEST",
6588                "GlobalSecondaryIndexes": [
6589                    {
6590                        "IndexName": "gsi-index",
6591                        "KeySchema": [
6592                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6593                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6594                        ],
6595                        "Projection": { "ProjectionType": "ALL" }
6596                    }
6597                ]
6598            }),
6599        );
6600        svc.create_table(&req).unwrap();
6601    }
6602
6603    #[test]
6604    fn gsi_query_last_evaluated_key_includes_table_pk() {
6605        let svc = make_service();
6606        create_gsi_table(&svc);
6607
6608        // Insert 3 items with the SAME GSI key but different table PKs
6609        for i in 0..3 {
6610            let req = make_request(
6611                "PutItem",
6612                json!({
6613                    "TableName": "gsi-table",
6614                    "Item": {
6615                        "pk": { "S": format!("item{i}") },
6616                        "gsi_pk": { "S": "shared" },
6617                        "gsi_sk": { "S": "sort" }
6618                    }
6619                }),
6620            );
6621            svc.put_item(&req).unwrap();
6622        }
6623
6624        // Query GSI with Limit=1 to trigger pagination
6625        let req = make_request(
6626            "Query",
6627            json!({
6628                "TableName": "gsi-table",
6629                "IndexName": "gsi-index",
6630                "KeyConditionExpression": "gsi_pk = :v",
6631                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6632                "Limit": 1
6633            }),
6634        );
6635        let resp = svc.query(&req).unwrap();
6636        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6637        assert_eq!(body["Count"], 1);
6638        let lek = &body["LastEvaluatedKey"];
6639        assert!(lek.is_object(), "should have LastEvaluatedKey");
6640        // Must contain the index keys
6641        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6642        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6643        // Must also contain the table PK
6644        assert!(
6645            lek["pk"].is_object(),
6646            "LEK must contain table PK for GSI queries"
6647        );
6648    }
6649
6650    #[test]
6651    fn gsi_query_pagination_returns_all_items() {
6652        let svc = make_service();
6653        create_gsi_table(&svc);
6654
6655        // Insert 4 items with the SAME GSI key but different table PKs
6656        for i in 0..4 {
6657            let req = make_request(
6658                "PutItem",
6659                json!({
6660                    "TableName": "gsi-table",
6661                    "Item": {
6662                        "pk": { "S": format!("item{i:03}") },
6663                        "gsi_pk": { "S": "shared" },
6664                        "gsi_sk": { "S": "sort" }
6665                    }
6666                }),
6667            );
6668            svc.put_item(&req).unwrap();
6669        }
6670
6671        // Paginate through all items with Limit=2
6672        let mut all_pks = Vec::new();
6673        let mut lek: Option<Value> = None;
6674
6675        loop {
6676            let mut query = json!({
6677                "TableName": "gsi-table",
6678                "IndexName": "gsi-index",
6679                "KeyConditionExpression": "gsi_pk = :v",
6680                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6681                "Limit": 2
6682            });
6683            if let Some(ref start_key) = lek {
6684                query["ExclusiveStartKey"] = start_key.clone();
6685            }
6686
6687            let req = make_request("Query", query);
6688            let resp = svc.query(&req).unwrap();
6689            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6690
6691            for item in body["Items"].as_array().unwrap() {
6692                let pk = item["pk"]["S"].as_str().unwrap().to_string();
6693                all_pks.push(pk);
6694            }
6695
6696            if body["LastEvaluatedKey"].is_object() {
6697                lek = Some(body["LastEvaluatedKey"].clone());
6698            } else {
6699                break;
6700            }
6701        }
6702
6703        all_pks.sort();
6704        assert_eq!(
6705            all_pks,
6706            vec!["item000", "item001", "item002", "item003"],
6707            "pagination should return all items without duplicates"
6708        );
6709    }
6710
6711    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
6712        pairs
6713            .iter()
6714            .map(|(k, v)| (k.to_string(), json!({"S": v})))
6715            .collect()
6716    }
6717
6718    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
6719        pairs
6720            .iter()
6721            .map(|(k, v)| (k.to_string(), v.to_string()))
6722            .collect()
6723    }
6724
6725    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
6726        pairs
6727            .iter()
6728            .map(|(k, v)| (k.to_string(), json!({"S": v})))
6729            .collect()
6730    }
6731
6732    #[test]
6733    fn test_evaluate_condition_bare_not_equal() {
6734        let item = cond_item(&[("state", "active")]);
6735        let names = cond_names(&[("#s", "state")]);
6736        let values = cond_values(&[(":c", "complete")]);
6737
6738        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
6739
6740        let item2 = cond_item(&[("state", "complete")]);
6741        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
6742    }
6743
6744    #[test]
6745    fn test_evaluate_condition_parenthesized_not_equal() {
6746        let item = cond_item(&[("state", "active")]);
6747        let names = cond_names(&[("#s", "state")]);
6748        let values = cond_values(&[(":c", "complete")]);
6749
6750        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
6751    }
6752
6753    #[test]
6754    fn test_evaluate_condition_parenthesized_equal_mismatch() {
6755        let item = cond_item(&[("state", "active")]);
6756        let names = cond_names(&[("#s", "state")]);
6757        let values = cond_values(&[(":c", "complete")]);
6758
6759        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
6760    }
6761
6762    #[test]
6763    fn test_evaluate_condition_compound_and() {
6764        let item = cond_item(&[("state", "active")]);
6765        let names = cond_names(&[("#s", "state")]);
6766        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
6767
6768        // active <> complete AND active <> failed => true
6769        assert!(
6770            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
6771        );
6772    }
6773
6774    #[test]
6775    fn test_evaluate_condition_compound_and_mismatch() {
6776        let item = cond_item(&[("state", "inactive")]);
6777        let names = cond_names(&[("#s", "state")]);
6778        let values = cond_values(&[(":a", "active"), (":b", "active")]);
6779
6780        // inactive = active AND inactive = active => false
6781        assert!(
6782            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
6783        );
6784    }
6785
6786    #[test]
6787    fn test_evaluate_condition_compound_or() {
6788        let item = cond_item(&[("state", "running")]);
6789        let names = cond_names(&[("#s", "state")]);
6790        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
6791
6792        // running = active OR running = idle => false
6793        assert!(
6794            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
6795        );
6796
6797        // running = active OR running = running => true
6798        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
6799        assert!(
6800            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
6801        );
6802    }
6803
6804    #[test]
6805    fn test_evaluate_condition_not_operator() {
6806        let item = cond_item(&[("state", "active")]);
6807        let names = cond_names(&[("#s", "state")]);
6808        let values = cond_values(&[(":c", "complete")]);
6809
6810        // NOT (active = complete) => NOT false => true
6811        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
6812
6813        // NOT (active <> complete) => NOT true => false
6814        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
6815
6816        // NOT attribute_exists(#s) on existing item => NOT true => false
6817        assert!(
6818            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
6819        );
6820
6821        // NOT attribute_exists(#s) on missing item => NOT false => true
6822        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
6823    }
6824
6825    #[test]
6826    fn test_evaluate_condition_begins_with() {
6827        // After unification, conditions support begins_with via
6828        // evaluate_single_filter_condition (previously only filters had it).
6829        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
6830        let names = cond_names(&[("#n", "name")]);
6831        let values = cond_values(&[(":p", "fakecloud")]);
6832
6833        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
6834
6835        let values2 = cond_values(&[(":p", "realcloud")]);
6836        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
6837    }
6838
6839    #[test]
6840    fn test_evaluate_condition_contains() {
6841        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
6842        let names = cond_names(&[("#t", "tags")]);
6843        let values = cond_values(&[(":v", "beta")]);
6844
6845        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
6846
6847        let values2 = cond_values(&[(":v", "delta")]);
6848        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
6849    }
6850
6851    #[test]
6852    fn test_evaluate_condition_no_existing_item() {
6853        // When no item exists (PutItem with condition), attribute_not_exists
6854        // should succeed and attribute_exists should fail.
6855        let names = cond_names(&[("#s", "state")]);
6856        let values = cond_values(&[(":v", "active")]);
6857
6858        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
6859        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
6860        // Comparison against missing item: None != Some(val) => true for <>
6861        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
6862        // None == Some(val) => false for =
6863        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
6864    }
6865
6866    #[test]
6867    fn test_evaluate_filter_not_operator() {
6868        let item = cond_item(&[("status", "pending")]);
6869        let names = cond_names(&[("#s", "status")]);
6870        let values = cond_values(&[(":v", "pending")]);
6871
6872        assert!(!evaluate_filter_expression(
6873            "NOT (#s = :v)",
6874            &item,
6875            &names,
6876            &values
6877        ));
6878        assert!(evaluate_filter_expression(
6879            "NOT (#s <> :v)",
6880            &item,
6881            &names,
6882            &values
6883        ));
6884    }
6885}