Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21    ReplicaDescription, SharedDynamoDbState,
22};
23
/// DynamoDB service front end operating on shared in-memory state.
pub struct DynamoDbService {
    /// Shared DynamoDB state; request handlers take its read or write lock as needed.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, attached through `with_s3`.
    s3_state: Option<SharedS3State>,
    /// Optional delivery bus, attached through `with_delivery`; used to send change
    /// records to Kinesis streaming destinations.
    delivery: Option<Arc<DeliveryBus>>,
}
29
30impl DynamoDbService {
31    pub fn new(state: SharedDynamoDbState) -> Self {
32        Self {
33            state,
34            s3_state: None,
35            delivery: None,
36        }
37    }
38
39    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40        self.s3_state = Some(s3_state);
41        self
42    }
43
44    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45        self.delivery = Some(delivery);
46        self
47    }
48
49    /// Deliver a change record to all active Kinesis streaming destinations for a table.
50    fn deliver_to_kinesis_destinations(
51        &self,
52        table: &DynamoTable,
53        event_name: &str,
54        keys: &HashMap<String, AttributeValue>,
55        old_image: Option<&HashMap<String, AttributeValue>>,
56        new_image: Option<&HashMap<String, AttributeValue>>,
57    ) {
58        let delivery = match &self.delivery {
59            Some(d) => d,
60            None => return,
61        };
62
63        let active_destinations: Vec<_> = table
64            .kinesis_destinations
65            .iter()
66            .filter(|d| d.destination_status == "ACTIVE")
67            .collect();
68
69        if active_destinations.is_empty() {
70            return;
71        }
72
73        let mut record = json!({
74            "eventID": uuid::Uuid::new_v4().to_string(),
75            "eventName": event_name,
76            "eventVersion": "1.1",
77            "eventSource": "aws:dynamodb",
78            "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79            "dynamodb": {
80                "Keys": keys,
81                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83                "StreamViewType": "NEW_AND_OLD_IMAGES",
84            },
85            "eventSourceARN": &table.arn,
86            "tableName": &table.name,
87        });
88
89        if let Some(old) = old_image {
90            record["dynamodb"]["OldImage"] = json!(old);
91        }
92        if let Some(new) = new_image {
93            record["dynamodb"]["NewImage"] = json!(new);
94        }
95
96        let record_str = serde_json::to_string(&record).unwrap_or_default();
97        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98        let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100        for dest in active_destinations {
101            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102        }
103    }
104
105    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106        serde_json::from_slice(&req.body).map_err(|e| {
107            AwsServiceError::aws_error(
108                StatusCode::BAD_REQUEST,
109                "SerializationException",
110                format!("Invalid JSON: {e}"),
111            )
112        })
113    }
114
115    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116        Ok(AwsResponse::ok_json(body))
117    }
118
119    // ── Table Operations ────────────────────────────────────────────────
120
121    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122        let body = Self::parse_body(req)?;
123
124        let table_name = body["TableName"]
125            .as_str()
126            .ok_or_else(|| {
127                AwsServiceError::aws_error(
128                    StatusCode::BAD_REQUEST,
129                    "ValidationException",
130                    "TableName is required",
131                )
132            })?
133            .to_string();
134
135        let key_schema = parse_key_schema(&body["KeySchema"])?;
136        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138        // Validate that key schema attributes are defined
139        for ks in &key_schema {
140            if !attribute_definitions
141                .iter()
142                .any(|ad| ad.attribute_name == ks.attribute_name)
143            {
144                return Err(AwsServiceError::aws_error(
145                    StatusCode::BAD_REQUEST,
146                    "ValidationException",
147                    format!(
148                        "One or more parameter values were invalid: \
149                         Some index key attributes are not defined in AttributeDefinitions. \
150                         Keys: [{}], AttributeDefinitions: [{}]",
151                        ks.attribute_name,
152                        attribute_definitions
153                            .iter()
154                            .map(|ad| ad.attribute_name.as_str())
155                            .collect::<Vec<_>>()
156                            .join(", ")
157                    ),
158                ));
159            }
160        }
161
162        let billing_mode = body["BillingMode"]
163            .as_str()
164            .unwrap_or("PROVISIONED")
165            .to_string();
166
167        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168            ProvisionedThroughput {
169                read_capacity_units: 0,
170                write_capacity_units: 0,
171            }
172        } else {
173            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174        };
175
176        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178        let tags = parse_tags(&body["Tags"]);
179
180        // Parse StreamSpecification
181        let (stream_enabled, stream_view_type) =
182            if let Some(stream_spec) = body.get("StreamSpecification") {
183                let enabled = stream_spec
184                    .get("StreamEnabled")
185                    .and_then(|v| v.as_bool())
186                    .unwrap_or(false);
187                let view_type = if enabled {
188                    stream_spec
189                        .get("StreamViewType")
190                        .and_then(|v| v.as_str())
191                        .map(|s| s.to_string())
192                } else {
193                    None
194                };
195                (enabled, view_type)
196            } else {
197                (false, None)
198            };
199
200        // Parse SSESpecification
201        let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202            let enabled = sse_spec
203                .get("Enabled")
204                .and_then(|v| v.as_bool())
205                .unwrap_or(false);
206            if enabled {
207                let sse_type = sse_spec
208                    .get("SSEType")
209                    .and_then(|v| v.as_str())
210                    .unwrap_or("KMS")
211                    .to_string();
212                let kms_key = sse_spec
213                    .get("KMSMasterKeyId")
214                    .and_then(|v| v.as_str())
215                    .map(|s| s.to_string());
216                (Some(sse_type), kms_key)
217            } else {
218                (None, None)
219            }
220        } else {
221            (None, None)
222        };
223
224        let mut state = self.state.write();
225
226        if state.tables.contains_key(&table_name) {
227            return Err(AwsServiceError::aws_error(
228                StatusCode::BAD_REQUEST,
229                "ResourceInUseException",
230                format!("Table already exists: {table_name}"),
231            ));
232        }
233
234        let now = Utc::now();
235        let arn = format!(
236            "arn:aws:dynamodb:{}:{}:table/{}",
237            state.region, state.account_id, table_name
238        );
239        let stream_arn = if stream_enabled {
240            Some(format!(
241                "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242                state.region,
243                state.account_id,
244                table_name,
245                now.format("%Y-%m-%dT%H:%M:%S.%3f")
246            ))
247        } else {
248            None
249        };
250
251        let table = DynamoTable {
252            name: table_name.clone(),
253            arn: arn.clone(),
254            key_schema: key_schema.clone(),
255            attribute_definitions: attribute_definitions.clone(),
256            provisioned_throughput: provisioned_throughput.clone(),
257            items: Vec::new(),
258            gsi: gsi.clone(),
259            lsi: lsi.clone(),
260            tags,
261            created_at: now,
262            status: "ACTIVE".to_string(),
263            item_count: 0,
264            size_bytes: 0,
265            billing_mode: billing_mode.clone(),
266            ttl_attribute: None,
267            ttl_enabled: false,
268            resource_policy: None,
269            pitr_enabled: false,
270            kinesis_destinations: Vec::new(),
271            contributor_insights_status: "DISABLED".to_string(),
272            contributor_insights_counters: HashMap::new(),
273            stream_enabled,
274            stream_view_type,
275            stream_arn,
276            stream_records: Arc::new(RwLock::new(Vec::new())),
277            sse_type,
278            sse_kms_key_arn,
279        };
280
281        state.tables.insert(table_name, table);
282
283        let table_desc = build_table_description_json(
284            &arn,
285            &key_schema,
286            &attribute_definitions,
287            &provisioned_throughput,
288            &gsi,
289            &lsi,
290            &billing_mode,
291            now,
292            0,
293            0,
294            "ACTIVE",
295        );
296
297        Self::ok_json(json!({ "TableDescription": table_desc }))
298    }
299
300    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301        let body = Self::parse_body(req)?;
302        let table_name = require_str(&body, "TableName")?;
303
304        let mut state = self.state.write();
305        let table = state.tables.remove(table_name).ok_or_else(|| {
306            AwsServiceError::aws_error(
307                StatusCode::BAD_REQUEST,
308                "ResourceNotFoundException",
309                format!("Requested resource not found: Table: {table_name} not found"),
310            )
311        })?;
312
313        let table_desc = build_table_description_json(
314            &table.arn,
315            &table.key_schema,
316            &table.attribute_definitions,
317            &table.provisioned_throughput,
318            &table.gsi,
319            &table.lsi,
320            &table.billing_mode,
321            table.created_at,
322            table.item_count,
323            table.size_bytes,
324            "DELETING",
325        );
326
327        Self::ok_json(json!({ "TableDescription": table_desc }))
328    }
329
330    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331        let body = Self::parse_body(req)?;
332        let table_name = require_str(&body, "TableName")?;
333
334        let state = self.state.read();
335        let table = get_table(&state.tables, table_name)?;
336
337        let table_desc = build_table_description(table);
338
339        Self::ok_json(json!({ "Table": table_desc }))
340    }
341
342    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343        let body = Self::parse_body(req)?;
344
345        validate_optional_string_length(
346            "exclusiveStartTableName",
347            body["ExclusiveStartTableName"].as_str(),
348            3,
349            255,
350        )?;
351        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354        let exclusive_start = body["ExclusiveStartTableName"]
355            .as_str()
356            .map(|s| s.to_string());
357
358        let state = self.state.read();
359        let mut names: Vec<&String> = state.tables.keys().collect();
360        names.sort();
361
362        let start_idx = match &exclusive_start {
363            Some(start) => names
364                .iter()
365                .position(|n| n.as_str() > start.as_str())
366                .unwrap_or(names.len()),
367            None => 0,
368        };
369
370        let page: Vec<&str> = names
371            .iter()
372            .skip(start_idx)
373            .take(limit)
374            .map(|n| n.as_str())
375            .collect();
376
377        let mut result = json!({ "TableNames": page });
378
379        if start_idx + limit < names.len() {
380            if let Some(last) = page.last() {
381                result["LastEvaluatedTableName"] = json!(last);
382            }
383        }
384
385        Self::ok_json(result)
386    }
387
388    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        let body = Self::parse_body(req)?;
390        let table_name = require_str(&body, "TableName")?;
391
392        let mut state = self.state.write();
393        let table = state.tables.get_mut(table_name).ok_or_else(|| {
394            AwsServiceError::aws_error(
395                StatusCode::BAD_REQUEST,
396                "ResourceNotFoundException",
397                format!("Requested resource not found: Table: {table_name} not found"),
398            )
399        })?;
400
401        if let Some(pt) = body.get("ProvisionedThroughput") {
402            if let Ok(throughput) = parse_provisioned_throughput(pt) {
403                table.provisioned_throughput = throughput;
404            }
405        }
406
407        if let Some(bm) = body["BillingMode"].as_str() {
408            table.billing_mode = bm.to_string();
409        }
410
411        // Handle SSESpecification update
412        if let Some(sse_spec) = body.get("SSESpecification") {
413            let enabled = sse_spec
414                .get("Enabled")
415                .and_then(|v| v.as_bool())
416                .unwrap_or(false);
417            if enabled {
418                table.sse_type = Some(
419                    sse_spec
420                        .get("SSEType")
421                        .and_then(|v| v.as_str())
422                        .unwrap_or("KMS")
423                        .to_string(),
424                );
425                table.sse_kms_key_arn = sse_spec
426                    .get("KMSMasterKeyId")
427                    .and_then(|v| v.as_str())
428                    .map(|s| s.to_string());
429            } else {
430                table.sse_type = None;
431                table.sse_kms_key_arn = None;
432            }
433        }
434
435        let table_desc = build_table_description_json(
436            &table.arn,
437            &table.key_schema,
438            &table.attribute_definitions,
439            &table.provisioned_throughput,
440            &table.gsi,
441            &table.lsi,
442            &table.billing_mode,
443            table.created_at,
444            table.item_count,
445            table.size_bytes,
446            &table.status,
447        );
448
449        Self::ok_json(json!({ "TableDescription": table_desc }))
450    }
451
452    // ── Item Operations ─────────────────────────────────────────────────
453
454    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455        // --- Parse request body and expression attributes WITHOUT holding any lock ---
456        let body = Self::parse_body(req)?;
457        let table_name = require_str(&body, "TableName")?;
458        let item = require_object(&body, "Item")?;
459        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460        let expr_attr_names = parse_expression_attribute_names(&body);
461        let expr_attr_values = parse_expression_attribute_values(&body);
462        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464        // --- Acquire write lock ONLY for validation + mutation ---
465        // Capture kinesis delivery info alongside the return value
466        let (old_item, kinesis_info) = {
467            let mut state = self.state.write();
468            let table = get_table_mut(&mut state.tables, table_name)?;
469
470            validate_key_in_item(table, &item)?;
471
472            let key = extract_key(table, &item);
473            let existing_idx = table.find_item_index(&key);
474
475            if let Some(ref cond) = condition {
476                let existing = existing_idx.map(|i| &table.items[i]);
477                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
478            }
479
480            let old_item_for_return = if return_values == "ALL_OLD" {
481                existing_idx.map(|i| table.items[i].clone())
482            } else {
483                None
484            };
485
486            // Capture old item for stream/kinesis if needed
487            let needs_change_capture = table.stream_enabled
488                || table
489                    .kinesis_destinations
490                    .iter()
491                    .any(|d| d.destination_status == "ACTIVE");
492            let old_item_for_stream = if needs_change_capture {
493                existing_idx.map(|i| table.items[i].clone())
494            } else {
495                None
496            };
497
498            let is_modify = existing_idx.is_some();
499
500            if let Some(idx) = existing_idx {
501                table.items[idx] = item.clone();
502            } else {
503                table.items.push(item.clone());
504            }
505
506            table.record_item_access(&item);
507            table.recalculate_stats();
508
509            let event_name = if is_modify { "MODIFY" } else { "INSERT" };
510            let key = extract_key(table, &item);
511
512            // Generate stream record
513            if table.stream_enabled {
514                if let Some(record) = crate::streams::generate_stream_record(
515                    table,
516                    event_name,
517                    key.clone(),
518                    old_item_for_stream.clone(),
519                    Some(item.clone()),
520                ) {
521                    crate::streams::add_stream_record(table, record);
522                }
523            }
524
525            // Capture kinesis delivery info (delivered after lock release)
526            let kinesis_info = if table
527                .kinesis_destinations
528                .iter()
529                .any(|d| d.destination_status == "ACTIVE")
530            {
531                Some((
532                    table.clone(),
533                    event_name.to_string(),
534                    key,
535                    old_item_for_stream,
536                    Some(item.clone()),
537                ))
538            } else {
539                None
540            };
541
542            (old_item_for_return, kinesis_info)
543        };
544        // --- Write lock released, build response ---
545
546        // Deliver to Kinesis destinations outside the lock
547        if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
548            self.deliver_to_kinesis_destinations(
549                &table,
550                &event_name,
551                &keys,
552                old_image.as_ref(),
553                new_image.as_ref(),
554            );
555        }
556
557        let mut result = json!({});
558        if let Some(old) = old_item {
559            result["Attributes"] = json!(old);
560        }
561
562        Self::ok_json(result)
563    }
564
565    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
566        // --- Parse request body WITHOUT holding any lock ---
567        let body = Self::parse_body(req)?;
568        let table_name = require_str(&body, "TableName")?;
569        let key = require_object(&body, "Key")?;
570
571        // --- Use a read lock for the lookup (allows concurrent GetItem calls) ---
572        let (result, needs_insights) = {
573            let state = self.state.read();
574            let table = get_table(&state.tables, table_name)?;
575            let needs_insights = table.contributor_insights_status == "ENABLED";
576
577            let result = match table.find_item_index(&key) {
578                Some(idx) => {
579                    let item = &table.items[idx];
580                    let projected = project_item(item, &body);
581                    json!({ "Item": projected })
582                }
583                None => json!({}),
584            };
585            (result, needs_insights)
586        };
587        // --- Read lock released ---
588
589        // Only acquire write lock if contributor insights tracking is enabled
590        if needs_insights {
591            let mut state = self.state.write();
592            if let Some(table) = state.tables.get_mut(table_name) {
593                table.record_key_access(&key);
594            }
595        }
596
597        Self::ok_json(result)
598    }
599
600    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
601        let body = Self::parse_body(req)?;
602
603        validate_optional_enum_value(
604            "conditionalOperator",
605            &body["ConditionalOperator"],
606            &["AND", "OR"],
607        )?;
608        validate_optional_enum_value(
609            "returnConsumedCapacity",
610            &body["ReturnConsumedCapacity"],
611            &["INDEXES", "TOTAL", "NONE"],
612        )?;
613        validate_optional_enum_value(
614            "returnValues",
615            &body["ReturnValues"],
616            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
617        )?;
618        validate_optional_enum_value(
619            "returnItemCollectionMetrics",
620            &body["ReturnItemCollectionMetrics"],
621            &["SIZE", "NONE"],
622        )?;
623        validate_optional_enum_value(
624            "returnValuesOnConditionCheckFailure",
625            &body["ReturnValuesOnConditionCheckFailure"],
626            &["ALL_OLD", "NONE"],
627        )?;
628
629        let table_name = require_str(&body, "TableName")?;
630        let key = require_object(&body, "Key")?;
631
632        let (result, kinesis_info) = {
633            let mut state = self.state.write();
634            let table = get_table_mut(&mut state.tables, table_name)?;
635
636            let condition = body["ConditionExpression"].as_str();
637            let expr_attr_names = parse_expression_attribute_names(&body);
638            let expr_attr_values = parse_expression_attribute_values(&body);
639
640            let existing_idx = table.find_item_index(&key);
641
642            if let Some(cond) = condition {
643                let existing = existing_idx.map(|i| &table.items[i]);
644                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
645            }
646
647            let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
648
649            let mut result = json!({});
650            let mut kinesis_info = None;
651
652            if let Some(idx) = existing_idx {
653                let old_item = table.items[idx].clone();
654                if return_values == "ALL_OLD" {
655                    result["Attributes"] = json!(old_item.clone());
656                }
657
658                // Generate stream record before removing
659                if table.stream_enabled {
660                    if let Some(record) = crate::streams::generate_stream_record(
661                        table,
662                        "REMOVE",
663                        key.clone(),
664                        Some(old_item.clone()),
665                        None,
666                    ) {
667                        crate::streams::add_stream_record(table, record);
668                    }
669                }
670
671                // Capture kinesis delivery info
672                if table
673                    .kinesis_destinations
674                    .iter()
675                    .any(|d| d.destination_status == "ACTIVE")
676                {
677                    kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
678                }
679
680                table.items.remove(idx);
681                table.recalculate_stats();
682            }
683
684            let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
685            let return_icm = body["ReturnItemCollectionMetrics"]
686                .as_str()
687                .unwrap_or("NONE");
688
689            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
690                result["ConsumedCapacity"] = json!({
691                    "TableName": table_name,
692                    "CapacityUnits": 1.0,
693                });
694            }
695
696            if return_icm == "SIZE" {
697                result["ItemCollectionMetrics"] = json!({});
698            }
699
700            (result, kinesis_info)
701        };
702
703        // Deliver to Kinesis destinations outside the lock
704        if let Some((table, keys, old_image)) = kinesis_info {
705            self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
706        }
707
708        Self::ok_json(result)
709    }
710
711    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
712        let body = Self::parse_body(req)?;
713        let table_name = require_str(&body, "TableName")?;
714        let key = require_object(&body, "Key")?;
715
716        let mut state = self.state.write();
717        let table = get_table_mut(&mut state.tables, table_name)?;
718
719        validate_key_attributes_in_key(table, &key)?;
720
721        let condition = body["ConditionExpression"].as_str();
722        let expr_attr_names = parse_expression_attribute_names(&body);
723        let expr_attr_values = parse_expression_attribute_values(&body);
724        let update_expression = body["UpdateExpression"].as_str();
725
726        let existing_idx = table.find_item_index(&key);
727
728        if let Some(cond) = condition {
729            let existing = existing_idx.map(|i| &table.items[i]);
730            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
731        }
732
733        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
734
735        let is_insert = existing_idx.is_none();
736        let idx = match existing_idx {
737            Some(i) => i,
738            None => {
739                let mut new_item = HashMap::new();
740                for (k, v) in &key {
741                    new_item.insert(k.clone(), v.clone());
742                }
743                table.items.push(new_item);
744                table.items.len() - 1
745            }
746        };
747
748        // Capture old item for stream/kinesis (before update)
749        let needs_change_capture = table.stream_enabled
750            || table
751                .kinesis_destinations
752                .iter()
753                .any(|d| d.destination_status == "ACTIVE");
754        let old_item_for_stream = if needs_change_capture {
755            Some(table.items[idx].clone())
756        } else {
757            None
758        };
759
760        let old_item = if return_values == "ALL_OLD" {
761            Some(table.items[idx].clone())
762        } else {
763            None
764        };
765
766        if let Some(expr) = update_expression {
767            apply_update_expression(
768                &mut table.items[idx],
769                expr,
770                &expr_attr_names,
771                &expr_attr_values,
772            )?;
773        }
774
775        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
776            Some(table.items[idx].clone())
777        } else {
778            None
779        };
780
781        let event_name = if is_insert { "INSERT" } else { "MODIFY" };
782        let new_item_for_stream = table.items[idx].clone();
783
784        // Generate stream record after update
785        if table.stream_enabled {
786            if let Some(record) = crate::streams::generate_stream_record(
787                table,
788                event_name,
789                key.clone(),
790                old_item_for_stream.clone(),
791                Some(new_item_for_stream.clone()),
792            ) {
793                crate::streams::add_stream_record(table, record);
794            }
795        }
796
797        // Capture kinesis delivery info
798        let kinesis_info = if table
799            .kinesis_destinations
800            .iter()
801            .any(|d| d.destination_status == "ACTIVE")
802        {
803            Some((
804                table.clone(),
805                event_name.to_string(),
806                key.clone(),
807                old_item_for_stream,
808                Some(new_item_for_stream),
809            ))
810        } else {
811            None
812        };
813
814        table.recalculate_stats();
815
816        // Release the write lock (drop `state`)
817        drop(state);
818
819        // Deliver to Kinesis destinations outside the lock
820        if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
821            self.deliver_to_kinesis_destinations(
822                &tbl,
823                &ev,
824                &keys,
825                old_image.as_ref(),
826                new_image.as_ref(),
827            );
828        }
829
830        let mut result = json!({});
831        if let Some(old) = old_item {
832            result["Attributes"] = json!(old);
833        } else if let Some(new) = new_item {
834            result["Attributes"] = json!(new);
835        }
836
837        Self::ok_json(result)
838    }
839
840    // ── Query & Scan ────────────────────────────────────────────────────
841
842    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843        let body = Self::parse_body(req)?;
844        let table_name = require_str(&body, "TableName")?;
845
846        let state = self.state.read();
847        let table = get_table(&state.tables, table_name)?;
848
849        let expr_attr_names = parse_expression_attribute_names(&body);
850        let expr_attr_values = parse_expression_attribute_values(&body);
851
852        let key_condition = body["KeyConditionExpression"].as_str();
853        let filter_expression = body["FilterExpression"].as_str();
854        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
855        let limit = body["Limit"].as_i64().map(|l| l as usize);
856        let index_name = body["IndexName"].as_str();
857        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
858            parse_key_map(&body["ExclusiveStartKey"]);
859
860        let (items_to_scan, hash_key_name, range_key_name): (
861            &[HashMap<String, AttributeValue>],
862            String,
863            Option<String>,
864        ) = if let Some(idx_name) = index_name {
865            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
866                let hk = gsi
867                    .key_schema
868                    .iter()
869                    .find(|k| k.key_type == "HASH")
870                    .map(|k| k.attribute_name.clone())
871                    .unwrap_or_default();
872                let rk = gsi
873                    .key_schema
874                    .iter()
875                    .find(|k| k.key_type == "RANGE")
876                    .map(|k| k.attribute_name.clone());
877                (&table.items, hk, rk)
878            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
879                let hk = lsi
880                    .key_schema
881                    .iter()
882                    .find(|k| k.key_type == "HASH")
883                    .map(|k| k.attribute_name.clone())
884                    .unwrap_or_default();
885                let rk = lsi
886                    .key_schema
887                    .iter()
888                    .find(|k| k.key_type == "RANGE")
889                    .map(|k| k.attribute_name.clone());
890                (&table.items, hk, rk)
891            } else {
892                return Err(AwsServiceError::aws_error(
893                    StatusCode::BAD_REQUEST,
894                    "ValidationException",
895                    format!("The table does not have the specified index: {idx_name}"),
896                ));
897            }
898        } else {
899            (
900                &table.items[..],
901                table.hash_key_name().to_string(),
902                table.range_key_name().map(|s| s.to_string()),
903            )
904        };
905
906        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
907            .iter()
908            .filter(|item| {
909                if let Some(kc) = key_condition {
910                    evaluate_key_condition(
911                        kc,
912                        item,
913                        &hash_key_name,
914                        range_key_name.as_deref(),
915                        &expr_attr_names,
916                        &expr_attr_values,
917                    )
918                } else {
919                    true
920                }
921            })
922            .collect();
923
924        if let Some(ref rk) = range_key_name {
925            matched.sort_by(|a, b| {
926                let av = a.get(rk.as_str());
927                let bv = b.get(rk.as_str());
928                compare_attribute_values(av, bv)
929            });
930            if !scan_forward {
931                matched.reverse();
932            }
933        }
934
935        // For GSI queries, we need the table's primary key attributes to uniquely
936        // identify items (GSI keys are not unique).
937        let table_pk_hash = table.hash_key_name().to_string();
938        let table_pk_range = table.range_key_name().map(|s| s.to_string());
939        let is_gsi_query = index_name.is_some()
940            && (hash_key_name != table_pk_hash
941                || range_key_name.as_deref() != table_pk_range.as_deref());
942
943        // Apply ExclusiveStartKey: skip items up to and including the start key.
944        // For GSI queries the start key contains both index keys and table PK, so
945        // we must match on ALL of them to find the exact item.
946        if let Some(ref start_key) = exclusive_start_key {
947            if let Some(pos) = matched.iter().position(|item| {
948                let index_match =
949                    item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
950                if is_gsi_query {
951                    index_match
952                        && item_matches_key(
953                            item,
954                            start_key,
955                            &table_pk_hash,
956                            table_pk_range.as_deref(),
957                        )
958                } else {
959                    index_match
960                }
961            }) {
962                matched = matched.split_off(pos + 1);
963            }
964        }
965
966        if let Some(filter) = filter_expression {
967            matched.retain(|item| {
968                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
969            });
970        }
971
972        let scanned_count = matched.len();
973
974        let has_more = if let Some(lim) = limit {
975            let more = matched.len() > lim;
976            matched.truncate(lim);
977            more
978        } else {
979            false
980        };
981
982        // Build LastEvaluatedKey from the last returned item if there are more results.
983        // For GSI queries, include both the index keys and the table's primary key
984        // so the item can be uniquely identified on resume.
985        let last_evaluated_key = if has_more {
986            matched.last().map(|item| {
987                let mut key =
988                    extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
989                if is_gsi_query {
990                    let table_key =
991                        extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
992                    key.extend(table_key);
993                }
994                key
995            })
996        } else {
997            None
998        };
999
1000        // Collect partition key values for contributor insights
1001        let insights_enabled = table.contributor_insights_status == "ENABLED";
1002        let pk_name = table.hash_key_name().to_string();
1003        let accessed_keys: Vec<String> = if insights_enabled {
1004            matched
1005                .iter()
1006                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1007                .collect()
1008        } else {
1009            Vec::new()
1010        };
1011
1012        let items: Vec<Value> = matched
1013            .iter()
1014            .map(|item| {
1015                let projected = project_item(item, &body);
1016                json!(projected)
1017            })
1018            .collect();
1019
1020        let mut result = json!({
1021            "Items": items,
1022            "Count": items.len(),
1023            "ScannedCount": scanned_count,
1024        });
1025
1026        if let Some(lek) = last_evaluated_key {
1027            result["LastEvaluatedKey"] = json!(lek);
1028        }
1029
1030        drop(state);
1031
1032        if !accessed_keys.is_empty() {
1033            let mut state = self.state.write();
1034            if let Some(table) = state.tables.get_mut(table_name) {
1035                // Re-check insights status after acquiring write lock in case it
1036                // was disabled between the read and write lock acquisitions.
1037                if table.contributor_insights_status == "ENABLED" {
1038                    for key_str in accessed_keys {
1039                        *table
1040                            .contributor_insights_counters
1041                            .entry(key_str)
1042                            .or_insert(0) += 1;
1043                    }
1044                }
1045            }
1046        }
1047
1048        Self::ok_json(result)
1049    }
1050
1051    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052        let body = Self::parse_body(req)?;
1053        let table_name = require_str(&body, "TableName")?;
1054
1055        let state = self.state.read();
1056        let table = get_table(&state.tables, table_name)?;
1057
1058        let expr_attr_names = parse_expression_attribute_names(&body);
1059        let expr_attr_values = parse_expression_attribute_values(&body);
1060        let filter_expression = body["FilterExpression"].as_str();
1061        let limit = body["Limit"].as_i64().map(|l| l as usize);
1062        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1063            parse_key_map(&body["ExclusiveStartKey"]);
1064
1065        let hash_key_name = table.hash_key_name().to_string();
1066        let range_key_name = table.range_key_name().map(|s| s.to_string());
1067
1068        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1069
1070        // Apply ExclusiveStartKey: skip items up to and including the start key
1071        if let Some(ref start_key) = exclusive_start_key {
1072            if let Some(pos) = matched.iter().position(|item| {
1073                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1074            }) {
1075                matched = matched.split_off(pos + 1);
1076            }
1077        }
1078
1079        let scanned_count = matched.len();
1080
1081        if let Some(filter) = filter_expression {
1082            matched.retain(|item| {
1083                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1084            });
1085        }
1086
1087        let has_more = if let Some(lim) = limit {
1088            let more = matched.len() > lim;
1089            matched.truncate(lim);
1090            more
1091        } else {
1092            false
1093        };
1094
1095        // Build LastEvaluatedKey from the last returned item if there are more results
1096        let last_evaluated_key = if has_more {
1097            matched
1098                .last()
1099                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1100        } else {
1101            None
1102        };
1103
1104        // Collect partition key values for contributor insights
1105        let insights_enabled = table.contributor_insights_status == "ENABLED";
1106        let pk_name = table.hash_key_name().to_string();
1107        let accessed_keys: Vec<String> = if insights_enabled {
1108            matched
1109                .iter()
1110                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1111                .collect()
1112        } else {
1113            Vec::new()
1114        };
1115
1116        let items: Vec<Value> = matched
1117            .iter()
1118            .map(|item| {
1119                let projected = project_item(item, &body);
1120                json!(projected)
1121            })
1122            .collect();
1123
1124        let mut result = json!({
1125            "Items": items,
1126            "Count": items.len(),
1127            "ScannedCount": scanned_count,
1128        });
1129
1130        if let Some(lek) = last_evaluated_key {
1131            result["LastEvaluatedKey"] = json!(lek);
1132        }
1133
1134        drop(state);
1135
1136        if !accessed_keys.is_empty() {
1137            let mut state = self.state.write();
1138            if let Some(table) = state.tables.get_mut(table_name) {
1139                // Re-check insights status after acquiring write lock in case it
1140                // was disabled between the read and write lock acquisitions.
1141                if table.contributor_insights_status == "ENABLED" {
1142                    for key_str in accessed_keys {
1143                        *table
1144                            .contributor_insights_counters
1145                            .entry(key_str)
1146                            .or_insert(0) += 1;
1147                    }
1148                }
1149            }
1150        }
1151
1152        Self::ok_json(result)
1153    }
1154
1155    // ── Batch Operations ────────────────────────────────────────────────
1156
1157    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158        let body = Self::parse_body(req)?;
1159
1160        validate_optional_enum_value(
1161            "returnConsumedCapacity",
1162            &body["ReturnConsumedCapacity"],
1163            &["INDEXES", "TOTAL", "NONE"],
1164        )?;
1165
1166        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1167
1168        let request_items = body["RequestItems"]
1169            .as_object()
1170            .ok_or_else(|| {
1171                AwsServiceError::aws_error(
1172                    StatusCode::BAD_REQUEST,
1173                    "ValidationException",
1174                    "RequestItems is required",
1175                )
1176            })?
1177            .clone();
1178
1179        let state = self.state.read();
1180        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1181        let mut consumed_capacity: Vec<Value> = Vec::new();
1182
1183        for (table_name, params) in &request_items {
1184            let table = get_table(&state.tables, table_name)?;
1185            let keys = params["Keys"].as_array().ok_or_else(|| {
1186                AwsServiceError::aws_error(
1187                    StatusCode::BAD_REQUEST,
1188                    "ValidationException",
1189                    "Keys is required",
1190                )
1191            })?;
1192
1193            let mut items = Vec::new();
1194            for key_val in keys {
1195                let key: HashMap<String, AttributeValue> =
1196                    serde_json::from_value(key_val.clone()).unwrap_or_default();
1197                if let Some(idx) = table.find_item_index(&key) {
1198                    items.push(json!(table.items[idx]));
1199                }
1200            }
1201            responses.insert(table_name.clone(), items);
1202
1203            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1204                consumed_capacity.push(json!({
1205                    "TableName": table_name,
1206                    "CapacityUnits": 1.0,
1207                }));
1208            }
1209        }
1210
1211        let mut result = json!({
1212            "Responses": responses,
1213            "UnprocessedKeys": {},
1214        });
1215
1216        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1217            result["ConsumedCapacity"] = json!(consumed_capacity);
1218        }
1219
1220        Self::ok_json(result)
1221    }
1222
1223    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1224        let body = Self::parse_body(req)?;
1225
1226        validate_optional_enum_value(
1227            "returnConsumedCapacity",
1228            &body["ReturnConsumedCapacity"],
1229            &["INDEXES", "TOTAL", "NONE"],
1230        )?;
1231        validate_optional_enum_value(
1232            "returnItemCollectionMetrics",
1233            &body["ReturnItemCollectionMetrics"],
1234            &["SIZE", "NONE"],
1235        )?;
1236
1237        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1238        let return_icm = body["ReturnItemCollectionMetrics"]
1239            .as_str()
1240            .unwrap_or("NONE");
1241
1242        let request_items = body["RequestItems"]
1243            .as_object()
1244            .ok_or_else(|| {
1245                AwsServiceError::aws_error(
1246                    StatusCode::BAD_REQUEST,
1247                    "ValidationException",
1248                    "RequestItems is required",
1249                )
1250            })?
1251            .clone();
1252
1253        let mut state = self.state.write();
1254        let mut consumed_capacity: Vec<Value> = Vec::new();
1255        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1256
1257        for (table_name, requests) in &request_items {
1258            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1259                AwsServiceError::aws_error(
1260                    StatusCode::BAD_REQUEST,
1261                    "ResourceNotFoundException",
1262                    format!("Requested resource not found: Table: {table_name} not found"),
1263                )
1264            })?;
1265
1266            let reqs = requests.as_array().ok_or_else(|| {
1267                AwsServiceError::aws_error(
1268                    StatusCode::BAD_REQUEST,
1269                    "ValidationException",
1270                    "Request list must be an array",
1271                )
1272            })?;
1273
1274            for request in reqs {
1275                if let Some(put_req) = request.get("PutRequest") {
1276                    let item: HashMap<String, AttributeValue> =
1277                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1278                    let key = extract_key(table, &item);
1279                    if let Some(idx) = table.find_item_index(&key) {
1280                        table.items[idx] = item;
1281                    } else {
1282                        table.items.push(item);
1283                    }
1284                } else if let Some(del_req) = request.get("DeleteRequest") {
1285                    let key: HashMap<String, AttributeValue> =
1286                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1287                    if let Some(idx) = table.find_item_index(&key) {
1288                        table.items.remove(idx);
1289                    }
1290                }
1291            }
1292
1293            table.recalculate_stats();
1294
1295            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1296                consumed_capacity.push(json!({
1297                    "TableName": table_name,
1298                    "CapacityUnits": 1.0,
1299                }));
1300            }
1301
1302            if return_icm == "SIZE" {
1303                item_collection_metrics.insert(table_name.clone(), vec![]);
1304            }
1305        }
1306
1307        let mut result = json!({
1308            "UnprocessedItems": {},
1309        });
1310
1311        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1312            result["ConsumedCapacity"] = json!(consumed_capacity);
1313        }
1314
1315        if return_icm == "SIZE" {
1316            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1317        }
1318
1319        Self::ok_json(result)
1320    }
1321
1322    // ── Tags ────────────────────────────────────────────────────────────
1323
1324    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1325        let body = Self::parse_body(req)?;
1326        let resource_arn = require_str(&body, "ResourceArn")?;
1327        validate_required("Tags", &body["Tags"])?;
1328
1329        let mut state = self.state.write();
1330        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1331
1332        fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1333            |f| {
1334                AwsServiceError::aws_error(
1335                    StatusCode::BAD_REQUEST,
1336                    "ValidationException",
1337                    format!("{f} must be a list"),
1338                )
1339            },
1340        )?;
1341
1342        Self::ok_json(json!({}))
1343    }
1344
1345    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1346        let body = Self::parse_body(req)?;
1347        let resource_arn = require_str(&body, "ResourceArn")?;
1348        validate_required("TagKeys", &body["TagKeys"])?;
1349
1350        let mut state = self.state.write();
1351        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1352
1353        fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1354            AwsServiceError::aws_error(
1355                StatusCode::BAD_REQUEST,
1356                "ValidationException",
1357                format!("{f} must be a list"),
1358            )
1359        })?;
1360
1361        Self::ok_json(json!({}))
1362    }
1363
1364    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1365        let body = Self::parse_body(req)?;
1366        let resource_arn = require_str(&body, "ResourceArn")?;
1367
1368        let state = self.state.read();
1369        let table = find_table_by_arn(&state.tables, resource_arn)?;
1370
1371        let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1372
1373        Self::ok_json(json!({ "Tags": tags }))
1374    }
1375
1376    // ── Transactions ────────────────────────────────────────────────────
1377
1378    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1379        let body = Self::parse_body(req)?;
1380        validate_optional_enum_value(
1381            "returnConsumedCapacity",
1382            &body["ReturnConsumedCapacity"],
1383            &["INDEXES", "TOTAL", "NONE"],
1384        )?;
1385        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1386            AwsServiceError::aws_error(
1387                StatusCode::BAD_REQUEST,
1388                "ValidationException",
1389                "TransactItems is required",
1390            )
1391        })?;
1392
1393        let state = self.state.read();
1394        let mut responses: Vec<Value> = Vec::new();
1395
1396        for ti in transact_items {
1397            let get = &ti["Get"];
1398            let table_name = get["TableName"].as_str().ok_or_else(|| {
1399                AwsServiceError::aws_error(
1400                    StatusCode::BAD_REQUEST,
1401                    "ValidationException",
1402                    "TableName is required in Get",
1403                )
1404            })?;
1405            let key: HashMap<String, AttributeValue> =
1406                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1407
1408            let table = get_table(&state.tables, table_name)?;
1409            match table.find_item_index(&key) {
1410                Some(idx) => {
1411                    responses.push(json!({ "Item": table.items[idx] }));
1412                }
1413                None => {
1414                    responses.push(json!({}));
1415                }
1416            }
1417        }
1418
1419        Self::ok_json(json!({ "Responses": responses }))
1420    }
1421
1422    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1423        let body = Self::parse_body(req)?;
1424        validate_optional_string_length(
1425            "clientRequestToken",
1426            body["ClientRequestToken"].as_str(),
1427            1,
1428            36,
1429        )?;
1430        validate_optional_enum_value(
1431            "returnConsumedCapacity",
1432            &body["ReturnConsumedCapacity"],
1433            &["INDEXES", "TOTAL", "NONE"],
1434        )?;
1435        validate_optional_enum_value(
1436            "returnItemCollectionMetrics",
1437            &body["ReturnItemCollectionMetrics"],
1438            &["SIZE", "NONE"],
1439        )?;
1440        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1441            AwsServiceError::aws_error(
1442                StatusCode::BAD_REQUEST,
1443                "ValidationException",
1444                "TransactItems is required",
1445            )
1446        })?;
1447
1448        let mut state = self.state.write();
1449
1450        // First pass: validate all conditions
1451        let mut cancellation_reasons: Vec<Value> = Vec::new();
1452        let mut any_failed = false;
1453
1454        for ti in transact_items {
1455            if let Some(put) = ti.get("Put") {
1456                let table_name = put["TableName"].as_str().unwrap_or_default();
1457                let item: HashMap<String, AttributeValue> =
1458                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1459                let condition = put["ConditionExpression"].as_str();
1460
1461                if let Some(cond) = condition {
1462                    let table = get_table(&state.tables, table_name)?;
1463                    let expr_attr_names = parse_expression_attribute_names(put);
1464                    let expr_attr_values = parse_expression_attribute_values(put);
1465                    let key = extract_key(table, &item);
1466                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1467                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1468                        .is_err()
1469                    {
1470                        cancellation_reasons.push(json!({
1471                            "Code": "ConditionalCheckFailed",
1472                            "Message": "The conditional request failed"
1473                        }));
1474                        any_failed = true;
1475                        continue;
1476                    }
1477                }
1478                cancellation_reasons.push(json!({ "Code": "None" }));
1479            } else if let Some(delete) = ti.get("Delete") {
1480                let table_name = delete["TableName"].as_str().unwrap_or_default();
1481                let key: HashMap<String, AttributeValue> =
1482                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1483                let condition = delete["ConditionExpression"].as_str();
1484
1485                if let Some(cond) = condition {
1486                    let table = get_table(&state.tables, table_name)?;
1487                    let expr_attr_names = parse_expression_attribute_names(delete);
1488                    let expr_attr_values = parse_expression_attribute_values(delete);
1489                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1490                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1491                        .is_err()
1492                    {
1493                        cancellation_reasons.push(json!({
1494                            "Code": "ConditionalCheckFailed",
1495                            "Message": "The conditional request failed"
1496                        }));
1497                        any_failed = true;
1498                        continue;
1499                    }
1500                }
1501                cancellation_reasons.push(json!({ "Code": "None" }));
1502            } else if let Some(update) = ti.get("Update") {
1503                let table_name = update["TableName"].as_str().unwrap_or_default();
1504                let key: HashMap<String, AttributeValue> =
1505                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1506                let condition = update["ConditionExpression"].as_str();
1507
1508                if let Some(cond) = condition {
1509                    let table = get_table(&state.tables, table_name)?;
1510                    let expr_attr_names = parse_expression_attribute_names(update);
1511                    let expr_attr_values = parse_expression_attribute_values(update);
1512                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1513                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1514                        .is_err()
1515                    {
1516                        cancellation_reasons.push(json!({
1517                            "Code": "ConditionalCheckFailed",
1518                            "Message": "The conditional request failed"
1519                        }));
1520                        any_failed = true;
1521                        continue;
1522                    }
1523                }
1524                cancellation_reasons.push(json!({ "Code": "None" }));
1525            } else if let Some(check) = ti.get("ConditionCheck") {
1526                let table_name = check["TableName"].as_str().unwrap_or_default();
1527                let key: HashMap<String, AttributeValue> =
1528                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1529                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1530
1531                let table = get_table(&state.tables, table_name)?;
1532                let expr_attr_names = parse_expression_attribute_names(check);
1533                let expr_attr_values = parse_expression_attribute_values(check);
1534                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1535                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1536                {
1537                    cancellation_reasons.push(json!({
1538                        "Code": "ConditionalCheckFailed",
1539                        "Message": "The conditional request failed"
1540                    }));
1541                    any_failed = true;
1542                    continue;
1543                }
1544                cancellation_reasons.push(json!({ "Code": "None" }));
1545            } else {
1546                cancellation_reasons.push(json!({ "Code": "None" }));
1547            }
1548        }
1549
1550        if any_failed {
1551            let error_body = json!({
1552                "__type": "TransactionCanceledException",
1553                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1554                "CancellationReasons": cancellation_reasons
1555            });
1556            return Ok(AwsResponse::json(
1557                StatusCode::BAD_REQUEST,
1558                serde_json::to_vec(&error_body).unwrap(),
1559            ));
1560        }
1561
1562        // Second pass: apply all writes
1563        for ti in transact_items {
1564            if let Some(put) = ti.get("Put") {
1565                let table_name = put["TableName"].as_str().unwrap_or_default();
1566                let item: HashMap<String, AttributeValue> =
1567                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1568                let table = get_table_mut(&mut state.tables, table_name)?;
1569                let key = extract_key(table, &item);
1570                if let Some(idx) = table.find_item_index(&key) {
1571                    table.items[idx] = item;
1572                } else {
1573                    table.items.push(item);
1574                }
1575                table.recalculate_stats();
1576            } else if let Some(delete) = ti.get("Delete") {
1577                let table_name = delete["TableName"].as_str().unwrap_or_default();
1578                let key: HashMap<String, AttributeValue> =
1579                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1580                let table = get_table_mut(&mut state.tables, table_name)?;
1581                if let Some(idx) = table.find_item_index(&key) {
1582                    table.items.remove(idx);
1583                }
1584                table.recalculate_stats();
1585            } else if let Some(update) = ti.get("Update") {
1586                let table_name = update["TableName"].as_str().unwrap_or_default();
1587                let key: HashMap<String, AttributeValue> =
1588                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1589                let update_expression = update["UpdateExpression"].as_str();
1590                let expr_attr_names = parse_expression_attribute_names(update);
1591                let expr_attr_values = parse_expression_attribute_values(update);
1592
1593                let table = get_table_mut(&mut state.tables, table_name)?;
1594                let idx = match table.find_item_index(&key) {
1595                    Some(i) => i,
1596                    None => {
1597                        let mut new_item = HashMap::new();
1598                        for (k, v) in &key {
1599                            new_item.insert(k.clone(), v.clone());
1600                        }
1601                        table.items.push(new_item);
1602                        table.items.len() - 1
1603                    }
1604                };
1605
1606                if let Some(expr) = update_expression {
1607                    apply_update_expression(
1608                        &mut table.items[idx],
1609                        expr,
1610                        &expr_attr_names,
1611                        &expr_attr_values,
1612                    )?;
1613                }
1614                table.recalculate_stats();
1615            }
1616            // ConditionCheck: no write needed
1617        }
1618
1619        Self::ok_json(json!({}))
1620    }
1621
1622    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623        let body = Self::parse_body(req)?;
1624        let statement = require_str(&body, "Statement")?;
1625        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1626
1627        execute_partiql_statement(&self.state, statement, &parameters)
1628    }
1629
1630    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1631        let body = Self::parse_body(req)?;
1632        validate_optional_enum_value(
1633            "returnConsumedCapacity",
1634            &body["ReturnConsumedCapacity"],
1635            &["INDEXES", "TOTAL", "NONE"],
1636        )?;
1637        let statements = body["Statements"].as_array().ok_or_else(|| {
1638            AwsServiceError::aws_error(
1639                StatusCode::BAD_REQUEST,
1640                "ValidationException",
1641                "Statements is required",
1642            )
1643        })?;
1644
1645        let mut responses: Vec<Value> = Vec::new();
1646        for stmt_obj in statements {
1647            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1648            let parameters = stmt_obj["Parameters"]
1649                .as_array()
1650                .cloned()
1651                .unwrap_or_default();
1652
1653            match execute_partiql_statement(&self.state, statement, &parameters) {
1654                Ok(resp) => {
1655                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1656                    responses.push(resp_body);
1657                }
1658                Err(e) => {
1659                    responses.push(json!({
1660                        "Error": {
1661                            "Code": "ValidationException",
1662                            "Message": e.to_string()
1663                        }
1664                    }));
1665                }
1666            }
1667        }
1668
1669        Self::ok_json(json!({ "Responses": responses }))
1670    }
1671
1672    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1673        let body = Self::parse_body(req)?;
1674        validate_optional_string_length(
1675            "clientRequestToken",
1676            body["ClientRequestToken"].as_str(),
1677            1,
1678            36,
1679        )?;
1680        validate_optional_enum_value(
1681            "returnConsumedCapacity",
1682            &body["ReturnConsumedCapacity"],
1683            &["INDEXES", "TOTAL", "NONE"],
1684        )?;
1685        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1686            AwsServiceError::aws_error(
1687                StatusCode::BAD_REQUEST,
1688                "ValidationException",
1689                "TransactStatements is required",
1690            )
1691        })?;
1692
1693        // Collect all results; if any fail, return TransactionCanceledException
1694        let mut results: Vec<Result<Value, String>> = Vec::new();
1695        for stmt_obj in transact_statements {
1696            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1697            let parameters = stmt_obj["Parameters"]
1698                .as_array()
1699                .cloned()
1700                .unwrap_or_default();
1701
1702            match execute_partiql_statement(&self.state, statement, &parameters) {
1703                Ok(resp) => {
1704                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1705                    results.push(Ok(resp_body));
1706                }
1707                Err(e) => {
1708                    results.push(Err(e.to_string()));
1709                }
1710            }
1711        }
1712
1713        let any_failed = results.iter().any(|r| r.is_err());
1714        if any_failed {
1715            let reasons: Vec<Value> = results
1716                .iter()
1717                .map(|r| match r {
1718                    Ok(_) => json!({ "Code": "None" }),
1719                    Err(msg) => json!({
1720                        "Code": "ValidationException",
1721                        "Message": msg
1722                    }),
1723                })
1724                .collect();
1725            let error_body = json!({
1726                "__type": "TransactionCanceledException",
1727                "message": "Transaction cancelled due to validation errors",
1728                "CancellationReasons": reasons
1729            });
1730            return Ok(AwsResponse::json(
1731                StatusCode::BAD_REQUEST,
1732                serde_json::to_vec(&error_body).unwrap(),
1733            ));
1734        }
1735
1736        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1737        Self::ok_json(json!({ "Responses": responses }))
1738    }
1739
1740    // ── TTL ─────────────────────────────────────────────────────────────
1741
1742    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1743        let body = Self::parse_body(req)?;
1744        let table_name = require_str(&body, "TableName")?;
1745        let spec = &body["TimeToLiveSpecification"];
1746        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1747            AwsServiceError::aws_error(
1748                StatusCode::BAD_REQUEST,
1749                "ValidationException",
1750                "TimeToLiveSpecification.AttributeName is required",
1751            )
1752        })?;
1753        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1754            AwsServiceError::aws_error(
1755                StatusCode::BAD_REQUEST,
1756                "ValidationException",
1757                "TimeToLiveSpecification.Enabled is required",
1758            )
1759        })?;
1760
1761        let mut state = self.state.write();
1762        let table = get_table_mut(&mut state.tables, table_name)?;
1763
1764        if enabled {
1765            table.ttl_attribute = Some(attr_name.to_string());
1766            table.ttl_enabled = true;
1767        } else {
1768            table.ttl_enabled = false;
1769        }
1770
1771        Self::ok_json(json!({
1772            "TimeToLiveSpecification": {
1773                "AttributeName": attr_name,
1774                "Enabled": enabled
1775            }
1776        }))
1777    }
1778
1779    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1780        let body = Self::parse_body(req)?;
1781        let table_name = require_str(&body, "TableName")?;
1782
1783        let state = self.state.read();
1784        let table = get_table(&state.tables, table_name)?;
1785
1786        let status = if table.ttl_enabled {
1787            "ENABLED"
1788        } else {
1789            "DISABLED"
1790        };
1791
1792        let mut desc = json!({
1793            "TimeToLiveDescription": {
1794                "TimeToLiveStatus": status
1795            }
1796        });
1797
1798        if let Some(ref attr) = table.ttl_attribute {
1799            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1800        }
1801
1802        Self::ok_json(desc)
1803    }
1804
1805    // ── Resource Policies ───────────────────────────────────────────────
1806
1807    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1808        let body = Self::parse_body(req)?;
1809        let resource_arn = require_str(&body, "ResourceArn")?;
1810        let policy = require_str(&body, "Policy")?;
1811
1812        let mut state = self.state.write();
1813        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1814        table.resource_policy = Some(policy.to_string());
1815
1816        let revision_id = uuid::Uuid::new_v4().to_string();
1817        Self::ok_json(json!({ "RevisionId": revision_id }))
1818    }
1819
1820    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821        let body = Self::parse_body(req)?;
1822        let resource_arn = require_str(&body, "ResourceArn")?;
1823
1824        let state = self.state.read();
1825        let table = find_table_by_arn(&state.tables, resource_arn)?;
1826
1827        match &table.resource_policy {
1828            Some(policy) => {
1829                let revision_id = uuid::Uuid::new_v4().to_string();
1830                Self::ok_json(json!({
1831                    "Policy": policy,
1832                    "RevisionId": revision_id
1833                }))
1834            }
1835            None => Self::ok_json(json!({ "Policy": null })),
1836        }
1837    }
1838
1839    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1840        let body = Self::parse_body(req)?;
1841        let resource_arn = require_str(&body, "ResourceArn")?;
1842
1843        let mut state = self.state.write();
1844        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1845        table.resource_policy = None;
1846
1847        Self::ok_json(json!({}))
1848    }
1849
1850    // ── Stubs ──────────────────────────────────────────────────────────
1851
1852    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1853        Self::ok_json(json!({
1854            "Endpoints": [{
1855                "Address": "dynamodb.us-east-1.amazonaws.com",
1856                "CachePeriodInMinutes": 1440
1857            }]
1858        }))
1859    }
1860
1861    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1862        Self::ok_json(json!({
1863            "AccountMaxReadCapacityUnits": 80000,
1864            "AccountMaxWriteCapacityUnits": 80000,
1865            "TableMaxReadCapacityUnits": 40000,
1866            "TableMaxWriteCapacityUnits": 40000
1867        }))
1868    }
1869
1870    // ── Backups ────────────────────────────────────────────────────────
1871
1872    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1873        let body = Self::parse_body(req)?;
1874        let table_name = require_str(&body, "TableName")?;
1875        let backup_name = require_str(&body, "BackupName")?;
1876
1877        let mut state = self.state.write();
1878        let table = get_table(&state.tables, table_name)?;
1879
1880        let backup_arn = format!(
1881            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1882            state.region,
1883            state.account_id,
1884            table_name,
1885            Utc::now().format("%Y%m%d%H%M%S")
1886        );
1887        let now = Utc::now();
1888
1889        let backup = BackupDescription {
1890            backup_arn: backup_arn.clone(),
1891            backup_name: backup_name.to_string(),
1892            table_name: table_name.to_string(),
1893            table_arn: table.arn.clone(),
1894            backup_status: "AVAILABLE".to_string(),
1895            backup_type: "USER".to_string(),
1896            backup_creation_date: now,
1897            key_schema: table.key_schema.clone(),
1898            attribute_definitions: table.attribute_definitions.clone(),
1899            provisioned_throughput: table.provisioned_throughput.clone(),
1900            billing_mode: table.billing_mode.clone(),
1901            item_count: table.item_count,
1902            size_bytes: table.size_bytes,
1903            items: table.items.clone(),
1904        };
1905
1906        state.backups.insert(backup_arn.clone(), backup);
1907
1908        Self::ok_json(json!({
1909            "BackupDetails": {
1910                "BackupArn": backup_arn,
1911                "BackupName": backup_name,
1912                "BackupStatus": "AVAILABLE",
1913                "BackupType": "USER",
1914                "BackupCreationDateTime": now.timestamp() as f64,
1915                "BackupSizeBytes": 0
1916            }
1917        }))
1918    }
1919
1920    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1921        let body = Self::parse_body(req)?;
1922        let backup_arn = require_str(&body, "BackupArn")?;
1923
1924        let mut state = self.state.write();
1925        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1926            AwsServiceError::aws_error(
1927                StatusCode::BAD_REQUEST,
1928                "BackupNotFoundException",
1929                format!("Backup not found: {backup_arn}"),
1930            )
1931        })?;
1932
1933        Self::ok_json(json!({
1934            "BackupDescription": {
1935                "BackupDetails": {
1936                    "BackupArn": backup.backup_arn,
1937                    "BackupName": backup.backup_name,
1938                    "BackupStatus": "DELETED",
1939                    "BackupType": backup.backup_type,
1940                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1941                    "BackupSizeBytes": backup.size_bytes
1942                },
1943                "SourceTableDetails": {
1944                    "TableName": backup.table_name,
1945                    "TableArn": backup.table_arn,
1946                    "TableId": uuid::Uuid::new_v4().to_string(),
1947                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1948                        "AttributeName": ks.attribute_name,
1949                        "KeyType": ks.key_type
1950                    })).collect::<Vec<_>>(),
1951                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1952                    "ProvisionedThroughput": {
1953                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1954                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1955                    },
1956                    "ItemCount": backup.item_count,
1957                    "BillingMode": backup.billing_mode,
1958                    "TableSizeBytes": backup.size_bytes
1959                },
1960                "SourceTableFeatureDetails": {}
1961            }
1962        }))
1963    }
1964
1965    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1966        let body = Self::parse_body(req)?;
1967        let backup_arn = require_str(&body, "BackupArn")?;
1968
1969        let state = self.state.read();
1970        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1971            AwsServiceError::aws_error(
1972                StatusCode::BAD_REQUEST,
1973                "BackupNotFoundException",
1974                format!("Backup not found: {backup_arn}"),
1975            )
1976        })?;
1977
1978        Self::ok_json(json!({
1979            "BackupDescription": {
1980                "BackupDetails": {
1981                    "BackupArn": backup.backup_arn,
1982                    "BackupName": backup.backup_name,
1983                    "BackupStatus": backup.backup_status,
1984                    "BackupType": backup.backup_type,
1985                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1986                    "BackupSizeBytes": backup.size_bytes
1987                },
1988                "SourceTableDetails": {
1989                    "TableName": backup.table_name,
1990                    "TableArn": backup.table_arn,
1991                    "TableId": uuid::Uuid::new_v4().to_string(),
1992                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1993                        "AttributeName": ks.attribute_name,
1994                        "KeyType": ks.key_type
1995                    })).collect::<Vec<_>>(),
1996                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1997                    "ProvisionedThroughput": {
1998                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1999                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2000                    },
2001                    "ItemCount": backup.item_count,
2002                    "BillingMode": backup.billing_mode,
2003                    "TableSizeBytes": backup.size_bytes
2004                },
2005                "SourceTableFeatureDetails": {}
2006            }
2007        }))
2008    }
2009
2010    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2011        let body = Self::parse_body(req)?;
2012        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2013        validate_optional_string_length(
2014            "exclusiveStartBackupArn",
2015            body["ExclusiveStartBackupArn"].as_str(),
2016            37,
2017            1024,
2018        )?;
2019        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2020        validate_optional_enum_value(
2021            "backupType",
2022            &body["BackupType"],
2023            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2024        )?;
2025        let table_name = body["TableName"].as_str();
2026
2027        let state = self.state.read();
2028        let summaries: Vec<Value> = state
2029            .backups
2030            .values()
2031            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2032            .map(|b| {
2033                json!({
2034                    "TableName": b.table_name,
2035                    "TableArn": b.table_arn,
2036                    "BackupArn": b.backup_arn,
2037                    "BackupName": b.backup_name,
2038                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2039                    "BackupStatus": b.backup_status,
2040                    "BackupType": b.backup_type,
2041                    "BackupSizeBytes": b.size_bytes
2042                })
2043            })
2044            .collect();
2045
2046        Self::ok_json(json!({
2047            "BackupSummaries": summaries
2048        }))
2049    }
2050
2051    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2052        let body = Self::parse_body(req)?;
2053        let backup_arn = require_str(&body, "BackupArn")?;
2054        let target_table_name = require_str(&body, "TargetTableName")?;
2055
2056        let mut state = self.state.write();
2057        let backup = state.backups.get(backup_arn).ok_or_else(|| {
2058            AwsServiceError::aws_error(
2059                StatusCode::BAD_REQUEST,
2060                "BackupNotFoundException",
2061                format!("Backup not found: {backup_arn}"),
2062            )
2063        })?;
2064
2065        if state.tables.contains_key(target_table_name) {
2066            return Err(AwsServiceError::aws_error(
2067                StatusCode::BAD_REQUEST,
2068                "TableAlreadyExistsException",
2069                format!("Table already exists: {target_table_name}"),
2070            ));
2071        }
2072
2073        let now = Utc::now();
2074        let arn = format!(
2075            "arn:aws:dynamodb:{}:{}:table/{}",
2076            state.region, state.account_id, target_table_name
2077        );
2078
2079        let restored_items = backup.items.clone();
2080        let mut table = DynamoTable {
2081            name: target_table_name.to_string(),
2082            arn: arn.clone(),
2083            key_schema: backup.key_schema.clone(),
2084            attribute_definitions: backup.attribute_definitions.clone(),
2085            provisioned_throughput: backup.provisioned_throughput.clone(),
2086            items: restored_items,
2087            gsi: Vec::new(),
2088            lsi: Vec::new(),
2089            tags: HashMap::new(),
2090            created_at: now,
2091            status: "ACTIVE".to_string(),
2092            item_count: 0,
2093            size_bytes: 0,
2094            billing_mode: backup.billing_mode.clone(),
2095            ttl_attribute: None,
2096            ttl_enabled: false,
2097            resource_policy: None,
2098            pitr_enabled: false,
2099            kinesis_destinations: Vec::new(),
2100            contributor_insights_status: "DISABLED".to_string(),
2101            contributor_insights_counters: HashMap::new(),
2102            stream_enabled: false,
2103            stream_view_type: None,
2104            stream_arn: None,
2105            stream_records: Arc::new(RwLock::new(Vec::new())),
2106            sse_type: None,
2107            sse_kms_key_arn: None,
2108        };
2109        table.recalculate_stats();
2110
2111        let desc = build_table_description(&table);
2112        state.tables.insert(target_table_name.to_string(), table);
2113
2114        Self::ok_json(json!({
2115            "TableDescription": desc
2116        }))
2117    }
2118
2119    fn restore_table_to_point_in_time(
2120        &self,
2121        req: &AwsRequest,
2122    ) -> Result<AwsResponse, AwsServiceError> {
2123        let body = Self::parse_body(req)?;
2124        let target_table_name = require_str(&body, "TargetTableName")?;
2125        let source_table_name = body["SourceTableName"].as_str();
2126        let source_table_arn = body["SourceTableArn"].as_str();
2127
2128        let mut state = self.state.write();
2129
2130        // Resolve source table
2131        let source = if let Some(name) = source_table_name {
2132            get_table(&state.tables, name)?.clone()
2133        } else if let Some(arn) = source_table_arn {
2134            find_table_by_arn(&state.tables, arn)?.clone()
2135        } else {
2136            return Err(AwsServiceError::aws_error(
2137                StatusCode::BAD_REQUEST,
2138                "ValidationException",
2139                "SourceTableName or SourceTableArn is required",
2140            ));
2141        };
2142
2143        if state.tables.contains_key(target_table_name) {
2144            return Err(AwsServiceError::aws_error(
2145                StatusCode::BAD_REQUEST,
2146                "TableAlreadyExistsException",
2147                format!("Table already exists: {target_table_name}"),
2148            ));
2149        }
2150
2151        let now = Utc::now();
2152        let arn = format!(
2153            "arn:aws:dynamodb:{}:{}:table/{}",
2154            state.region, state.account_id, target_table_name
2155        );
2156
2157        let mut table = DynamoTable {
2158            name: target_table_name.to_string(),
2159            arn: arn.clone(),
2160            key_schema: source.key_schema.clone(),
2161            attribute_definitions: source.attribute_definitions.clone(),
2162            provisioned_throughput: source.provisioned_throughput.clone(),
2163            items: source.items.clone(),
2164            gsi: Vec::new(),
2165            lsi: Vec::new(),
2166            tags: HashMap::new(),
2167            created_at: now,
2168            status: "ACTIVE".to_string(),
2169            item_count: 0,
2170            size_bytes: 0,
2171            billing_mode: source.billing_mode.clone(),
2172            ttl_attribute: None,
2173            ttl_enabled: false,
2174            resource_policy: None,
2175            pitr_enabled: false,
2176            kinesis_destinations: Vec::new(),
2177            contributor_insights_status: "DISABLED".to_string(),
2178            contributor_insights_counters: HashMap::new(),
2179            stream_enabled: false,
2180            stream_view_type: None,
2181            stream_arn: None,
2182            stream_records: Arc::new(RwLock::new(Vec::new())),
2183            sse_type: None,
2184            sse_kms_key_arn: None,
2185        };
2186        table.recalculate_stats();
2187
2188        let desc = build_table_description(&table);
2189        state.tables.insert(target_table_name.to_string(), table);
2190
2191        Self::ok_json(json!({
2192            "TableDescription": desc
2193        }))
2194    }
2195
2196    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2197        let body = Self::parse_body(req)?;
2198        let table_name = require_str(&body, "TableName")?;
2199
2200        let pitr_spec = body["PointInTimeRecoverySpecification"]
2201            .as_object()
2202            .ok_or_else(|| {
2203                AwsServiceError::aws_error(
2204                    StatusCode::BAD_REQUEST,
2205                    "ValidationException",
2206                    "PointInTimeRecoverySpecification is required",
2207                )
2208            })?;
2209        let enabled = pitr_spec
2210            .get("PointInTimeRecoveryEnabled")
2211            .and_then(|v| v.as_bool())
2212            .unwrap_or(false);
2213
2214        let mut state = self.state.write();
2215        let table = get_table_mut(&mut state.tables, table_name)?;
2216        table.pitr_enabled = enabled;
2217
2218        let status = if enabled { "ENABLED" } else { "DISABLED" };
2219        Self::ok_json(json!({
2220            "ContinuousBackupsDescription": {
2221                "ContinuousBackupsStatus": status,
2222                "PointInTimeRecoveryDescription": {
2223                    "PointInTimeRecoveryStatus": status,
2224                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2225                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2226                }
2227            }
2228        }))
2229    }
2230
2231    fn describe_continuous_backups(
2232        &self,
2233        req: &AwsRequest,
2234    ) -> Result<AwsResponse, AwsServiceError> {
2235        let body = Self::parse_body(req)?;
2236        let table_name = require_str(&body, "TableName")?;
2237
2238        let state = self.state.read();
2239        let table = get_table(&state.tables, table_name)?;
2240
2241        let status = if table.pitr_enabled {
2242            "ENABLED"
2243        } else {
2244            "DISABLED"
2245        };
2246        Self::ok_json(json!({
2247            "ContinuousBackupsDescription": {
2248                "ContinuousBackupsStatus": status,
2249                "PointInTimeRecoveryDescription": {
2250                    "PointInTimeRecoveryStatus": status,
2251                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2252                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
2253                }
2254            }
2255        }))
2256    }
2257
2258    // ── Global Tables ──────────────────────────────────────────────────
2259
2260    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2261        let body = Self::parse_body(req)?;
2262        let global_table_name = require_str(&body, "GlobalTableName")?;
2263        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2264
2265        let replication_group = body["ReplicationGroup"]
2266            .as_array()
2267            .ok_or_else(|| {
2268                AwsServiceError::aws_error(
2269                    StatusCode::BAD_REQUEST,
2270                    "ValidationException",
2271                    "ReplicationGroup is required",
2272                )
2273            })?
2274            .iter()
2275            .filter_map(|r| {
2276                r["RegionName"].as_str().map(|rn| ReplicaDescription {
2277                    region_name: rn.to_string(),
2278                    replica_status: "ACTIVE".to_string(),
2279                })
2280            })
2281            .collect::<Vec<_>>();
2282
2283        let mut state = self.state.write();
2284
2285        if state.global_tables.contains_key(global_table_name) {
2286            return Err(AwsServiceError::aws_error(
2287                StatusCode::BAD_REQUEST,
2288                "GlobalTableAlreadyExistsException",
2289                format!("Global table already exists: {global_table_name}"),
2290            ));
2291        }
2292
2293        let arn = format!(
2294            "arn:aws:dynamodb::{}:global-table/{}",
2295            state.account_id, global_table_name
2296        );
2297        let now = Utc::now();
2298
2299        let gt = GlobalTableDescription {
2300            global_table_name: global_table_name.to_string(),
2301            global_table_arn: arn.clone(),
2302            global_table_status: "ACTIVE".to_string(),
2303            creation_date: now,
2304            replication_group: replication_group.clone(),
2305        };
2306
2307        state
2308            .global_tables
2309            .insert(global_table_name.to_string(), gt);
2310
2311        Self::ok_json(json!({
2312            "GlobalTableDescription": {
2313                "GlobalTableName": global_table_name,
2314                "GlobalTableArn": arn,
2315                "GlobalTableStatus": "ACTIVE",
2316                "CreationDateTime": now.timestamp() as f64,
2317                "ReplicationGroup": replication_group.iter().map(|r| json!({
2318                    "RegionName": r.region_name,
2319                    "ReplicaStatus": r.replica_status
2320                })).collect::<Vec<_>>()
2321            }
2322        }))
2323    }
2324
2325    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2326        let body = Self::parse_body(req)?;
2327        let global_table_name = require_str(&body, "GlobalTableName")?;
2328        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2329
2330        let state = self.state.read();
2331        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2332            AwsServiceError::aws_error(
2333                StatusCode::BAD_REQUEST,
2334                "GlobalTableNotFoundException",
2335                format!("Global table not found: {global_table_name}"),
2336            )
2337        })?;
2338
2339        Self::ok_json(json!({
2340            "GlobalTableDescription": {
2341                "GlobalTableName": gt.global_table_name,
2342                "GlobalTableArn": gt.global_table_arn,
2343                "GlobalTableStatus": gt.global_table_status,
2344                "CreationDateTime": gt.creation_date.timestamp() as f64,
2345                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2346                    "RegionName": r.region_name,
2347                    "ReplicaStatus": r.replica_status
2348                })).collect::<Vec<_>>()
2349            }
2350        }))
2351    }
2352
2353    fn describe_global_table_settings(
2354        &self,
2355        req: &AwsRequest,
2356    ) -> Result<AwsResponse, AwsServiceError> {
2357        let body = Self::parse_body(req)?;
2358        let global_table_name = require_str(&body, "GlobalTableName")?;
2359        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2360
2361        let state = self.state.read();
2362        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2363            AwsServiceError::aws_error(
2364                StatusCode::BAD_REQUEST,
2365                "GlobalTableNotFoundException",
2366                format!("Global table not found: {global_table_name}"),
2367            )
2368        })?;
2369
2370        let replica_settings: Vec<Value> = gt
2371            .replication_group
2372            .iter()
2373            .map(|r| {
2374                json!({
2375                    "RegionName": r.region_name,
2376                    "ReplicaStatus": r.replica_status,
2377                    "ReplicaProvisionedReadCapacityUnits": 0,
2378                    "ReplicaProvisionedWriteCapacityUnits": 0
2379                })
2380            })
2381            .collect();
2382
2383        Self::ok_json(json!({
2384            "GlobalTableName": gt.global_table_name,
2385            "ReplicaSettings": replica_settings
2386        }))
2387    }
2388
2389    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2390        let body = Self::parse_body(req)?;
2391        validate_optional_string_length(
2392            "exclusiveStartGlobalTableName",
2393            body["ExclusiveStartGlobalTableName"].as_str(),
2394            3,
2395            255,
2396        )?;
2397        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2398        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2399
2400        let state = self.state.read();
2401        let tables: Vec<Value> = state
2402            .global_tables
2403            .values()
2404            .take(limit)
2405            .map(|gt| {
2406                json!({
2407                    "GlobalTableName": gt.global_table_name,
2408                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2409                        "RegionName": r.region_name
2410                    })).collect::<Vec<_>>()
2411                })
2412            })
2413            .collect();
2414
2415        Self::ok_json(json!({
2416            "GlobalTables": tables
2417        }))
2418    }
2419
2420    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2421        let body = Self::parse_body(req)?;
2422        let global_table_name = require_str(&body, "GlobalTableName")?;
2423        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2424        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2425
2426        let mut state = self.state.write();
2427        let gt = state
2428            .global_tables
2429            .get_mut(global_table_name)
2430            .ok_or_else(|| {
2431                AwsServiceError::aws_error(
2432                    StatusCode::BAD_REQUEST,
2433                    "GlobalTableNotFoundException",
2434                    format!("Global table not found: {global_table_name}"),
2435                )
2436            })?;
2437
2438        if let Some(updates) = body["ReplicaUpdates"].as_array() {
2439            for update in updates {
2440                if let Some(create) = update["Create"].as_object() {
2441                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2442                        gt.replication_group.push(ReplicaDescription {
2443                            region_name: region.to_string(),
2444                            replica_status: "ACTIVE".to_string(),
2445                        });
2446                    }
2447                }
2448                if let Some(delete) = update["Delete"].as_object() {
2449                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2450                        gt.replication_group.retain(|r| r.region_name != region);
2451                    }
2452                }
2453            }
2454        }
2455
2456        Self::ok_json(json!({
2457            "GlobalTableDescription": {
2458                "GlobalTableName": gt.global_table_name,
2459                "GlobalTableArn": gt.global_table_arn,
2460                "GlobalTableStatus": gt.global_table_status,
2461                "CreationDateTime": gt.creation_date.timestamp() as f64,
2462                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2463                    "RegionName": r.region_name,
2464                    "ReplicaStatus": r.replica_status
2465                })).collect::<Vec<_>>()
2466            }
2467        }))
2468    }
2469
2470    fn update_global_table_settings(
2471        &self,
2472        req: &AwsRequest,
2473    ) -> Result<AwsResponse, AwsServiceError> {
2474        let body = Self::parse_body(req)?;
2475        let global_table_name = require_str(&body, "GlobalTableName")?;
2476        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2477        validate_optional_enum_value(
2478            "globalTableBillingMode",
2479            &body["GlobalTableBillingMode"],
2480            &["PROVISIONED", "PAY_PER_REQUEST"],
2481        )?;
2482        validate_optional_range_i64(
2483            "globalTableProvisionedWriteCapacityUnits",
2484            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2485            1,
2486            i64::MAX,
2487        )?;
2488
2489        let state = self.state.read();
2490        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2491            AwsServiceError::aws_error(
2492                StatusCode::BAD_REQUEST,
2493                "GlobalTableNotFoundException",
2494                format!("Global table not found: {global_table_name}"),
2495            )
2496        })?;
2497
2498        let replica_settings: Vec<Value> = gt
2499            .replication_group
2500            .iter()
2501            .map(|r| {
2502                json!({
2503                    "RegionName": r.region_name,
2504                    "ReplicaStatus": r.replica_status,
2505                    "ReplicaProvisionedReadCapacityUnits": 0,
2506                    "ReplicaProvisionedWriteCapacityUnits": 0
2507                })
2508            })
2509            .collect();
2510
2511        Self::ok_json(json!({
2512            "GlobalTableName": gt.global_table_name,
2513            "ReplicaSettings": replica_settings
2514        }))
2515    }
2516
2517    fn describe_table_replica_auto_scaling(
2518        &self,
2519        req: &AwsRequest,
2520    ) -> Result<AwsResponse, AwsServiceError> {
2521        let body = Self::parse_body(req)?;
2522        let table_name = require_str(&body, "TableName")?;
2523
2524        let state = self.state.read();
2525        let table = get_table(&state.tables, table_name)?;
2526
2527        Self::ok_json(json!({
2528            "TableAutoScalingDescription": {
2529                "TableName": table.name,
2530                "TableStatus": table.status,
2531                "Replicas": []
2532            }
2533        }))
2534    }
2535
2536    fn update_table_replica_auto_scaling(
2537        &self,
2538        req: &AwsRequest,
2539    ) -> Result<AwsResponse, AwsServiceError> {
2540        let body = Self::parse_body(req)?;
2541        let table_name = require_str(&body, "TableName")?;
2542
2543        let state = self.state.read();
2544        let table = get_table(&state.tables, table_name)?;
2545
2546        Self::ok_json(json!({
2547            "TableAutoScalingDescription": {
2548                "TableName": table.name,
2549                "TableStatus": table.status,
2550                "Replicas": []
2551            }
2552        }))
2553    }
2554
2555    // ── Kinesis Streaming ──────────────────────────────────────────────
2556
2557    fn enable_kinesis_streaming_destination(
2558        &self,
2559        req: &AwsRequest,
2560    ) -> Result<AwsResponse, AwsServiceError> {
2561        let body = Self::parse_body(req)?;
2562        let table_name = require_str(&body, "TableName")?;
2563        let stream_arn = require_str(&body, "StreamArn")?;
2564        let precision = body["EnableKinesisStreamingConfiguration"]
2565            ["ApproximateCreationDateTimePrecision"]
2566            .as_str()
2567            .unwrap_or("MILLISECOND");
2568
2569        let mut state = self.state.write();
2570        let table = get_table_mut(&mut state.tables, table_name)?;
2571
2572        table.kinesis_destinations.push(KinesisDestination {
2573            stream_arn: stream_arn.to_string(),
2574            destination_status: "ACTIVE".to_string(),
2575            approximate_creation_date_time_precision: precision.to_string(),
2576        });
2577
2578        Self::ok_json(json!({
2579            "TableName": table_name,
2580            "StreamArn": stream_arn,
2581            "DestinationStatus": "ACTIVE",
2582            "EnableKinesisStreamingConfiguration": {
2583                "ApproximateCreationDateTimePrecision": precision
2584            }
2585        }))
2586    }
2587
2588    fn disable_kinesis_streaming_destination(
2589        &self,
2590        req: &AwsRequest,
2591    ) -> Result<AwsResponse, AwsServiceError> {
2592        let body = Self::parse_body(req)?;
2593        let table_name = require_str(&body, "TableName")?;
2594        let stream_arn = require_str(&body, "StreamArn")?;
2595
2596        let mut state = self.state.write();
2597        let table = get_table_mut(&mut state.tables, table_name)?;
2598
2599        if let Some(dest) = table
2600            .kinesis_destinations
2601            .iter_mut()
2602            .find(|d| d.stream_arn == stream_arn)
2603        {
2604            dest.destination_status = "DISABLED".to_string();
2605        }
2606
2607        Self::ok_json(json!({
2608            "TableName": table_name,
2609            "StreamArn": stream_arn,
2610            "DestinationStatus": "DISABLED"
2611        }))
2612    }
2613
2614    fn describe_kinesis_streaming_destination(
2615        &self,
2616        req: &AwsRequest,
2617    ) -> Result<AwsResponse, AwsServiceError> {
2618        let body = Self::parse_body(req)?;
2619        let table_name = require_str(&body, "TableName")?;
2620
2621        let state = self.state.read();
2622        let table = get_table(&state.tables, table_name)?;
2623
2624        let destinations: Vec<Value> = table
2625            .kinesis_destinations
2626            .iter()
2627            .map(|d| {
2628                json!({
2629                    "StreamArn": d.stream_arn,
2630                    "DestinationStatus": d.destination_status,
2631                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2632                })
2633            })
2634            .collect();
2635
2636        Self::ok_json(json!({
2637            "TableName": table_name,
2638            "KinesisDataStreamDestinations": destinations
2639        }))
2640    }
2641
2642    fn update_kinesis_streaming_destination(
2643        &self,
2644        req: &AwsRequest,
2645    ) -> Result<AwsResponse, AwsServiceError> {
2646        let body = Self::parse_body(req)?;
2647        let table_name = require_str(&body, "TableName")?;
2648        let stream_arn = require_str(&body, "StreamArn")?;
2649        let precision = body["UpdateKinesisStreamingConfiguration"]
2650            ["ApproximateCreationDateTimePrecision"]
2651            .as_str()
2652            .unwrap_or("MILLISECOND");
2653
2654        let mut state = self.state.write();
2655        let table = get_table_mut(&mut state.tables, table_name)?;
2656
2657        if let Some(dest) = table
2658            .kinesis_destinations
2659            .iter_mut()
2660            .find(|d| d.stream_arn == stream_arn)
2661        {
2662            dest.approximate_creation_date_time_precision = precision.to_string();
2663        }
2664
2665        Self::ok_json(json!({
2666            "TableName": table_name,
2667            "StreamArn": stream_arn,
2668            "DestinationStatus": "ACTIVE",
2669            "UpdateKinesisStreamingConfiguration": {
2670                "ApproximateCreationDateTimePrecision": precision
2671            }
2672        }))
2673    }
2674
2675    // ── Contributor Insights ───────────────────────────────────────────
2676
2677    fn describe_contributor_insights(
2678        &self,
2679        req: &AwsRequest,
2680    ) -> Result<AwsResponse, AwsServiceError> {
2681        let body = Self::parse_body(req)?;
2682        let table_name = require_str(&body, "TableName")?;
2683        let index_name = body["IndexName"].as_str();
2684
2685        let state = self.state.read();
2686        let table = get_table(&state.tables, table_name)?;
2687
2688        let top = table.top_contributors(10);
2689        let contributors: Vec<Value> = top
2690            .iter()
2691            .map(|(key, count)| {
2692                json!({
2693                    "Key": key,
2694                    "Count": count
2695                })
2696            })
2697            .collect();
2698
2699        let mut result = json!({
2700            "TableName": table_name,
2701            "ContributorInsightsStatus": table.contributor_insights_status,
2702            "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2703            "TopContributors": contributors
2704        });
2705        if let Some(idx) = index_name {
2706            result["IndexName"] = json!(idx);
2707        }
2708
2709        Self::ok_json(result)
2710    }
2711
2712    fn update_contributor_insights(
2713        &self,
2714        req: &AwsRequest,
2715    ) -> Result<AwsResponse, AwsServiceError> {
2716        let body = Self::parse_body(req)?;
2717        let table_name = require_str(&body, "TableName")?;
2718        let action = require_str(&body, "ContributorInsightsAction")?;
2719        let index_name = body["IndexName"].as_str();
2720
2721        let mut state = self.state.write();
2722        let table = get_table_mut(&mut state.tables, table_name)?;
2723
2724        let status = match action {
2725            "ENABLE" => "ENABLED",
2726            "DISABLE" => "DISABLED",
2727            _ => {
2728                return Err(AwsServiceError::aws_error(
2729                    StatusCode::BAD_REQUEST,
2730                    "ValidationException",
2731                    format!("Invalid ContributorInsightsAction: {action}"),
2732                ))
2733            }
2734        };
2735        table.contributor_insights_status = status.to_string();
2736        if status == "DISABLED" {
2737            table.contributor_insights_counters.clear();
2738        }
2739
2740        let mut result = json!({
2741            "TableName": table_name,
2742            "ContributorInsightsStatus": status
2743        });
2744        if let Some(idx) = index_name {
2745            result["IndexName"] = json!(idx);
2746        }
2747
2748        Self::ok_json(result)
2749    }
2750
2751    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2752        let body = Self::parse_body(req)?;
2753        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2754        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2755        let table_name = body["TableName"].as_str();
2756
2757        let state = self.state.read();
2758        let summaries: Vec<Value> = state
2759            .tables
2760            .values()
2761            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2762            .map(|t| {
2763                json!({
2764                    "TableName": t.name,
2765                    "ContributorInsightsStatus": t.contributor_insights_status
2766                })
2767            })
2768            .collect();
2769
2770        Self::ok_json(json!({
2771            "ContributorInsightsSummaries": summaries
2772        }))
2773    }
2774
2775    // ── Import/Export ──────────────────────────────────────────────────
2776
2777    fn export_table_to_point_in_time(
2778        &self,
2779        req: &AwsRequest,
2780    ) -> Result<AwsResponse, AwsServiceError> {
2781        let body = Self::parse_body(req)?;
2782        let table_arn = require_str(&body, "TableArn")?;
2783        let s3_bucket = require_str(&body, "S3Bucket")?;
2784        let s3_prefix = body["S3Prefix"].as_str();
2785        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2786
2787        let state = self.state.read();
2788        // Verify table exists and get items
2789        let table = find_table_by_arn(&state.tables, table_arn)?;
2790        let items = table.items.clone();
2791        let item_count = items.len() as i64;
2792
2793        let now = Utc::now();
2794        let export_arn = format!(
2795            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2796            state.region,
2797            state.account_id,
2798            table_arn.rsplit('/').next().unwrap_or("unknown"),
2799            uuid::Uuid::new_v4()
2800        );
2801
2802        drop(state);
2803
2804        // Serialize items as JSON Lines and write to S3
2805        let mut json_lines = String::new();
2806        for item in &items {
2807            let item_json = if export_format == "DYNAMODB_JSON" {
2808                json!({ "Item": item })
2809            } else {
2810                json!(item)
2811            };
2812            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2813            json_lines.push('\n');
2814        }
2815        let data_size = json_lines.len() as i64;
2816
2817        // Build S3 key for the export data
2818        let s3_key = if let Some(prefix) = s3_prefix {
2819            format!("{prefix}/data/manifest-files.json")
2820        } else {
2821            "data/manifest-files.json".to_string()
2822        };
2823
2824        // Write to S3 if we have access to S3 state
2825        let mut export_failed = false;
2826        let mut failure_reason = String::new();
2827        if let Some(ref s3_state) = self.s3_state {
2828            let mut s3 = s3_state.write();
2829            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2830                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2831                let obj = fakecloud_s3::state::S3Object {
2832                    key: s3_key.clone(),
2833                    data: bytes::Bytes::from(json_lines),
2834                    content_type: "application/json".to_string(),
2835                    etag,
2836                    size: data_size as u64,
2837                    last_modified: now,
2838                    metadata: HashMap::new(),
2839                    storage_class: "STANDARD".to_string(),
2840                    tags: HashMap::new(),
2841                    acl_grants: Vec::new(),
2842                    acl_owner_id: None,
2843                    parts_count: None,
2844                    part_sizes: None,
2845                    sse_algorithm: None,
2846                    sse_kms_key_id: None,
2847                    bucket_key_enabled: None,
2848                    version_id: None,
2849                    is_delete_marker: false,
2850                    content_encoding: None,
2851                    website_redirect_location: None,
2852                    restore_ongoing: None,
2853                    restore_expiry: None,
2854                    checksum_algorithm: None,
2855                    checksum_value: None,
2856                    lock_mode: None,
2857                    lock_retain_until: None,
2858                    lock_legal_hold: None,
2859                };
2860                bucket.objects.insert(s3_key, obj);
2861            } else {
2862                export_failed = true;
2863                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2864            }
2865        }
2866
2867        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2868
2869        let export = ExportDescription {
2870            export_arn: export_arn.clone(),
2871            export_status: export_status.to_string(),
2872            table_arn: table_arn.to_string(),
2873            s3_bucket: s3_bucket.to_string(),
2874            s3_prefix: s3_prefix.map(|s| s.to_string()),
2875            export_format: export_format.to_string(),
2876            start_time: now,
2877            end_time: now,
2878            export_time: now,
2879            item_count,
2880            billed_size_bytes: data_size,
2881        };
2882
2883        let mut state = self.state.write();
2884        state.exports.insert(export_arn.clone(), export);
2885
2886        let mut response = json!({
2887            "ExportDescription": {
2888                "ExportArn": export_arn,
2889                "ExportStatus": export_status,
2890                "TableArn": table_arn,
2891                "S3Bucket": s3_bucket,
2892                "S3Prefix": s3_prefix,
2893                "ExportFormat": export_format,
2894                "StartTime": now.timestamp() as f64,
2895                "EndTime": now.timestamp() as f64,
2896                "ExportTime": now.timestamp() as f64,
2897                "ItemCount": item_count,
2898                "BilledSizeBytes": data_size
2899            }
2900        });
2901        if export_failed {
2902            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2903            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2904        }
2905        Self::ok_json(response)
2906    }
2907
2908    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2909        let body = Self::parse_body(req)?;
2910        let export_arn = require_str(&body, "ExportArn")?;
2911
2912        let state = self.state.read();
2913        let export = state.exports.get(export_arn).ok_or_else(|| {
2914            AwsServiceError::aws_error(
2915                StatusCode::BAD_REQUEST,
2916                "ExportNotFoundException",
2917                format!("Export not found: {export_arn}"),
2918            )
2919        })?;
2920
2921        Self::ok_json(json!({
2922            "ExportDescription": {
2923                "ExportArn": export.export_arn,
2924                "ExportStatus": export.export_status,
2925                "TableArn": export.table_arn,
2926                "S3Bucket": export.s3_bucket,
2927                "S3Prefix": export.s3_prefix,
2928                "ExportFormat": export.export_format,
2929                "StartTime": export.start_time.timestamp() as f64,
2930                "EndTime": export.end_time.timestamp() as f64,
2931                "ExportTime": export.export_time.timestamp() as f64,
2932                "ItemCount": export.item_count,
2933                "BilledSizeBytes": export.billed_size_bytes
2934            }
2935        }))
2936    }
2937
2938    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2939        let body = Self::parse_body(req)?;
2940        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2941        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2942        let table_arn = body["TableArn"].as_str();
2943
2944        let state = self.state.read();
2945        let summaries: Vec<Value> = state
2946            .exports
2947            .values()
2948            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2949            .map(|e| {
2950                json!({
2951                    "ExportArn": e.export_arn,
2952                    "ExportStatus": e.export_status,
2953                    "TableArn": e.table_arn
2954                })
2955            })
2956            .collect();
2957
2958        Self::ok_json(json!({
2959            "ExportSummaries": summaries
2960        }))
2961    }
2962
2963    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2964        let body = Self::parse_body(req)?;
2965        let input_format = require_str(&body, "InputFormat")?;
2966        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2967            AwsServiceError::aws_error(
2968                StatusCode::BAD_REQUEST,
2969                "ValidationException",
2970                "S3BucketSource is required",
2971            )
2972        })?;
2973        let s3_bucket = s3_source
2974            .get("S3Bucket")
2975            .and_then(|v| v.as_str())
2976            .unwrap_or("");
2977        let s3_key_prefix = s3_source
2978            .get("S3KeyPrefix")
2979            .and_then(|v| v.as_str())
2980            .unwrap_or("");
2981
2982        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2983            AwsServiceError::aws_error(
2984                StatusCode::BAD_REQUEST,
2985                "ValidationException",
2986                "TableCreationParameters is required",
2987            )
2988        })?;
2989        let table_name = table_params
2990            .get("TableName")
2991            .and_then(|v| v.as_str())
2992            .ok_or_else(|| {
2993                AwsServiceError::aws_error(
2994                    StatusCode::BAD_REQUEST,
2995                    "ValidationException",
2996                    "TableCreationParameters.TableName is required",
2997                )
2998            })?;
2999
3000        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3001        let attribute_definitions = parse_attribute_definitions(
3002            table_params
3003                .get("AttributeDefinitions")
3004                .unwrap_or(&Value::Null),
3005        )?;
3006
3007        // Read items from S3 if we have access
3008        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3009        let mut processed_size_bytes: i64 = 0;
3010        if let Some(ref s3_state) = self.s3_state {
3011            let s3 = s3_state.read();
3012            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3013                AwsServiceError::aws_error(
3014                    StatusCode::BAD_REQUEST,
3015                    "ImportConflictException",
3016                    format!("S3 bucket does not exist: {s3_bucket}"),
3017                )
3018            })?;
3019            // Find all objects under the prefix and try to parse JSON Lines from each
3020            let prefix = if s3_key_prefix.is_empty() {
3021                String::new()
3022            } else {
3023                s3_key_prefix.to_string()
3024            };
3025            for (key, obj) in &bucket.objects {
3026                if !prefix.is_empty() && !key.starts_with(&prefix) {
3027                    continue;
3028                }
3029                let data = std::str::from_utf8(&obj.data).unwrap_or("");
3030                processed_size_bytes += obj.size as i64;
3031                for line in data.lines() {
3032                    let line = line.trim();
3033                    if line.is_empty() {
3034                        continue;
3035                    }
3036                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3037                        // DYNAMODB_JSON format wraps items in {"Item": {...}}
3038                        let item = if input_format == "DYNAMODB_JSON" {
3039                            if let Some(item_obj) = parsed.get("Item") {
3040                                item_obj.as_object().cloned().unwrap_or_default()
3041                            } else {
3042                                parsed.as_object().cloned().unwrap_or_default()
3043                            }
3044                        } else {
3045                            parsed.as_object().cloned().unwrap_or_default()
3046                        };
3047                        if !item.is_empty() {
3048                            imported_items
3049                                .push(item.into_iter().collect::<HashMap<String, Value>>());
3050                        }
3051                    }
3052                }
3053            }
3054        }
3055
3056        let mut state = self.state.write();
3057
3058        if state.tables.contains_key(table_name) {
3059            return Err(AwsServiceError::aws_error(
3060                StatusCode::BAD_REQUEST,
3061                "ResourceInUseException",
3062                format!("Table already exists: {table_name}"),
3063            ));
3064        }
3065
3066        let now = Utc::now();
3067        let table_arn = format!(
3068            "arn:aws:dynamodb:{}:{}:table/{}",
3069            state.region, state.account_id, table_name
3070        );
3071        let import_arn = format!(
3072            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3073            state.region,
3074            state.account_id,
3075            table_name,
3076            uuid::Uuid::new_v4()
3077        );
3078
3079        let processed_item_count = imported_items.len() as i64;
3080
3081        let mut table = DynamoTable {
3082            name: table_name.to_string(),
3083            arn: table_arn.clone(),
3084            key_schema,
3085            attribute_definitions,
3086            provisioned_throughput: ProvisionedThroughput {
3087                read_capacity_units: 0,
3088                write_capacity_units: 0,
3089            },
3090            items: imported_items,
3091            gsi: Vec::new(),
3092            lsi: Vec::new(),
3093            tags: HashMap::new(),
3094            created_at: now,
3095            status: "ACTIVE".to_string(),
3096            item_count: 0,
3097            size_bytes: 0,
3098            billing_mode: "PAY_PER_REQUEST".to_string(),
3099            ttl_attribute: None,
3100            ttl_enabled: false,
3101            resource_policy: None,
3102            pitr_enabled: false,
3103            kinesis_destinations: Vec::new(),
3104            contributor_insights_status: "DISABLED".to_string(),
3105            contributor_insights_counters: HashMap::new(),
3106            stream_enabled: false,
3107            stream_view_type: None,
3108            stream_arn: None,
3109            stream_records: Arc::new(RwLock::new(Vec::new())),
3110            sse_type: None,
3111            sse_kms_key_arn: None,
3112        };
3113        table.recalculate_stats();
3114        state.tables.insert(table_name.to_string(), table);
3115
3116        let import_desc = ImportDescription {
3117            import_arn: import_arn.clone(),
3118            import_status: "COMPLETED".to_string(),
3119            table_arn: table_arn.clone(),
3120            table_name: table_name.to_string(),
3121            s3_bucket_source: s3_bucket.to_string(),
3122            input_format: input_format.to_string(),
3123            start_time: now,
3124            end_time: now,
3125            processed_item_count,
3126            processed_size_bytes,
3127        };
3128        state.imports.insert(import_arn.clone(), import_desc);
3129
3130        let table_ref = state.tables.get(table_name).unwrap();
3131        let ks: Vec<Value> = table_ref
3132            .key_schema
3133            .iter()
3134            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3135            .collect();
3136        let ad: Vec<Value> = table_ref
3137            .attribute_definitions
3138            .iter()
3139            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3140            .collect();
3141
3142        Self::ok_json(json!({
3143            "ImportTableDescription": {
3144                "ImportArn": import_arn,
3145                "ImportStatus": "COMPLETED",
3146                "TableArn": table_arn,
3147                "TableId": uuid::Uuid::new_v4().to_string(),
3148                "S3BucketSource": {
3149                    "S3Bucket": s3_bucket
3150                },
3151                "InputFormat": input_format,
3152                "TableCreationParameters": {
3153                    "TableName": table_name,
3154                    "KeySchema": ks,
3155                    "AttributeDefinitions": ad
3156                },
3157                "StartTime": now.timestamp() as f64,
3158                "EndTime": now.timestamp() as f64,
3159                "ProcessedItemCount": processed_item_count,
3160                "ProcessedSizeBytes": processed_size_bytes
3161            }
3162        }))
3163    }
3164
3165    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3166        let body = Self::parse_body(req)?;
3167        let import_arn = require_str(&body, "ImportArn")?;
3168
3169        let state = self.state.read();
3170        let import = state.imports.get(import_arn).ok_or_else(|| {
3171            AwsServiceError::aws_error(
3172                StatusCode::BAD_REQUEST,
3173                "ImportNotFoundException",
3174                format!("Import not found: {import_arn}"),
3175            )
3176        })?;
3177
3178        Self::ok_json(json!({
3179            "ImportTableDescription": {
3180                "ImportArn": import.import_arn,
3181                "ImportStatus": import.import_status,
3182                "TableArn": import.table_arn,
3183                "S3BucketSource": {
3184                    "S3Bucket": import.s3_bucket_source
3185                },
3186                "InputFormat": import.input_format,
3187                "StartTime": import.start_time.timestamp() as f64,
3188                "EndTime": import.end_time.timestamp() as f64,
3189                "ProcessedItemCount": import.processed_item_count,
3190                "ProcessedSizeBytes": import.processed_size_bytes
3191            }
3192        }))
3193    }
3194
3195    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3196        let body = Self::parse_body(req)?;
3197        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3198        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3199        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3200        let table_arn = body["TableArn"].as_str();
3201
3202        let state = self.state.read();
3203        let summaries: Vec<Value> = state
3204            .imports
3205            .values()
3206            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3207            .map(|i| {
3208                json!({
3209                    "ImportArn": i.import_arn,
3210                    "ImportStatus": i.import_status,
3211                    "TableArn": i.table_arn
3212                })
3213            })
3214            .collect();
3215
3216        Self::ok_json(json!({
3217            "ImportSummaryList": summaries
3218        }))
3219    }
3220}
3221
3222#[async_trait]
3223impl AwsService for DynamoDbService {
3224    fn service_name(&self) -> &str {
3225        "dynamodb"
3226    }
3227
3228    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3229        match req.action.as_str() {
3230            "CreateTable" => self.create_table(&req),
3231            "DeleteTable" => self.delete_table(&req),
3232            "DescribeTable" => self.describe_table(&req),
3233            "ListTables" => self.list_tables(&req),
3234            "UpdateTable" => self.update_table(&req),
3235            "PutItem" => self.put_item(&req),
3236            "GetItem" => self.get_item(&req),
3237            "DeleteItem" => self.delete_item(&req),
3238            "UpdateItem" => self.update_item(&req),
3239            "Query" => self.query(&req),
3240            "Scan" => self.scan(&req),
3241            "BatchGetItem" => self.batch_get_item(&req),
3242            "BatchWriteItem" => self.batch_write_item(&req),
3243            "TagResource" => self.tag_resource(&req),
3244            "UntagResource" => self.untag_resource(&req),
3245            "ListTagsOfResource" => self.list_tags_of_resource(&req),
3246            "TransactGetItems" => self.transact_get_items(&req),
3247            "TransactWriteItems" => self.transact_write_items(&req),
3248            "ExecuteStatement" => self.execute_statement(&req),
3249            "BatchExecuteStatement" => self.batch_execute_statement(&req),
3250            "ExecuteTransaction" => self.execute_transaction(&req),
3251            "UpdateTimeToLive" => self.update_time_to_live(&req),
3252            "DescribeTimeToLive" => self.describe_time_to_live(&req),
3253            "PutResourcePolicy" => self.put_resource_policy(&req),
3254            "GetResourcePolicy" => self.get_resource_policy(&req),
3255            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3256            // Stubs
3257            "DescribeEndpoints" => self.describe_endpoints(&req),
3258            "DescribeLimits" => self.describe_limits(&req),
3259            // Backups
3260            "CreateBackup" => self.create_backup(&req),
3261            "DeleteBackup" => self.delete_backup(&req),
3262            "DescribeBackup" => self.describe_backup(&req),
3263            "ListBackups" => self.list_backups(&req),
3264            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3265            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3266            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3267            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3268            // Global tables
3269            "CreateGlobalTable" => self.create_global_table(&req),
3270            "DescribeGlobalTable" => self.describe_global_table(&req),
3271            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3272            "ListGlobalTables" => self.list_global_tables(&req),
3273            "UpdateGlobalTable" => self.update_global_table(&req),
3274            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3275            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3276            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3277            // Kinesis streaming
3278            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3279            "DisableKinesisStreamingDestination" => {
3280                self.disable_kinesis_streaming_destination(&req)
3281            }
3282            "DescribeKinesisStreamingDestination" => {
3283                self.describe_kinesis_streaming_destination(&req)
3284            }
3285            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3286            // Contributor insights
3287            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3288            "UpdateContributorInsights" => self.update_contributor_insights(&req),
3289            "ListContributorInsights" => self.list_contributor_insights(&req),
3290            // Import/Export
3291            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3292            "DescribeExport" => self.describe_export(&req),
3293            "ListExports" => self.list_exports(&req),
3294            "ImportTable" => self.import_table(&req),
3295            "DescribeImport" => self.describe_import(&req),
3296            "ListImports" => self.list_imports(&req),
3297            _ => Err(AwsServiceError::action_not_implemented(
3298                "dynamodb",
3299                &req.action,
3300            )),
3301        }
3302    }
3303
3304    fn supported_actions(&self) -> &[&str] {
3305        &[
3306            "CreateTable",
3307            "DeleteTable",
3308            "DescribeTable",
3309            "ListTables",
3310            "UpdateTable",
3311            "PutItem",
3312            "GetItem",
3313            "DeleteItem",
3314            "UpdateItem",
3315            "Query",
3316            "Scan",
3317            "BatchGetItem",
3318            "BatchWriteItem",
3319            "TagResource",
3320            "UntagResource",
3321            "ListTagsOfResource",
3322            "TransactGetItems",
3323            "TransactWriteItems",
3324            "ExecuteStatement",
3325            "BatchExecuteStatement",
3326            "ExecuteTransaction",
3327            "UpdateTimeToLive",
3328            "DescribeTimeToLive",
3329            "PutResourcePolicy",
3330            "GetResourcePolicy",
3331            "DeleteResourcePolicy",
3332            "DescribeEndpoints",
3333            "DescribeLimits",
3334            "CreateBackup",
3335            "DeleteBackup",
3336            "DescribeBackup",
3337            "ListBackups",
3338            "RestoreTableFromBackup",
3339            "RestoreTableToPointInTime",
3340            "UpdateContinuousBackups",
3341            "DescribeContinuousBackups",
3342            "CreateGlobalTable",
3343            "DescribeGlobalTable",
3344            "DescribeGlobalTableSettings",
3345            "ListGlobalTables",
3346            "UpdateGlobalTable",
3347            "UpdateGlobalTableSettings",
3348            "DescribeTableReplicaAutoScaling",
3349            "UpdateTableReplicaAutoScaling",
3350            "EnableKinesisStreamingDestination",
3351            "DisableKinesisStreamingDestination",
3352            "DescribeKinesisStreamingDestination",
3353            "UpdateKinesisStreamingDestination",
3354            "DescribeContributorInsights",
3355            "UpdateContributorInsights",
3356            "ListContributorInsights",
3357            "ExportTableToPointInTime",
3358            "DescribeExport",
3359            "ListExports",
3360            "ImportTable",
3361            "DescribeImport",
3362            "ListImports",
3363        ]
3364    }
3365}
3366
3367// ── Helper functions ────────────────────────────────────────────────────
3368
3369fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3370    body[field].as_str().ok_or_else(|| {
3371        AwsServiceError::aws_error(
3372            StatusCode::BAD_REQUEST,
3373            "ValidationException",
3374            format!("{field} is required"),
3375        )
3376    })
3377}
3378
3379fn require_object(
3380    body: &Value,
3381    field: &str,
3382) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3383    let obj = body[field].as_object().ok_or_else(|| {
3384        AwsServiceError::aws_error(
3385            StatusCode::BAD_REQUEST,
3386            "ValidationException",
3387            format!("{field} is required"),
3388        )
3389    })?;
3390    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3391}
3392
3393fn get_table<'a>(
3394    tables: &'a HashMap<String, DynamoTable>,
3395    name: &str,
3396) -> Result<&'a DynamoTable, AwsServiceError> {
3397    tables.get(name).ok_or_else(|| {
3398        AwsServiceError::aws_error(
3399            StatusCode::BAD_REQUEST,
3400            "ResourceNotFoundException",
3401            format!("Requested resource not found: Table: {name} not found"),
3402        )
3403    })
3404}
3405
3406fn get_table_mut<'a>(
3407    tables: &'a mut HashMap<String, DynamoTable>,
3408    name: &str,
3409) -> Result<&'a mut DynamoTable, AwsServiceError> {
3410    tables.get_mut(name).ok_or_else(|| {
3411        AwsServiceError::aws_error(
3412            StatusCode::BAD_REQUEST,
3413            "ResourceNotFoundException",
3414            format!("Requested resource not found: Table: {name} not found"),
3415        )
3416    })
3417}
3418
3419fn find_table_by_arn<'a>(
3420    tables: &'a HashMap<String, DynamoTable>,
3421    arn: &str,
3422) -> Result<&'a DynamoTable, AwsServiceError> {
3423    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3424        AwsServiceError::aws_error(
3425            StatusCode::BAD_REQUEST,
3426            "ResourceNotFoundException",
3427            format!("Requested resource not found: {arn}"),
3428        )
3429    })
3430}
3431
3432fn find_table_by_arn_mut<'a>(
3433    tables: &'a mut HashMap<String, DynamoTable>,
3434    arn: &str,
3435) -> Result<&'a mut DynamoTable, AwsServiceError> {
3436    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3437        AwsServiceError::aws_error(
3438            StatusCode::BAD_REQUEST,
3439            "ResourceNotFoundException",
3440            format!("Requested resource not found: {arn}"),
3441        )
3442    })
3443}
3444
3445fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3446    let arr = val.as_array().ok_or_else(|| {
3447        AwsServiceError::aws_error(
3448            StatusCode::BAD_REQUEST,
3449            "ValidationException",
3450            "KeySchema is required",
3451        )
3452    })?;
3453    Ok(arr
3454        .iter()
3455        .map(|elem| KeySchemaElement {
3456            attribute_name: elem["AttributeName"]
3457                .as_str()
3458                .unwrap_or_default()
3459                .to_string(),
3460            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3461        })
3462        .collect())
3463}
3464
3465fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3466    let arr = val.as_array().ok_or_else(|| {
3467        AwsServiceError::aws_error(
3468            StatusCode::BAD_REQUEST,
3469            "ValidationException",
3470            "AttributeDefinitions is required",
3471        )
3472    })?;
3473    Ok(arr
3474        .iter()
3475        .map(|elem| AttributeDefinition {
3476            attribute_name: elem["AttributeName"]
3477                .as_str()
3478                .unwrap_or_default()
3479                .to_string(),
3480            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3481        })
3482        .collect())
3483}
3484
3485fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3486    Ok(ProvisionedThroughput {
3487        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3488        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3489    })
3490}
3491
3492fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3493    let Some(arr) = val.as_array() else {
3494        return Vec::new();
3495    };
3496    arr.iter()
3497        .filter_map(|g| {
3498            Some(GlobalSecondaryIndex {
3499                index_name: g["IndexName"].as_str()?.to_string(),
3500                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3501                projection: parse_projection(&g["Projection"]),
3502                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3503                    .ok(),
3504            })
3505        })
3506        .collect()
3507}
3508
3509fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3510    let Some(arr) = val.as_array() else {
3511        return Vec::new();
3512    };
3513    arr.iter()
3514        .filter_map(|l| {
3515            Some(LocalSecondaryIndex {
3516                index_name: l["IndexName"].as_str()?.to_string(),
3517                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3518                projection: parse_projection(&l["Projection"]),
3519            })
3520        })
3521        .collect()
3522}
3523
3524fn parse_projection(val: &Value) -> Projection {
3525    Projection {
3526        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3527        non_key_attributes: val["NonKeyAttributes"]
3528            .as_array()
3529            .map(|arr| {
3530                arr.iter()
3531                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3532                    .collect()
3533            })
3534            .unwrap_or_default(),
3535    }
3536}
3537
3538fn parse_tags(val: &Value) -> HashMap<String, String> {
3539    let mut tags = HashMap::new();
3540    if let Some(arr) = val.as_array() {
3541        for tag in arr {
3542            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3543                tags.insert(k.to_string(), v.to_string());
3544            }
3545        }
3546    }
3547    tags
3548}
3549
3550fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3551    let mut names = HashMap::new();
3552    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3553        for (k, v) in obj {
3554            if let Some(s) = v.as_str() {
3555                names.insert(k.clone(), s.to_string());
3556            }
3557        }
3558    }
3559    names
3560}
3561
3562fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3563    let mut values = HashMap::new();
3564    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3565        for (k, v) in obj {
3566            values.insert(k.clone(), v.clone());
3567        }
3568    }
3569    values
3570}
3571
/// Resolves a `#placeholder` to the attribute name it stands for.
///
/// Names that do not start with `#`, and placeholders missing from
/// `expr_attr_names`, are returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substitution = if name.starts_with('#') {
        expr_attr_names.get(name)
    } else {
        None
    };
    match substitution {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
3582
3583fn extract_key(
3584    table: &DynamoTable,
3585    item: &HashMap<String, AttributeValue>,
3586) -> HashMap<String, AttributeValue> {
3587    let mut key = HashMap::new();
3588    let hash_key = table.hash_key_name();
3589    if let Some(v) = item.get(hash_key) {
3590        key.insert(hash_key.to_string(), v.clone());
3591    }
3592    if let Some(range_key) = table.range_key_name() {
3593        if let Some(v) = item.get(range_key) {
3594            key.insert(range_key.to_string(), v.clone());
3595        }
3596    }
3597    key
3598}
3599
3600/// Parse a JSON object into a key map (used for ExclusiveStartKey).
3601fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3602    let obj = value.as_object()?;
3603    if obj.is_empty() {
3604        return None;
3605    }
3606    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3607}
3608
3609/// Check whether an item's key attributes match the given key map.
3610fn item_matches_key(
3611    item: &HashMap<String, AttributeValue>,
3612    key: &HashMap<String, AttributeValue>,
3613    hash_key_name: &str,
3614    range_key_name: Option<&str>,
3615) -> bool {
3616    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3617        (Some(a), Some(b)) => a == b,
3618        _ => false,
3619    };
3620    if !hash_match {
3621        return false;
3622    }
3623    match range_key_name {
3624        Some(rk) => match (item.get(rk), key.get(rk)) {
3625            (Some(a), Some(b)) => a == b,
3626            (None, None) => true,
3627            _ => false,
3628        },
3629        None => true,
3630    }
3631}
3632
3633/// Extract the primary key from an item given explicit key attribute names.
3634fn extract_key_for_schema(
3635    item: &HashMap<String, AttributeValue>,
3636    hash_key_name: &str,
3637    range_key_name: Option<&str>,
3638) -> HashMap<String, AttributeValue> {
3639    let mut key = HashMap::new();
3640    if let Some(v) = item.get(hash_key_name) {
3641        key.insert(hash_key_name.to_string(), v.clone());
3642    }
3643    if let Some(rk) = range_key_name {
3644        if let Some(v) = item.get(rk) {
3645            key.insert(rk.to_string(), v.clone());
3646        }
3647    }
3648    key
3649}
3650
3651fn validate_key_in_item(
3652    table: &DynamoTable,
3653    item: &HashMap<String, AttributeValue>,
3654) -> Result<(), AwsServiceError> {
3655    let hash_key = table.hash_key_name();
3656    if !item.contains_key(hash_key) {
3657        return Err(AwsServiceError::aws_error(
3658            StatusCode::BAD_REQUEST,
3659            "ValidationException",
3660            format!("Missing the key {hash_key} in the item"),
3661        ));
3662    }
3663    if let Some(range_key) = table.range_key_name() {
3664        if !item.contains_key(range_key) {
3665            return Err(AwsServiceError::aws_error(
3666                StatusCode::BAD_REQUEST,
3667                "ValidationException",
3668                format!("Missing the key {range_key} in the item"),
3669            ));
3670        }
3671    }
3672    Ok(())
3673}
3674
3675fn validate_key_attributes_in_key(
3676    table: &DynamoTable,
3677    key: &HashMap<String, AttributeValue>,
3678) -> Result<(), AwsServiceError> {
3679    let hash_key = table.hash_key_name();
3680    if !key.contains_key(hash_key) {
3681        return Err(AwsServiceError::aws_error(
3682            StatusCode::BAD_REQUEST,
3683            "ValidationException",
3684            format!("Missing the key {hash_key} in the item"),
3685        ));
3686    }
3687    Ok(())
3688}
3689
3690fn project_item(
3691    item: &HashMap<String, AttributeValue>,
3692    body: &Value,
3693) -> HashMap<String, AttributeValue> {
3694    let projection = body["ProjectionExpression"].as_str();
3695    match projection {
3696        Some(proj) if !proj.is_empty() => {
3697            let expr_attr_names = parse_expression_attribute_names(body);
3698            let attrs: Vec<String> = proj
3699                .split(',')
3700                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3701                .collect();
3702            let mut result = HashMap::new();
3703            for attr in &attrs {
3704                if let Some(v) = resolve_nested_path(item, attr) {
3705                    insert_nested_value(&mut result, attr, v);
3706                }
3707            }
3708            result
3709        }
3710        _ => item.clone(),
3711    }
3712}
3713
3714/// Resolve expression attribute names within each segment of a projection path.
3715/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3716fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3717    // Split on dots, resolve each part, rejoin
3718    let mut result = String::new();
3719    for (i, segment) in path.split('.').enumerate() {
3720        if i > 0 {
3721            result.push('.');
3722        }
3723        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3724        if let Some(bracket_pos) = segment.find('[') {
3725            let key_part = &segment[..bracket_pos];
3726            let index_part = &segment[bracket_pos..];
3727            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3728            result.push_str(index_part);
3729        } else {
3730            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3731        }
3732    }
3733    result
3734}
3735
3736/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3737fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3738    let segments = parse_path_segments(path);
3739    if segments.is_empty() {
3740        return None;
3741    }
3742
3743    let first = &segments[0];
3744    let top_key = match first {
3745        PathSegment::Key(k) => k.as_str(),
3746        _ => return None,
3747    };
3748
3749    let mut current = item.get(top_key)?.clone();
3750
3751    for segment in &segments[1..] {
3752        match segment {
3753            PathSegment::Key(k) => {
3754                // Navigate into a Map: {"M": {"key": ...}}
3755                current = current.get("M")?.get(k)?.clone();
3756            }
3757            PathSegment::Index(idx) => {
3758                // Navigate into a List: {"L": [...]}
3759                current = current.get("L")?.get(*idx)?.clone();
3760            }
3761        }
3762    }
3763
3764    Some(current)
3765}
3766
/// One step of a document path such as `a.b[0].c`.
#[derive(Debug)]
enum PathSegment {
    /// An attribute name; the first segment of a path, or a step into a map
    /// value (`{"M": {...}}`).
    Key(String),
    /// A zero-based position used to step into a list value (`{"L": [...]}`).
    Index(usize),
}
3772
3773/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
3774fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3775    let mut segments = Vec::new();
3776    let mut current = String::new();
3777
3778    let chars: Vec<char> = path.chars().collect();
3779    let mut i = 0;
3780    while i < chars.len() {
3781        match chars[i] {
3782            '.' => {
3783                if !current.is_empty() {
3784                    segments.push(PathSegment::Key(current.clone()));
3785                    current.clear();
3786                }
3787            }
3788            '[' => {
3789                if !current.is_empty() {
3790                    segments.push(PathSegment::Key(current.clone()));
3791                    current.clear();
3792                }
3793                i += 1;
3794                let mut num = String::new();
3795                while i < chars.len() && chars[i] != ']' {
3796                    num.push(chars[i]);
3797                    i += 1;
3798                }
3799                if let Ok(idx) = num.parse::<usize>() {
3800                    segments.push(PathSegment::Index(idx));
3801                }
3802                // skip ']'
3803            }
3804            c => {
3805                current.push(c);
3806            }
3807        }
3808        i += 1;
3809    }
3810    if !current.is_empty() {
3811        segments.push(PathSegment::Key(current));
3812    }
3813    segments
3814}
3815
3816/// Insert a value at a nested path in the result HashMap.
3817/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3818fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3819    // Simple case: no nesting
3820    if !path.contains('.') && !path.contains('[') {
3821        result.insert(path.to_string(), value);
3822        return;
3823    }
3824
3825    let segments = parse_path_segments(path);
3826    if segments.is_empty() {
3827        return;
3828    }
3829
3830    let top_key = match &segments[0] {
3831        PathSegment::Key(k) => k.clone(),
3832        _ => return,
3833    };
3834
3835    if segments.len() == 1 {
3836        result.insert(top_key, value);
3837        return;
3838    }
3839
3840    // For nested paths, wrap the value back into the nested structure
3841    let wrapped = wrap_value_in_path(&segments[1..], value);
3842    // Merge into existing value if present
3843    let existing = result.remove(&top_key);
3844    let merged = match existing {
3845        Some(existing) => merge_attribute_values(existing, wrapped),
3846        None => wrapped,
3847    };
3848    result.insert(top_key, merged);
3849}
3850
3851/// Wrap a value in the nested path structure.
3852fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3853    if segments.is_empty() {
3854        return value;
3855    }
3856    let inner = wrap_value_in_path(&segments[1..], value);
3857    match &segments[0] {
3858        PathSegment::Key(k) => {
3859            json!({"M": {k.clone(): inner}})
3860        }
3861        PathSegment::Index(idx) => {
3862            let mut arr = vec![Value::Null; idx + 1];
3863            arr[*idx] = inner;
3864            json!({"L": arr})
3865        }
3866    }
3867}
3868
3869/// Merge two attribute values (for overlapping projections).
3870fn merge_attribute_values(a: Value, b: Value) -> Value {
3871    if let (Some(a_map), Some(b_map)) = (
3872        a.get("M").and_then(|v| v.as_object()),
3873        b.get("M").and_then(|v| v.as_object()),
3874    ) {
3875        let mut merged = a_map.clone();
3876        for (k, v) in b_map {
3877            if let Some(existing) = merged.get(k) {
3878                merged.insert(
3879                    k.clone(),
3880                    merge_attribute_values(existing.clone(), v.clone()),
3881                );
3882            } else {
3883                merged.insert(k.clone(), v.clone());
3884            }
3885        }
3886        json!({"M": merged})
3887    } else {
3888        b
3889    }
3890}
3891
3892fn evaluate_condition(
3893    condition: &str,
3894    existing: Option<&HashMap<String, AttributeValue>>,
3895    expr_attr_names: &HashMap<String, String>,
3896    expr_attr_values: &HashMap<String, Value>,
3897) -> Result<(), AwsServiceError> {
3898    let cond = condition.trim();
3899
3900    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3901        let attr = resolve_attr_name(inner, expr_attr_names);
3902        match existing {
3903            Some(item) if item.contains_key(&attr) => {
3904                return Err(AwsServiceError::aws_error(
3905                    StatusCode::BAD_REQUEST,
3906                    "ConditionalCheckFailedException",
3907                    "The conditional request failed",
3908                ));
3909            }
3910            _ => return Ok(()),
3911        }
3912    }
3913
3914    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3915        let attr = resolve_attr_name(inner, expr_attr_names);
3916        match existing {
3917            Some(item) if item.contains_key(&attr) => return Ok(()),
3918            _ => {
3919                return Err(AwsServiceError::aws_error(
3920                    StatusCode::BAD_REQUEST,
3921                    "ConditionalCheckFailedException",
3922                    "The conditional request failed",
3923                ));
3924            }
3925        }
3926    }
3927
3928    if let Some((left, op, right)) = parse_simple_comparison(cond) {
3929        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3930        let expected = expr_attr_values.get(right.trim());
3931        let actual = existing.and_then(|item| item.get(&attr_name));
3932
3933        let result = match op {
3934            "=" => actual == expected,
3935            "<>" => actual != expected,
3936            _ => true,
3937        };
3938
3939        if !result {
3940            return Err(AwsServiceError::aws_error(
3941                StatusCode::BAD_REQUEST,
3942                "ConditionalCheckFailedException",
3943                "The conditional request failed",
3944            ));
3945        }
3946    }
3947
3948    Ok(())
3949}
3950
/// Returns the trimmed argument text of a call `func_name(...)`.
///
/// `expr` must start with `func_name(` and end with `)`; everything in
/// between is returned with surrounding whitespace removed. Any other shape
/// yields `None`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    expr.strip_prefix(func_name)?
        .strip_prefix('(')?
        .strip_suffix(')')
        .map(str::trim)
}
3960
/// Splits a simple comparison `left <op> right` at its operator.
///
/// Returns `(left, op, right)` with the surrounding text untrimmed, or
/// `None` when `expr` contains no comparison operator. The split happens at
/// the first occurrence of the first operator (in the order below) that
/// appears anywhere in `expr`.
///
/// Two-character operators are tried before their one-character prefixes and
/// suffixes. Otherwise `a <= :v` would be split at `=` into
/// `("a <", "=", " :v")` and `<=` / `>=` could never be reported.
fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
    const OPERATORS: [&str; 6] = ["<>", "<=", ">=", "=", "<", ">"];

    OPERATORS.iter().find_map(|&op| {
        expr.split_once(op)
            .map(|(left, right)| (left, op, right))
    })
}
3971
3972fn evaluate_key_condition(
3973    expr: &str,
3974    item: &HashMap<String, AttributeValue>,
3975    hash_key_name: &str,
3976    _range_key_name: Option<&str>,
3977    expr_attr_names: &HashMap<String, String>,
3978    expr_attr_values: &HashMap<String, Value>,
3979) -> bool {
3980    let parts: Vec<&str> = split_on_and(expr);
3981    for part in &parts {
3982        let part = part.trim();
3983        if !evaluate_single_key_condition(
3984            part,
3985            item,
3986            hash_key_name,
3987            expr_attr_names,
3988            expr_attr_values,
3989        ) {
3990            return false;
3991        }
3992    }
3993    true
3994}
3995
/// Splits `expr` on ` AND ` separators (case-insensitive) that sit outside
/// any parentheses, returning the untrimmed pieces in order.
///
/// Two details matter for correctness:
/// * Matching is done on bytes, so expressions containing multi-byte UTF-8
///   characters never cause a slice at a non-character boundary.
/// * The `AND` that belongs to a top-level `x BETWEEN lo AND hi` is not a
///   conjunction and is left inside the piece it belongs to.
///
/// An expression without any top-level separator is returned as a single
/// piece.
fn split_on_and(expr: &str) -> Vec<&str> {
    const AND: &[u8] = b" AND ";
    const BETWEEN: &[u8] = b" BETWEEN ";

    let bytes = expr.as_bytes();
    let keyword_at = |at: usize, keyword: &[u8]| {
        bytes
            .get(at..at + keyword.len())
            .is_some_and(|window| window.eq_ignore_ascii_case(keyword))
    };

    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    // True after a top-level BETWEEN whose own AND has not been consumed yet.
    let mut between_open = false;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            b' ' if depth == 0 => {
                if keyword_at(i, BETWEEN) {
                    between_open = true;
                    i += BETWEEN.len();
                    continue;
                }
                if keyword_at(i, AND) {
                    if between_open {
                        // This AND separates the BETWEEN bounds; keep it.
                        between_open = false;
                    } else {
                        // Both keyword edges are ASCII spaces, so these
                        // indices are always valid char boundaries.
                        parts.push(&expr[start..i]);
                        start = i + AND.len();
                    }
                    i += AND.len();
                    continue;
                }
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
4021
/// Splits `expr` on ` OR ` separators (case-insensitive) that sit outside
/// any parentheses, returning the untrimmed pieces in order.
///
/// Matching is done on bytes, so expressions containing multi-byte UTF-8
/// characters never cause a slice at a non-character boundary. An expression
/// without any top-level separator is returned as a single piece.
fn split_on_or(expr: &str) -> Vec<&str> {
    const OR: &[u8] = b" OR ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            b' ' if depth == 0 => {
                let is_separator = bytes
                    .get(i..i + OR.len())
                    .is_some_and(|window| window.eq_ignore_ascii_case(OR));
                if is_separator {
                    // Both separator edges are ASCII spaces, so these indices
                    // are always valid char boundaries.
                    parts.push(&expr[start..i]);
                    start = i + OR.len();
                    i = start;
                    continue;
                }
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
4047
4048fn evaluate_single_key_condition(
4049    part: &str,
4050    item: &HashMap<String, AttributeValue>,
4051    _hash_key_name: &str,
4052    expr_attr_names: &HashMap<String, String>,
4053    expr_attr_values: &HashMap<String, Value>,
4054) -> bool {
4055    let part = part.trim();
4056
4057    // begins_with(attr, :val)
4058    if let Some(rest) = part
4059        .strip_prefix("begins_with(")
4060        .or_else(|| part.strip_prefix("begins_with ("))
4061    {
4062        if let Some(inner) = rest.strip_suffix(')') {
4063            let mut split = inner.splitn(2, ',');
4064            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4065                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4066                let val_ref = val_ref.trim();
4067                let expected = expr_attr_values.get(val_ref);
4068                let actual = item.get(&attr_name);
4069                return match (actual, expected) {
4070                    (Some(a), Some(e)) => {
4071                        let a_str = extract_string_value(a);
4072                        let e_str = extract_string_value(e);
4073                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4074                    }
4075                    _ => false,
4076                };
4077            }
4078        }
4079        return false;
4080    }
4081
4082    // BETWEEN
4083    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4084        let attr_part = part[..between_pos].trim();
4085        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4086        let range_part = &part[between_pos + 7..];
4087        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4088            let lo_ref = range_part[..and_pos].trim();
4089            let hi_ref = range_part[and_pos + 5..].trim();
4090            let lo = expr_attr_values.get(lo_ref);
4091            let hi = expr_attr_values.get(hi_ref);
4092            let actual = item.get(&attr_name);
4093            return match (actual, lo, hi) {
4094                (Some(a), Some(l), Some(h)) => {
4095                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4096                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4097                }
4098                _ => false,
4099            };
4100        }
4101    }
4102
4103    // Simple comparison: attr <op> :val
4104    for op in &["<=", ">=", "<>", "=", "<", ">"] {
4105        if let Some(pos) = part.find(op) {
4106            let left = part[..pos].trim();
4107            let right = part[pos + op.len()..].trim();
4108            let attr_name = resolve_attr_name(left, expr_attr_names);
4109            let expected = expr_attr_values.get(right);
4110            let actual = item.get(&attr_name);
4111
4112            return match *op {
4113                "=" => actual == expected,
4114                "<>" => actual != expected,
4115                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4116                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4117                "<=" => {
4118                    let cmp = compare_attribute_values(actual, expected);
4119                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4120                }
4121                ">=" => {
4122                    let cmp = compare_attribute_values(actual, expected);
4123                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4124                }
4125                _ => true,
4126            };
4127        }
4128    }
4129
4130    true
4131}
4132
4133fn extract_string_value(val: &Value) -> Option<String> {
4134    val.get("S")
4135        .and_then(|v| v.as_str())
4136        .map(|s| s.to_string())
4137        .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
4138}
4139
4140fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4141    match (a, b) {
4142        (None, None) => std::cmp::Ordering::Equal,
4143        (None, Some(_)) => std::cmp::Ordering::Less,
4144        (Some(_), None) => std::cmp::Ordering::Greater,
4145        (Some(a), Some(b)) => {
4146            let a_type = attribute_type_and_value(a);
4147            let b_type = attribute_type_and_value(b);
4148            match (a_type, b_type) {
4149                (Some(("S", a_val)), Some(("S", b_val))) => {
4150                    let a_str = a_val.as_str().unwrap_or("");
4151                    let b_str = b_val.as_str().unwrap_or("");
4152                    a_str.cmp(b_str)
4153                }
4154                (Some(("N", a_val)), Some(("N", b_val))) => {
4155                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4156                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4157                    a_num
4158                        .partial_cmp(&b_num)
4159                        .unwrap_or(std::cmp::Ordering::Equal)
4160                }
4161                (Some(("B", a_val)), Some(("B", b_val))) => {
4162                    let a_str = a_val.as_str().unwrap_or("");
4163                    let b_str = b_val.as_str().unwrap_or("");
4164                    a_str.cmp(b_str)
4165                }
4166                _ => std::cmp::Ordering::Equal,
4167            }
4168        }
4169    }
4170}
4171
4172fn evaluate_filter_expression(
4173    expr: &str,
4174    item: &HashMap<String, AttributeValue>,
4175    expr_attr_names: &HashMap<String, String>,
4176    expr_attr_values: &HashMap<String, Value>,
4177) -> bool {
4178    let trimmed = expr.trim();
4179
4180    // Split on OR first (lower precedence), respecting parentheses
4181    let or_parts = split_on_or(trimmed);
4182    if or_parts.len() > 1 {
4183        return or_parts.iter().any(|part| {
4184            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4185        });
4186    }
4187
4188    // Then split on AND (higher precedence), respecting parentheses
4189    let and_parts = split_on_and(trimmed);
4190    if and_parts.len() > 1 {
4191        return and_parts.iter().all(|part| {
4192            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4193        });
4194    }
4195
4196    // Strip outer parentheses if present
4197    let stripped = strip_outer_parens(trimmed);
4198    if stripped != trimmed {
4199        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4200    }
4201
4202    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4203}
4204
/// Removes one pair of enclosing parentheses when they match each other.
///
/// The input is trimmed first. If the trimmed text starts with `(` and ends
/// with `)` and those two characters are partners, the text between them is
/// returned as-is (not trimmed). Otherwise the trimmed text is returned
/// unchanged, e.g. for `(a) AND (b)`.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let Some(body) = candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return candidate;
    };

    // Walk the interior: if a `)` shows up with nothing open, the leading `(`
    // was closed early and the outer pair does not enclose everything.
    let mut open = 0usize;
    for byte in body.bytes() {
        if byte == b'(' {
            open += 1;
        } else if byte == b')' {
            if open == 0 {
                return candidate;
            }
            open -= 1;
        }
    }

    if open == 0 {
        body
    } else {
        candidate
    }
}
4232
4233fn evaluate_single_filter_condition(
4234    part: &str,
4235    item: &HashMap<String, AttributeValue>,
4236    expr_attr_names: &HashMap<String, String>,
4237    expr_attr_values: &HashMap<String, Value>,
4238) -> bool {
4239    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4240        let attr = resolve_attr_name(inner, expr_attr_names);
4241        return item.contains_key(&attr);
4242    }
4243
4244    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4245        let attr = resolve_attr_name(inner, expr_attr_names);
4246        return !item.contains_key(&attr);
4247    }
4248
4249    if let Some(rest) = part
4250        .strip_prefix("begins_with(")
4251        .or_else(|| part.strip_prefix("begins_with ("))
4252    {
4253        if let Some(inner) = rest.strip_suffix(')') {
4254            let mut split = inner.splitn(2, ',');
4255            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4256                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4257                let expected = expr_attr_values.get(val_ref.trim());
4258                let actual = item.get(&attr_name);
4259                return match (actual, expected) {
4260                    (Some(a), Some(e)) => {
4261                        let a_str = extract_string_value(a);
4262                        let e_str = extract_string_value(e);
4263                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4264                    }
4265                    _ => false,
4266                };
4267            }
4268        }
4269    }
4270
4271    if let Some(rest) = part
4272        .strip_prefix("contains(")
4273        .or_else(|| part.strip_prefix("contains ("))
4274    {
4275        if let Some(inner) = rest.strip_suffix(')') {
4276            let mut split = inner.splitn(2, ',');
4277            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4278                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4279                let expected = expr_attr_values.get(val_ref.trim());
4280                let actual = item.get(&attr_name);
4281                return match (actual, expected) {
4282                    (Some(a), Some(e)) => {
4283                        let a_str = extract_string_value(a);
4284                        let e_str = extract_string_value(e);
4285                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
4286                    }
4287                    _ => false,
4288                };
4289            }
4290        }
4291    }
4292
4293    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4294}
4295
4296fn apply_update_expression(
4297    item: &mut HashMap<String, AttributeValue>,
4298    expr: &str,
4299    expr_attr_names: &HashMap<String, String>,
4300    expr_attr_values: &HashMap<String, Value>,
4301) -> Result<(), AwsServiceError> {
4302    let clauses = parse_update_clauses(expr);
4303    for (action, assignments) in &clauses {
4304        match action.to_ascii_uppercase().as_str() {
4305            "SET" => {
4306                for assignment in assignments {
4307                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4308                }
4309            }
4310            "REMOVE" => {
4311                for attr_ref in assignments {
4312                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4313                    item.remove(&attr);
4314                }
4315            }
4316            "ADD" => {
4317                for assignment in assignments {
4318                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4319                }
4320            }
4321            "DELETE" => {
4322                for assignment in assignments {
4323                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4324                }
4325            }
4326            _ => {}
4327        }
4328    }
4329    Ok(())
4330}
4331
/// Breaks an update expression into `(ACTION, operands)` clauses.
///
/// The action keywords `SET`, `REMOVE`, `ADD` and `DELETE` are matched
/// case-insensitively and reported in upper case, in the order they appear in
/// `expr`. Each clause's text is split into operands on commas that are not
/// inside parentheses, so function calls such as `if_not_exists(a, :v)` stay
/// in one piece. Operands are trimmed.
///
/// A keyword only counts when it stands alone: occurrences that are part of a
/// longer identifier (`my_set`, `address`) or of a placeholder (`#add`,
/// `:delete`) are ignored.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    /// Splits `content` on commas at parenthesis depth zero.
    fn split_top_level_commas(content: &str) -> Vec<String> {
        let mut pieces = Vec::new();
        let mut depth = 0usize;
        let mut piece_start = 0;
        for (idx, byte) in content.bytes().enumerate() {
            match byte {
                b'(' => depth += 1,
                b')' => depth = depth.saturating_sub(1),
                b',' if depth == 0 => {
                    pieces.push(content[piece_start..idx].trim().to_string());
                    piece_start = idx + 1;
                }
                _ => {}
            }
        }
        pieces.push(content[piece_start..].trim().to_string());
        pieces
    }

    const KEYWORDS: [&str; 4] = ["SET", "REMOVE", "ADD", "DELETE"];

    let bytes = expr.as_bytes();
    // ASCII upper-casing keeps byte offsets identical to `expr`.
    let upper = expr.to_ascii_uppercase();
    let is_name_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';

    let mut positions: Vec<(usize, &str)> = Vec::new();
    for &kw in &KEYWORDS {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let after_pos = abs_pos + kw.len();
            let before_ok = abs_pos == 0 || {
                let prev = bytes[abs_pos - 1];
                // `#name` and `:value` placeholders may legitimately spell a keyword.
                !(is_name_byte(prev) || prev == b'#' || prev == b':')
            };
            let after_ok = bytes.get(after_pos).map_or(true, |&next| !is_name_byte(next));
            if before_ok && after_ok {
                positions.push((abs_pos, kw));
            }
            search_from = after_pos;
        }
    }

    positions.sort_by_key(|&(pos, _)| pos);

    positions
        .iter()
        .enumerate()
        .map(|(idx, &(pos, kw))| {
            let start = pos + kw.len();
            // A clause runs up to the next keyword, or to the end of the expression.
            let end = positions.get(idx + 1).map_or(expr.len(), |&(next, _)| next);
            (kw.to_string(), split_top_level_commas(expr[start..end].trim()))
        })
        .collect()
}
4369
4370fn apply_set_assignment(
4371    item: &mut HashMap<String, AttributeValue>,
4372    assignment: &str,
4373    expr_attr_names: &HashMap<String, String>,
4374    expr_attr_values: &HashMap<String, Value>,
4375) -> Result<(), AwsServiceError> {
4376    let Some((left, right)) = assignment.split_once('=') else {
4377        return Ok(());
4378    };
4379
4380    let attr = resolve_attr_name(left.trim(), expr_attr_names);
4381    let right = right.trim();
4382
4383    // if_not_exists(attr, :val)
4384    if let Some(rest) = right
4385        .strip_prefix("if_not_exists(")
4386        .or_else(|| right.strip_prefix("if_not_exists ("))
4387    {
4388        if let Some(inner) = rest.strip_suffix(')') {
4389            let mut split = inner.splitn(2, ',');
4390            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4391                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4392                if !item.contains_key(&check_name) {
4393                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4394                        item.insert(attr, val.clone());
4395                    }
4396                }
4397                return Ok(());
4398            }
4399        }
4400    }
4401
4402    // list_append(a, b)
4403    if let Some(rest) = right
4404        .strip_prefix("list_append(")
4405        .or_else(|| right.strip_prefix("list_append ("))
4406    {
4407        if let Some(inner) = rest.strip_suffix(')') {
4408            let mut split = inner.splitn(2, ',');
4409            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4410                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4411                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4412
4413                let mut merged = Vec::new();
4414                if let Some(Value::Object(obj)) = &a_val {
4415                    if let Some(Value::Array(arr)) = obj.get("L") {
4416                        merged.extend(arr.clone());
4417                    }
4418                }
4419                if let Some(Value::Object(obj)) = &b_val {
4420                    if let Some(Value::Array(arr)) = obj.get("L") {
4421                        merged.extend(arr.clone());
4422                    }
4423                }
4424
4425                item.insert(attr, json!({"L": merged}));
4426                return Ok(());
4427            }
4428        }
4429    }
4430
4431    // Arithmetic: attr + :val or attr - :val
4432    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4433        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4434        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4435
4436        let left_num = extract_number(&left_val).unwrap_or(0.0);
4437        let right_num = extract_number(&right_val).unwrap_or(0.0);
4438
4439        let result = if is_add {
4440            left_num + right_num
4441        } else {
4442            left_num - right_num
4443        };
4444
4445        let num_str = if result == result.trunc() {
4446            format!("{}", result as i64)
4447        } else {
4448            format!("{result}")
4449        };
4450
4451        item.insert(attr, json!({"N": num_str}));
4452        return Ok(());
4453    }
4454
4455    // Simple assignment
4456    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4457    if let Some(v) = val {
4458        item.insert(attr, v);
4459    }
4460
4461    Ok(())
4462}
4463
4464fn resolve_value(
4465    reference: &str,
4466    item: &HashMap<String, AttributeValue>,
4467    expr_attr_names: &HashMap<String, String>,
4468    expr_attr_values: &HashMap<String, Value>,
4469) -> Option<Value> {
4470    let reference = reference.trim();
4471    if reference.starts_with(':') {
4472        expr_attr_values.get(reference).cloned()
4473    } else {
4474        let attr_name = resolve_attr_name(reference, expr_attr_names);
4475        item.get(&attr_name).cloned()
4476    }
4477}
4478
4479fn extract_number(val: &Option<Value>) -> Option<f64> {
4480    val.as_ref()
4481        .and_then(|v| v.get("N"))
4482        .and_then(|n| n.as_str())
4483        .and_then(|s| s.parse().ok())
4484}
4485
/// Splits `expr` at its first `+` or `-` that lies outside parentheses.
///
/// Returns `(left, right, is_addition)` with both sides untrimmed, or `None`
/// when no such operator exists. An operator in the very first position is
/// not considered, so a leading sign does not count as arithmetic.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0;
    // All characters of interest are ASCII, so scanning bytes yields the same
    // offsets as scanning characters.
    let (at, is_add) = expr.bytes().enumerate().find_map(|(idx, byte)| {
        match byte {
            b'(' => nesting += 1,
            b')' => nesting -= 1,
            b'+' | b'-' if nesting == 0 && idx > 0 => return Some((idx, byte == b'+')),
            _ => {}
        }
        None
    })?;

    Some((&expr[..at], &expr[at + 1..], is_add))
}
4503
4504fn apply_add_assignment(
4505    item: &mut HashMap<String, AttributeValue>,
4506    assignment: &str,
4507    expr_attr_names: &HashMap<String, String>,
4508    expr_attr_values: &HashMap<String, Value>,
4509) -> Result<(), AwsServiceError> {
4510    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4511    if parts.len() != 2 {
4512        return Ok(());
4513    }
4514
4515    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4516    let val_ref = parts[1].trim();
4517    let add_val = expr_attr_values.get(val_ref);
4518
4519    if let Some(add_val) = add_val {
4520        if let Some(existing) = item.get(&attr) {
4521            if let (Some(existing_num), Some(add_num)) = (
4522                extract_number(&Some(existing.clone())),
4523                extract_number(&Some(add_val.clone())),
4524            ) {
4525                let result = existing_num + add_num;
4526                let num_str = if result == result.trunc() {
4527                    format!("{}", result as i64)
4528                } else {
4529                    format!("{result}")
4530                };
4531                item.insert(attr, json!({"N": num_str}));
4532            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4533                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4534                    let mut merged: Vec<Value> = existing_set.clone();
4535                    for v in add_set {
4536                        if !merged.contains(v) {
4537                            merged.push(v.clone());
4538                        }
4539                    }
4540                    item.insert(attr, json!({"SS": merged}));
4541                }
4542            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4543                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4544                    let mut merged: Vec<Value> = existing_set.clone();
4545                    for v in add_set {
4546                        if !merged.contains(v) {
4547                            merged.push(v.clone());
4548                        }
4549                    }
4550                    item.insert(attr, json!({"NS": merged}));
4551                }
4552            }
4553        } else {
4554            item.insert(attr, add_val.clone());
4555        }
4556    }
4557
4558    Ok(())
4559}
4560
4561fn apply_delete_assignment(
4562    item: &mut HashMap<String, AttributeValue>,
4563    assignment: &str,
4564    expr_attr_names: &HashMap<String, String>,
4565    expr_attr_values: &HashMap<String, Value>,
4566) -> Result<(), AwsServiceError> {
4567    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4568    if parts.len() != 2 {
4569        return Ok(());
4570    }
4571
4572    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4573    let val_ref = parts[1].trim();
4574    let del_val = expr_attr_values.get(val_ref);
4575
4576    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4577        if let (Some(existing_set), Some(del_set)) = (
4578            existing.get("SS").and_then(|v| v.as_array()),
4579            del_val.get("SS").and_then(|v| v.as_array()),
4580        ) {
4581            let filtered: Vec<Value> = existing_set
4582                .iter()
4583                .filter(|v| !del_set.contains(v))
4584                .cloned()
4585                .collect();
4586            if filtered.is_empty() {
4587                item.remove(&attr);
4588            } else {
4589                item.insert(attr, json!({"SS": filtered}));
4590            }
4591        } else if let (Some(existing_set), Some(del_set)) = (
4592            existing.get("NS").and_then(|v| v.as_array()),
4593            del_val.get("NS").and_then(|v| v.as_array()),
4594        ) {
4595            let filtered: Vec<Value> = existing_set
4596                .iter()
4597                .filter(|v| !del_set.contains(v))
4598                .cloned()
4599                .collect();
4600            if filtered.is_empty() {
4601                item.remove(&attr);
4602            } else {
4603                item.insert(attr, json!({"NS": filtered}));
4604            }
4605        }
4606    }
4607
4608    Ok(())
4609}
4610
4611#[allow(clippy::too_many_arguments)]
4612fn build_table_description_json(
4613    arn: &str,
4614    key_schema: &[KeySchemaElement],
4615    attribute_definitions: &[AttributeDefinition],
4616    provisioned_throughput: &ProvisionedThroughput,
4617    gsi: &[GlobalSecondaryIndex],
4618    lsi: &[LocalSecondaryIndex],
4619    billing_mode: &str,
4620    created_at: chrono::DateTime<chrono::Utc>,
4621    item_count: i64,
4622    size_bytes: i64,
4623    status: &str,
4624) -> Value {
4625    let table_name = arn.rsplit('/').next().unwrap_or("");
4626    let creation_timestamp =
4627        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4628
4629    let ks: Vec<Value> = key_schema
4630        .iter()
4631        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4632        .collect();
4633
4634    let ad: Vec<Value> = attribute_definitions
4635        .iter()
4636        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4637        .collect();
4638
4639    let mut desc = json!({
4640        "TableName": table_name,
4641        "TableArn": arn,
4642        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4643        "TableStatus": status,
4644        "KeySchema": ks,
4645        "AttributeDefinitions": ad,
4646        "CreationDateTime": creation_timestamp,
4647        "ItemCount": item_count,
4648        "TableSizeBytes": size_bytes,
4649        "BillingModeSummary": { "BillingMode": billing_mode },
4650    });
4651
4652    if billing_mode != "PAY_PER_REQUEST" {
4653        desc["ProvisionedThroughput"] = json!({
4654            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4655            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4656            "NumberOfDecreasesToday": 0,
4657        });
4658    } else {
4659        desc["ProvisionedThroughput"] = json!({
4660            "ReadCapacityUnits": 0,
4661            "WriteCapacityUnits": 0,
4662            "NumberOfDecreasesToday": 0,
4663        });
4664    }
4665
4666    if !gsi.is_empty() {
4667        let gsi_json: Vec<Value> = gsi
4668            .iter()
4669            .map(|g| {
4670                let gks: Vec<Value> = g
4671                    .key_schema
4672                    .iter()
4673                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4674                    .collect();
4675                let mut idx = json!({
4676                    "IndexName": g.index_name,
4677                    "KeySchema": gks,
4678                    "Projection": { "ProjectionType": g.projection.projection_type },
4679                    "IndexStatus": "ACTIVE",
4680                    "IndexArn": format!("{arn}/index/{}", g.index_name),
4681                    "ItemCount": 0,
4682                    "IndexSizeBytes": 0,
4683                });
4684                if !g.projection.non_key_attributes.is_empty() {
4685                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4686                }
4687                if let Some(ref pt) = g.provisioned_throughput {
4688                    idx["ProvisionedThroughput"] = json!({
4689                        "ReadCapacityUnits": pt.read_capacity_units,
4690                        "WriteCapacityUnits": pt.write_capacity_units,
4691                        "NumberOfDecreasesToday": 0,
4692                    });
4693                }
4694                idx
4695            })
4696            .collect();
4697        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4698    }
4699
4700    if !lsi.is_empty() {
4701        let lsi_json: Vec<Value> = lsi
4702            .iter()
4703            .map(|l| {
4704                let lks: Vec<Value> = l
4705                    .key_schema
4706                    .iter()
4707                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4708                    .collect();
4709                let mut idx = json!({
4710                    "IndexName": l.index_name,
4711                    "KeySchema": lks,
4712                    "Projection": { "ProjectionType": l.projection.projection_type },
4713                    "IndexArn": format!("{arn}/index/{}", l.index_name),
4714                    "ItemCount": 0,
4715                    "IndexSizeBytes": 0,
4716                });
4717                if !l.projection.non_key_attributes.is_empty() {
4718                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4719                }
4720                idx
4721            })
4722            .collect();
4723        desc["LocalSecondaryIndexes"] = json!(lsi_json);
4724    }
4725
4726    desc
4727}
4728
4729fn build_table_description(table: &DynamoTable) -> Value {
4730    let mut desc = build_table_description_json(
4731        &table.arn,
4732        &table.key_schema,
4733        &table.attribute_definitions,
4734        &table.provisioned_throughput,
4735        &table.gsi,
4736        &table.lsi,
4737        &table.billing_mode,
4738        table.created_at,
4739        table.item_count,
4740        table.size_bytes,
4741        &table.status,
4742    );
4743
4744    // Add stream specification if streams are enabled
4745    if table.stream_enabled {
4746        if let Some(ref stream_arn) = table.stream_arn {
4747            desc["LatestStreamArn"] = json!(stream_arn);
4748            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
4749        }
4750        if let Some(ref view_type) = table.stream_view_type {
4751            desc["StreamSpecification"] = json!({
4752                "StreamEnabled": true,
4753                "StreamViewType": view_type,
4754            });
4755        }
4756    }
4757
4758    // Add SSE description
4759    if let Some(ref sse_type) = table.sse_type {
4760        let mut sse_desc = json!({
4761            "Status": "ENABLED",
4762            "SSEType": sse_type,
4763        });
4764        if let Some(ref key_arn) = table.sse_kms_key_arn {
4765            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
4766        }
4767        desc["SSEDescription"] = sse_desc;
4768    } else {
4769        // Default: AWS owned key encryption (always enabled in real AWS)
4770        desc["SSEDescription"] = json!({
4771            "Status": "ENABLED",
4772            "SSEType": "AES256",
4773        });
4774    }
4775
4776    desc
4777}
4778
4779fn execute_partiql_statement(
4780    state: &SharedDynamoDbState,
4781    statement: &str,
4782    parameters: &[Value],
4783) -> Result<AwsResponse, AwsServiceError> {
4784    let trimmed = statement.trim();
4785    let upper = trimmed.to_ascii_uppercase();
4786
4787    if upper.starts_with("SELECT") {
4788        execute_partiql_select(state, trimmed, parameters)
4789    } else if upper.starts_with("INSERT") {
4790        execute_partiql_insert(state, trimmed, parameters)
4791    } else if upper.starts_with("UPDATE") {
4792        execute_partiql_update(state, trimmed, parameters)
4793    } else if upper.starts_with("DELETE") {
4794        execute_partiql_delete(state, trimmed, parameters)
4795    } else {
4796        Err(AwsServiceError::aws_error(
4797            StatusCode::BAD_REQUEST,
4798            "ValidationException",
4799            format!("Unsupported PartiQL statement: {trimmed}"),
4800        ))
4801    }
4802}
4803
4804/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
4805fn execute_partiql_select(
4806    state: &SharedDynamoDbState,
4807    statement: &str,
4808    parameters: &[Value],
4809) -> Result<AwsResponse, AwsServiceError> {
4810    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
4811    let upper = statement.to_ascii_uppercase();
4812    let from_pos = upper.find("FROM").ok_or_else(|| {
4813        AwsServiceError::aws_error(
4814            StatusCode::BAD_REQUEST,
4815            "ValidationException",
4816            "Invalid SELECT statement: missing FROM",
4817        )
4818    })?;
4819
4820    let after_from = statement[from_pos + 4..].trim();
4821    let (table_name, rest) = parse_partiql_table_name(after_from);
4822
4823    let state = state.read();
4824    let table = get_table(&state.tables, &table_name)?;
4825
4826    let rest_upper = rest.trim().to_ascii_uppercase();
4827    if rest_upper.starts_with("WHERE") {
4828        let where_clause = rest.trim()[5..].trim();
4829        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4830        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4831        DynamoDbService::ok_json(json!({ "Items": items }))
4832    } else {
4833        // No WHERE, return all items
4834        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4835        DynamoDbService::ok_json(json!({ "Items": items }))
4836    }
4837}
4838
4839fn execute_partiql_insert(
4840    state: &SharedDynamoDbState,
4841    statement: &str,
4842    parameters: &[Value],
4843) -> Result<AwsResponse, AwsServiceError> {
4844    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
4845    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
4846    let upper = statement.to_ascii_uppercase();
4847    let into_pos = upper.find("INTO").ok_or_else(|| {
4848        AwsServiceError::aws_error(
4849            StatusCode::BAD_REQUEST,
4850            "ValidationException",
4851            "Invalid INSERT statement: missing INTO",
4852        )
4853    })?;
4854
4855    let after_into = statement[into_pos + 4..].trim();
4856    let (table_name, rest) = parse_partiql_table_name(after_into);
4857
4858    let rest_upper = rest.trim().to_ascii_uppercase();
4859    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4860        AwsServiceError::aws_error(
4861            StatusCode::BAD_REQUEST,
4862            "ValidationException",
4863            "Invalid INSERT statement: missing VALUE",
4864        )
4865    })?;
4866
4867    let value_str = rest.trim()[value_pos + 5..].trim();
4868    let item = parse_partiql_value_object(value_str, parameters)?;
4869
4870    let mut state = state.write();
4871    let table = get_table_mut(&mut state.tables, &table_name)?;
4872    let key = extract_key(table, &item);
4873    if table.find_item_index(&key).is_some() {
4874        // DynamoDB PartiQL INSERT fails if item exists
4875        return Err(AwsServiceError::aws_error(
4876            StatusCode::BAD_REQUEST,
4877            "DuplicateItemException",
4878            "Duplicate primary key exists in table",
4879        ));
4880    } else {
4881        table.items.push(item);
4882    }
4883    table.recalculate_stats();
4884
4885    DynamoDbService::ok_json(json!({}))
4886}
4887
4888fn execute_partiql_update(
4889    state: &SharedDynamoDbState,
4890    statement: &str,
4891    parameters: &[Value],
4892) -> Result<AwsResponse, AwsServiceError> {
4893    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
4894    // or: UPDATE "tablename" SET attr=? WHERE pk=?
4895    let after_update = statement[6..].trim(); // skip "UPDATE"
4896    let (table_name, rest) = parse_partiql_table_name(after_update);
4897
4898    let rest_upper = rest.trim().to_ascii_uppercase();
4899    let set_pos = rest_upper.find("SET").ok_or_else(|| {
4900        AwsServiceError::aws_error(
4901            StatusCode::BAD_REQUEST,
4902            "ValidationException",
4903            "Invalid UPDATE statement: missing SET",
4904        )
4905    })?;
4906
4907    let after_set = rest.trim()[set_pos + 3..].trim();
4908
4909    // Split on WHERE
4910    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4911    let (set_clause, where_clause) = if let Some(wp) = where_pos {
4912        (&after_set[..wp], after_set[wp + 5..].trim())
4913    } else {
4914        (after_set, "")
4915    };
4916
4917    let mut state = state.write();
4918    let table = get_table_mut(&mut state.tables, &table_name)?;
4919
4920    let matched_indices = if !where_clause.is_empty() {
4921        find_partiql_where_indices(table, where_clause, parameters)?
4922    } else {
4923        (0..table.items.len()).collect()
4924    };
4925
4926    // Parse SET assignments: attr=value, attr2=value2
4927    let param_offset = count_params_in_str(where_clause);
4928    let assignments: Vec<&str> = set_clause.split(',').collect();
4929    for idx in &matched_indices {
4930        let mut local_offset = param_offset;
4931        for assignment in &assignments {
4932            let assignment = assignment.trim();
4933            if let Some((attr, val_str)) = assignment.split_once('=') {
4934                let attr = attr.trim().trim_matches('"');
4935                let val_str = val_str.trim();
4936                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4937                if let Some(v) = value {
4938                    table.items[*idx].insert(attr.to_string(), v);
4939                }
4940            }
4941        }
4942    }
4943    table.recalculate_stats();
4944
4945    DynamoDbService::ok_json(json!({}))
4946}
4947
4948fn execute_partiql_delete(
4949    state: &SharedDynamoDbState,
4950    statement: &str,
4951    parameters: &[Value],
4952) -> Result<AwsResponse, AwsServiceError> {
4953    // Pattern: DELETE FROM "tablename" WHERE pk='val'
4954    let upper = statement.to_ascii_uppercase();
4955    let from_pos = upper.find("FROM").ok_or_else(|| {
4956        AwsServiceError::aws_error(
4957            StatusCode::BAD_REQUEST,
4958            "ValidationException",
4959            "Invalid DELETE statement: missing FROM",
4960        )
4961    })?;
4962
4963    let after_from = statement[from_pos + 4..].trim();
4964    let (table_name, rest) = parse_partiql_table_name(after_from);
4965
4966    let rest_upper = rest.trim().to_ascii_uppercase();
4967    if !rest_upper.starts_with("WHERE") {
4968        return Err(AwsServiceError::aws_error(
4969            StatusCode::BAD_REQUEST,
4970            "ValidationException",
4971            "DELETE requires a WHERE clause",
4972        ));
4973    }
4974    let where_clause = rest.trim()[5..].trim();
4975
4976    let mut state = state.write();
4977    let table = get_table_mut(&mut state.tables, &table_name)?;
4978
4979    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4980    // Remove from highest index first to avoid invalidating lower indices
4981    indices.sort_unstable();
4982    indices.reverse();
4983    for idx in indices {
4984        table.items.remove(idx);
4985    }
4986    table.recalculate_stats();
4987
4988    DynamoDbService::ok_json(json!({}))
4989}
4990
/// Parse a table name that may be wrapped in double quotes.
///
/// Returns `(table_name, rest_of_string)`, where `rest_of_string` is the
/// untrimmed remainder of the input after the name.
///
/// * `"name" ...` — the text between the quotes is the name (it may contain
///   whitespace); the remainder starts right after the closing quote.
/// * `name ...` — the name runs up to the first whitespace character.
/// * `"name ...` (no closing quote) — treated like the unquoted form, with
///   the stray quote stripped from the name. Any whitespace character ends
///   the name here too, so a tab or newline is handled the same as a space.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    let quoted_body = s.strip_prefix('"');

    // Properly quoted: everything up to the closing quote is the name.
    if let Some((name, rest)) = quoted_body.and_then(|body| body.split_once('"')) {
        return (name.to_string(), rest);
    }

    let end = s.find(char::is_whitespace).unwrap_or(s.len());
    let token = &s[..end];
    let name = if quoted_body.is_some() {
        // Unterminated quote: drop the dangling quote character(s).
        token.trim_matches('"')
    } else {
        token
    };
    (name.to_string(), &s[end..])
}
5010
5011/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
5012/// Returns matching items.
5013fn evaluate_partiql_where<'a>(
5014    table: &'a DynamoTable,
5015    where_clause: &str,
5016    parameters: &[Value],
5017) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5018    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5019    Ok(indices.iter().map(|i| &table.items[*i]).collect())
5020}
5021
5022fn find_partiql_where_indices(
5023    table: &DynamoTable,
5024    where_clause: &str,
5025    parameters: &[Value],
5026) -> Result<Vec<usize>, AwsServiceError> {
5027    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
5028    // Case-insensitive AND splitting
5029    let upper = where_clause.to_uppercase();
5030    let conditions = if upper.contains(" AND ") {
5031        // Find positions of " AND " case-insensitively and split
5032        let mut parts = Vec::new();
5033        let mut last = 0;
5034        for (i, _) in upper.match_indices(" AND ") {
5035            parts.push(where_clause[last..i].trim());
5036            last = i + 5;
5037        }
5038        parts.push(where_clause[last..].trim());
5039        parts
5040    } else {
5041        vec![where_clause.trim()]
5042    };
5043
5044    let mut param_idx = 0usize;
5045    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5046
5047    for cond in &conditions {
5048        let cond = cond.trim();
5049        if let Some((left, right)) = cond.split_once('=') {
5050            let attr = left.trim().trim_matches('"').to_string();
5051            let val_str = right.trim();
5052            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5053            if let Some(v) = value {
5054                parsed_conditions.push((attr, v));
5055            }
5056        }
5057    }
5058
5059    let mut indices = Vec::new();
5060    for (i, item) in table.items.iter().enumerate() {
5061        let all_match = parsed_conditions
5062            .iter()
5063            .all(|(attr, expected)| item.get(attr) == Some(expected));
5064        if all_match {
5065            indices.push(i);
5066        }
5067    }
5068
5069    Ok(indices)
5070}
5071
5072/// Parse a PartiQL literal value. Supports:
5073/// - 'string' -> {"S": "string"}
5074/// - 123 -> {"N": "123"}
5075/// - ? -> parameter from list
5076fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5077    let s = s.trim();
5078    if s == "?" {
5079        let idx = *param_idx;
5080        *param_idx += 1;
5081        parameters.get(idx).cloned()
5082    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5083        let inner = &s[1..s.len() - 1];
5084        Some(json!({"S": inner}))
5085    } else if let Ok(n) = s.parse::<f64>() {
5086        let num_str = if n == n.trunc() {
5087            format!("{}", n as i64)
5088        } else {
5089            format!("{n}")
5090        };
5091        Some(json!({"N": num_str}))
5092    } else {
5093        None
5094    }
5095}
5096
5097/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
5098fn parse_partiql_value_object(
5099    s: &str,
5100    parameters: &[Value],
5101) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5102    let s = s.trim();
5103    let inner = s
5104        .strip_prefix('{')
5105        .and_then(|s| s.strip_suffix('}'))
5106        .ok_or_else(|| {
5107            AwsServiceError::aws_error(
5108                StatusCode::BAD_REQUEST,
5109                "ValidationException",
5110                "Invalid VALUE: expected object literal",
5111            )
5112        })?;
5113
5114    let mut item = HashMap::new();
5115    let mut param_idx = 0usize;
5116
5117    // Simple comma-separated key:value parsing
5118    for pair in split_partiql_pairs(inner) {
5119        let pair = pair.trim();
5120        if pair.is_empty() {
5121            continue;
5122        }
5123        if let Some((key_part, val_part)) = pair.split_once(':') {
5124            let key = key_part
5125                .trim()
5126                .trim_matches('\'')
5127                .trim_matches('"')
5128                .to_string();
5129            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5130                item.insert(key, val);
5131            }
5132        }
5133    }
5134
5135    Ok(item)
5136}
5137
/// Split the inside of a PartiQL object literal into its `key: value` pairs.
///
/// Splits on commas that are outside single-quoted strings and outside any
/// nested `{...}` object or `[...]` list. The returned slices are not
/// trimmed. The result always has at least one element; empty input yields
/// one empty slice.
///
/// An unmatched closing bracket is ignored, so it does not suppress
/// splitting for the rest of the input.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            // A doubled quote ('') inside a string closes and immediately
            // reopens the quoted state, so it needs no special handling.
            '\'' => in_quote = !in_quote,
            _ if in_quote => {}
            '{' | '[' => depth += 1,
            '}' | ']' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
5161
/// Count the `?` parameter placeholders in a string.
///
/// A `?` inside a single-quoted string literal (e.g. `'who?'`) is text, not
/// a placeholder, and is not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    let mut count = 0;
    for c in s.chars() {
        match c {
            '\'' => in_quote = !in_quote,
            '?' if !in_quote => count += 1,
            _ => {}
        }
    }
    count
}
5166
5167#[cfg(test)]
5168mod tests {
5169    use super::*;
5170    use serde_json::json;
5171
5172    #[test]
5173    fn test_parse_update_clauses_set() {
5174        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5175        assert_eq!(clauses.len(), 1);
5176        assert_eq!(clauses[0].0, "SET");
5177        assert_eq!(clauses[0].1.len(), 2);
5178    }
5179
5180    #[test]
5181    fn test_parse_update_clauses_set_and_remove() {
5182        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
5183        assert_eq!(clauses.len(), 2);
5184        assert_eq!(clauses[0].0, "SET");
5185        assert_eq!(clauses[1].0, "REMOVE");
5186    }
5187
5188    #[test]
5189    fn test_evaluate_key_condition_simple() {
5190        let mut item = HashMap::new();
5191        item.insert("pk".to_string(), json!({"S": "user1"}));
5192        item.insert("sk".to_string(), json!({"S": "order1"}));
5193
5194        let mut expr_values = HashMap::new();
5195        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
5196
5197        assert!(evaluate_key_condition(
5198            "pk = :pk",
5199            &item,
5200            "pk",
5201            Some("sk"),
5202            &HashMap::new(),
5203            &expr_values,
5204        ));
5205    }
5206
5207    #[test]
5208    fn test_compare_attribute_values_numbers() {
5209        let a = json!({"N": "10"});
5210        let b = json!({"N": "20"});
5211        assert_eq!(
5212            compare_attribute_values(Some(&a), Some(&b)),
5213            std::cmp::Ordering::Less
5214        );
5215    }
5216
5217    #[test]
5218    fn test_compare_attribute_values_strings() {
5219        let a = json!({"S": "apple"});
5220        let b = json!({"S": "banana"});
5221        assert_eq!(
5222            compare_attribute_values(Some(&a), Some(&b)),
5223            std::cmp::Ordering::Less
5224        );
5225    }
5226
5227    #[test]
5228    fn test_split_on_and() {
5229        let parts = split_on_and("pk = :pk AND sk > :sk");
5230        assert_eq!(parts.len(), 2);
5231        assert_eq!(parts[0].trim(), "pk = :pk");
5232        assert_eq!(parts[1].trim(), "sk > :sk");
5233    }
5234
5235    #[test]
5236    fn test_split_on_and_respects_parentheses() {
5237        // Before fix: split_on_and would split inside the parens
5238        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5239        // Should NOT split on the AND inside parentheses
5240        assert_eq!(parts.len(), 1);
5241        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5242    }
5243
5244    #[test]
5245    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
5246        // (a AND b) OR c — should match when c is true but a is false
5247        let mut item = HashMap::new();
5248        item.insert("x".to_string(), json!({"S": "no"}));
5249        item.insert("y".to_string(), json!({"S": "no"}));
5250        item.insert("z".to_string(), json!({"S": "yes"}));
5251
5252        let mut expr_values = HashMap::new();
5253        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
5254
5255        // x=yes AND y=yes => false, but z=yes => true => overall true
5256        let result = evaluate_filter_expression(
5257            "(x = :yes AND y = :yes) OR z = :yes",
5258            &item,
5259            &HashMap::new(),
5260            &expr_values,
5261        );
5262        assert!(result, "should match because z = :yes is true");
5263
5264        // x=yes AND y=yes => false, z=yes => false => overall false
5265        let mut item2 = HashMap::new();
5266        item2.insert("x".to_string(), json!({"S": "no"}));
5267        item2.insert("y".to_string(), json!({"S": "no"}));
5268        item2.insert("z".to_string(), json!({"S": "no"}));
5269
5270        let result2 = evaluate_filter_expression(
5271            "(x = :yes AND y = :yes) OR z = :yes",
5272            &item2,
5273            &HashMap::new(),
5274            &expr_values,
5275        );
5276        assert!(!result2, "should not match because nothing is true");
5277    }
5278
5279    #[test]
5280    fn test_project_item_nested_path() {
5281        // Item with a list attribute containing maps
5282        let mut item = HashMap::new();
5283        item.insert("pk".to_string(), json!({"S": "key1"}));
5284        item.insert(
5285            "data".to_string(),
5286            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
5287        );
5288
5289        let body = json!({
5290            "ProjectionExpression": "data[0].name"
5291        });
5292
5293        let projected = project_item(&item, &body);
5294        // Should contain data[0].name = "Alice", not the entire data[0] element
5295        let name = projected
5296            .get("data")
5297            .and_then(|v| v.get("L"))
5298            .and_then(|v| v.get(0))
5299            .and_then(|v| v.get("M"))
5300            .and_then(|v| v.get("name"))
5301            .and_then(|v| v.get("S"))
5302            .and_then(|v| v.as_str());
5303        assert_eq!(name, Some("Alice"));
5304
5305        // Should NOT contain the "age" field
5306        let age = projected
5307            .get("data")
5308            .and_then(|v| v.get("L"))
5309            .and_then(|v| v.get(0))
5310            .and_then(|v| v.get("M"))
5311            .and_then(|v| v.get("age"));
5312        assert!(age.is_none(), "age should not be present in projection");
5313    }
5314
5315    #[test]
5316    fn test_resolve_nested_path_map() {
5317        let mut item = HashMap::new();
5318        item.insert(
5319            "info".to_string(),
5320            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
5321        );
5322
5323        let result = resolve_nested_path(&item, "info.address.city");
5324        assert_eq!(result, Some(json!({"S": "NYC"})));
5325    }
5326
5327    #[test]
5328    fn test_resolve_nested_path_list_then_map() {
5329        let mut item = HashMap::new();
5330        item.insert(
5331            "items".to_string(),
5332            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
5333        );
5334
5335        let result = resolve_nested_path(&item, "items[0].sku");
5336        assert_eq!(result, Some(json!({"S": "ABC"})));
5337    }
5338
5339    // -- Integration-style tests using DynamoDbService --
5340
5341    use crate::state::SharedDynamoDbState;
5342    use parking_lot::RwLock;
5343    use std::sync::Arc;
5344
5345    fn make_service() -> DynamoDbService {
5346        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5347            "123456789012",
5348            "us-east-1",
5349        )));
5350        DynamoDbService::new(state)
5351    }
5352
5353    fn make_request(action: &str, body: Value) -> AwsRequest {
5354        AwsRequest {
5355            service: "dynamodb".to_string(),
5356            action: action.to_string(),
5357            region: "us-east-1".to_string(),
5358            account_id: "123456789012".to_string(),
5359            request_id: "test-id".to_string(),
5360            headers: http::HeaderMap::new(),
5361            query_params: HashMap::new(),
5362            body: serde_json::to_vec(&body).unwrap().into(),
5363            path_segments: vec![],
5364            raw_path: "/".to_string(),
5365            raw_query: String::new(),
5366            method: http::Method::POST,
5367            is_query_protocol: false,
5368            access_key_id: None,
5369        }
5370    }
5371
5372    fn create_test_table(svc: &DynamoDbService) {
5373        let req = make_request(
5374            "CreateTable",
5375            json!({
5376                "TableName": "test-table",
5377                "KeySchema": [
5378                    { "AttributeName": "pk", "KeyType": "HASH" }
5379                ],
5380                "AttributeDefinitions": [
5381                    { "AttributeName": "pk", "AttributeType": "S" }
5382                ],
5383                "BillingMode": "PAY_PER_REQUEST"
5384            }),
5385        );
5386        svc.create_table(&req).unwrap();
5387    }
5388
5389    #[test]
5390    fn delete_item_return_values_all_old() {
5391        let svc = make_service();
5392        create_test_table(&svc);
5393
5394        // Put an item
5395        let req = make_request(
5396            "PutItem",
5397            json!({
5398                "TableName": "test-table",
5399                "Item": {
5400                    "pk": { "S": "key1" },
5401                    "name": { "S": "Alice" },
5402                    "age": { "N": "30" }
5403                }
5404            }),
5405        );
5406        svc.put_item(&req).unwrap();
5407
5408        // Delete with ReturnValues=ALL_OLD
5409        let req = make_request(
5410            "DeleteItem",
5411            json!({
5412                "TableName": "test-table",
5413                "Key": { "pk": { "S": "key1" } },
5414                "ReturnValues": "ALL_OLD"
5415            }),
5416        );
5417        let resp = svc.delete_item(&req).unwrap();
5418        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5419
5420        // Verify the old item is returned
5421        let attrs = &body["Attributes"];
5422        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5423        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5424        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5425
5426        // Verify the item is actually deleted
5427        let req = make_request(
5428            "GetItem",
5429            json!({
5430                "TableName": "test-table",
5431                "Key": { "pk": { "S": "key1" } }
5432            }),
5433        );
5434        let resp = svc.get_item(&req).unwrap();
5435        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5436        assert!(body.get("Item").is_none(), "item should be deleted");
5437    }
5438
5439    #[test]
5440    fn transact_get_items_returns_existing_and_missing() {
5441        let svc = make_service();
5442        create_test_table(&svc);
5443
5444        // Put one item
5445        let req = make_request(
5446            "PutItem",
5447            json!({
5448                "TableName": "test-table",
5449                "Item": {
5450                    "pk": { "S": "exists" },
5451                    "val": { "S": "hello" }
5452                }
5453            }),
5454        );
5455        svc.put_item(&req).unwrap();
5456
5457        let req = make_request(
5458            "TransactGetItems",
5459            json!({
5460                "TransactItems": [
5461                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5462                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5463                ]
5464            }),
5465        );
5466        let resp = svc.transact_get_items(&req).unwrap();
5467        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5468        let responses = body["Responses"].as_array().unwrap();
5469        assert_eq!(responses.len(), 2);
5470        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5471        assert!(responses[1].get("Item").is_none());
5472    }
5473
5474    #[test]
5475    fn transact_write_items_put_and_delete() {
5476        let svc = make_service();
5477        create_test_table(&svc);
5478
5479        // Put initial item
5480        let req = make_request(
5481            "PutItem",
5482            json!({
5483                "TableName": "test-table",
5484                "Item": {
5485                    "pk": { "S": "to-delete" },
5486                    "val": { "S": "bye" }
5487                }
5488            }),
5489        );
5490        svc.put_item(&req).unwrap();
5491
5492        // TransactWrite: put new + delete existing
5493        let req = make_request(
5494            "TransactWriteItems",
5495            json!({
5496                "TransactItems": [
5497                    {
5498                        "Put": {
5499                            "TableName": "test-table",
5500                            "Item": {
5501                                "pk": { "S": "new-item" },
5502                                "val": { "S": "hi" }
5503                            }
5504                        }
5505                    },
5506                    {
5507                        "Delete": {
5508                            "TableName": "test-table",
5509                            "Key": { "pk": { "S": "to-delete" } }
5510                        }
5511                    }
5512                ]
5513            }),
5514        );
5515        let resp = svc.transact_write_items(&req).unwrap();
5516        assert_eq!(resp.status, StatusCode::OK);
5517
5518        // Verify new item exists
5519        let req = make_request(
5520            "GetItem",
5521            json!({
5522                "TableName": "test-table",
5523                "Key": { "pk": { "S": "new-item" } }
5524            }),
5525        );
5526        let resp = svc.get_item(&req).unwrap();
5527        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5528        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5529
5530        // Verify deleted item is gone
5531        let req = make_request(
5532            "GetItem",
5533            json!({
5534                "TableName": "test-table",
5535                "Key": { "pk": { "S": "to-delete" } }
5536            }),
5537        );
5538        let resp = svc.get_item(&req).unwrap();
5539        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5540        assert!(body.get("Item").is_none());
5541    }
5542
5543    #[test]
5544    fn transact_write_items_condition_check_failure() {
5545        let svc = make_service();
5546        create_test_table(&svc);
5547
5548        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
5549        let req = make_request(
5550            "TransactWriteItems",
5551            json!({
5552                "TransactItems": [
5553                    {
5554                        "ConditionCheck": {
5555                            "TableName": "test-table",
5556                            "Key": { "pk": { "S": "nonexistent" } },
5557                            "ConditionExpression": "attribute_exists(pk)"
5558                        }
5559                    }
5560                ]
5561            }),
5562        );
5563        let resp = svc.transact_write_items(&req).unwrap();
5564        // Should be a 400 error response
5565        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5566        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5567        assert_eq!(
5568            body["__type"].as_str().unwrap(),
5569            "TransactionCanceledException"
5570        );
5571        assert!(body["CancellationReasons"].as_array().is_some());
5572    }
5573
5574    #[test]
5575    fn update_and_describe_time_to_live() {
5576        let svc = make_service();
5577        create_test_table(&svc);
5578
5579        // Enable TTL
5580        let req = make_request(
5581            "UpdateTimeToLive",
5582            json!({
5583                "TableName": "test-table",
5584                "TimeToLiveSpecification": {
5585                    "AttributeName": "ttl",
5586                    "Enabled": true
5587                }
5588            }),
5589        );
5590        let resp = svc.update_time_to_live(&req).unwrap();
5591        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5592        assert_eq!(
5593            body["TimeToLiveSpecification"]["AttributeName"]
5594                .as_str()
5595                .unwrap(),
5596            "ttl"
5597        );
5598        assert!(body["TimeToLiveSpecification"]["Enabled"]
5599            .as_bool()
5600            .unwrap());
5601
5602        // Describe TTL
5603        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5604        let resp = svc.describe_time_to_live(&req).unwrap();
5605        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5606        assert_eq!(
5607            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5608                .as_str()
5609                .unwrap(),
5610            "ENABLED"
5611        );
5612        assert_eq!(
5613            body["TimeToLiveDescription"]["AttributeName"]
5614                .as_str()
5615                .unwrap(),
5616            "ttl"
5617        );
5618
5619        // Disable TTL
5620        let req = make_request(
5621            "UpdateTimeToLive",
5622            json!({
5623                "TableName": "test-table",
5624                "TimeToLiveSpecification": {
5625                    "AttributeName": "ttl",
5626                    "Enabled": false
5627                }
5628            }),
5629        );
5630        svc.update_time_to_live(&req).unwrap();
5631
5632        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5633        let resp = svc.describe_time_to_live(&req).unwrap();
5634        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5635        assert_eq!(
5636            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5637                .as_str()
5638                .unwrap(),
5639            "DISABLED"
5640        );
5641    }
5642
5643    #[test]
5644    fn resource_policy_lifecycle() {
5645        let svc = make_service();
5646        create_test_table(&svc);
5647
5648        let table_arn = {
5649            let state = svc.state.read();
5650            state.tables.get("test-table").unwrap().arn.clone()
5651        };
5652
5653        // Put policy
5654        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5655        let req = make_request(
5656            "PutResourcePolicy",
5657            json!({
5658                "ResourceArn": table_arn,
5659                "Policy": policy_doc
5660            }),
5661        );
5662        let resp = svc.put_resource_policy(&req).unwrap();
5663        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5664        assert!(body["RevisionId"].as_str().is_some());
5665
5666        // Get policy
5667        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5668        let resp = svc.get_resource_policy(&req).unwrap();
5669        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5670        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5671
5672        // Delete policy
5673        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5674        svc.delete_resource_policy(&req).unwrap();
5675
5676        // Get should return null now
5677        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5678        let resp = svc.get_resource_policy(&req).unwrap();
5679        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5680        assert!(body["Policy"].is_null());
5681    }
5682
5683    #[test]
5684    fn describe_endpoints() {
5685        let svc = make_service();
5686        let req = make_request("DescribeEndpoints", json!({}));
5687        let resp = svc.describe_endpoints(&req).unwrap();
5688        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5689        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5690    }
5691
5692    #[test]
5693    fn describe_limits() {
5694        let svc = make_service();
5695        let req = make_request("DescribeLimits", json!({}));
5696        let resp = svc.describe_limits(&req).unwrap();
5697        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5698        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5699    }
5700
5701    #[test]
5702    fn backup_lifecycle() {
5703        let svc = make_service();
5704        create_test_table(&svc);
5705
5706        // Create backup
5707        let req = make_request(
5708            "CreateBackup",
5709            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5710        );
5711        let resp = svc.create_backup(&req).unwrap();
5712        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5713        let backup_arn = body["BackupDetails"]["BackupArn"]
5714            .as_str()
5715            .unwrap()
5716            .to_string();
5717        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5718
5719        // Describe backup
5720        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5721        let resp = svc.describe_backup(&req).unwrap();
5722        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5723        assert_eq!(
5724            body["BackupDescription"]["BackupDetails"]["BackupName"],
5725            "my-backup"
5726        );
5727
5728        // List backups
5729        let req = make_request("ListBackups", json!({}));
5730        let resp = svc.list_backups(&req).unwrap();
5731        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5732        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5733
5734        // Restore from backup
5735        let req = make_request(
5736            "RestoreTableFromBackup",
5737            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5738        );
5739        svc.restore_table_from_backup(&req).unwrap();
5740
5741        // Verify restored table exists
5742        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5743        let resp = svc.describe_table(&req).unwrap();
5744        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5745        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5746
5747        // Delete backup
5748        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5749        svc.delete_backup(&req).unwrap();
5750
5751        // List should be empty
5752        let req = make_request("ListBackups", json!({}));
5753        let resp = svc.list_backups(&req).unwrap();
5754        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5755        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5756    }
5757
5758    #[test]
5759    fn continuous_backups() {
5760        let svc = make_service();
5761        create_test_table(&svc);
5762
5763        // Initially disabled
5764        let req = make_request(
5765            "DescribeContinuousBackups",
5766            json!({ "TableName": "test-table" }),
5767        );
5768        let resp = svc.describe_continuous_backups(&req).unwrap();
5769        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5770        assert_eq!(
5771            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5772                ["PointInTimeRecoveryStatus"],
5773            "DISABLED"
5774        );
5775
5776        // Enable
5777        let req = make_request(
5778            "UpdateContinuousBackups",
5779            json!({
5780                "TableName": "test-table",
5781                "PointInTimeRecoverySpecification": {
5782                    "PointInTimeRecoveryEnabled": true
5783                }
5784            }),
5785        );
5786        svc.update_continuous_backups(&req).unwrap();
5787
5788        // Verify
5789        let req = make_request(
5790            "DescribeContinuousBackups",
5791            json!({ "TableName": "test-table" }),
5792        );
5793        let resp = svc.describe_continuous_backups(&req).unwrap();
5794        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5795        assert_eq!(
5796            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5797                ["PointInTimeRecoveryStatus"],
5798            "ENABLED"
5799        );
5800    }
5801
5802    #[test]
5803    fn restore_table_to_point_in_time() {
5804        let svc = make_service();
5805        create_test_table(&svc);
5806
5807        let req = make_request(
5808            "RestoreTableToPointInTime",
5809            json!({
5810                "SourceTableName": "test-table",
5811                "TargetTableName": "pitr-restored"
5812            }),
5813        );
5814        svc.restore_table_to_point_in_time(&req).unwrap();
5815
5816        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5817        let resp = svc.describe_table(&req).unwrap();
5818        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5819        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5820    }
5821
5822    #[test]
5823    fn global_table_lifecycle() {
5824        let svc = make_service();
5825
5826        // Create global table
5827        let req = make_request(
5828            "CreateGlobalTable",
5829            json!({
5830                "GlobalTableName": "my-global",
5831                "ReplicationGroup": [
5832                    { "RegionName": "us-east-1" },
5833                    { "RegionName": "eu-west-1" }
5834                ]
5835            }),
5836        );
5837        let resp = svc.create_global_table(&req).unwrap();
5838        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5839        assert_eq!(
5840            body["GlobalTableDescription"]["GlobalTableStatus"],
5841            "ACTIVE"
5842        );
5843
5844        // Describe
5845        let req = make_request(
5846            "DescribeGlobalTable",
5847            json!({ "GlobalTableName": "my-global" }),
5848        );
5849        let resp = svc.describe_global_table(&req).unwrap();
5850        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5851        assert_eq!(
5852            body["GlobalTableDescription"]["ReplicationGroup"]
5853                .as_array()
5854                .unwrap()
5855                .len(),
5856            2
5857        );
5858
5859        // List
5860        let req = make_request("ListGlobalTables", json!({}));
5861        let resp = svc.list_global_tables(&req).unwrap();
5862        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5863        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5864
5865        // Update - add a region
5866        let req = make_request(
5867            "UpdateGlobalTable",
5868            json!({
5869                "GlobalTableName": "my-global",
5870                "ReplicaUpdates": [
5871                    { "Create": { "RegionName": "ap-southeast-1" } }
5872                ]
5873            }),
5874        );
5875        let resp = svc.update_global_table(&req).unwrap();
5876        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5877        assert_eq!(
5878            body["GlobalTableDescription"]["ReplicationGroup"]
5879                .as_array()
5880                .unwrap()
5881                .len(),
5882            3
5883        );
5884
5885        // Describe settings
5886        let req = make_request(
5887            "DescribeGlobalTableSettings",
5888            json!({ "GlobalTableName": "my-global" }),
5889        );
5890        let resp = svc.describe_global_table_settings(&req).unwrap();
5891        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5892        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5893
5894        // Update settings (no-op, just verify no error)
5895        let req = make_request(
5896            "UpdateGlobalTableSettings",
5897            json!({ "GlobalTableName": "my-global" }),
5898        );
5899        svc.update_global_table_settings(&req).unwrap();
5900    }
5901
5902    #[test]
5903    fn table_replica_auto_scaling() {
5904        let svc = make_service();
5905        create_test_table(&svc);
5906
5907        let req = make_request(
5908            "DescribeTableReplicaAutoScaling",
5909            json!({ "TableName": "test-table" }),
5910        );
5911        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5912        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5913        assert_eq!(
5914            body["TableAutoScalingDescription"]["TableName"],
5915            "test-table"
5916        );
5917
5918        let req = make_request(
5919            "UpdateTableReplicaAutoScaling",
5920            json!({ "TableName": "test-table" }),
5921        );
5922        svc.update_table_replica_auto_scaling(&req).unwrap();
5923    }
5924
5925    #[test]
5926    fn kinesis_streaming_lifecycle() {
5927        let svc = make_service();
5928        create_test_table(&svc);
5929
5930        // Enable
5931        let req = make_request(
5932            "EnableKinesisStreamingDestination",
5933            json!({
5934                "TableName": "test-table",
5935                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5936            }),
5937        );
5938        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5939        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5940        assert_eq!(body["DestinationStatus"], "ACTIVE");
5941
5942        // Describe
5943        let req = make_request(
5944            "DescribeKinesisStreamingDestination",
5945            json!({ "TableName": "test-table" }),
5946        );
5947        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5948        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5949        assert_eq!(
5950            body["KinesisDataStreamDestinations"]
5951                .as_array()
5952                .unwrap()
5953                .len(),
5954            1
5955        );
5956
5957        // Update
5958        let req = make_request(
5959            "UpdateKinesisStreamingDestination",
5960            json!({
5961                "TableName": "test-table",
5962                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5963                "UpdateKinesisStreamingConfiguration": {
5964                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
5965                }
5966            }),
5967        );
5968        svc.update_kinesis_streaming_destination(&req).unwrap();
5969
5970        // Disable
5971        let req = make_request(
5972            "DisableKinesisStreamingDestination",
5973            json!({
5974                "TableName": "test-table",
5975                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5976            }),
5977        );
5978        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5979        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5980        assert_eq!(body["DestinationStatus"], "DISABLED");
5981    }
5982
5983    #[test]
5984    fn contributor_insights_lifecycle() {
5985        let svc = make_service();
5986        create_test_table(&svc);
5987
5988        // Initially disabled
5989        let req = make_request(
5990            "DescribeContributorInsights",
5991            json!({ "TableName": "test-table" }),
5992        );
5993        let resp = svc.describe_contributor_insights(&req).unwrap();
5994        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5995        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5996
5997        // Enable
5998        let req = make_request(
5999            "UpdateContributorInsights",
6000            json!({
6001                "TableName": "test-table",
6002                "ContributorInsightsAction": "ENABLE"
6003            }),
6004        );
6005        let resp = svc.update_contributor_insights(&req).unwrap();
6006        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6007        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6008
6009        // List
6010        let req = make_request("ListContributorInsights", json!({}));
6011        let resp = svc.list_contributor_insights(&req).unwrap();
6012        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013        assert_eq!(
6014            body["ContributorInsightsSummaries"]
6015                .as_array()
6016                .unwrap()
6017                .len(),
6018            1
6019        );
6020    }
6021
6022    #[test]
6023    fn export_lifecycle() {
6024        let svc = make_service();
6025        create_test_table(&svc);
6026
6027        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
6028
6029        // Export
6030        let req = make_request(
6031            "ExportTableToPointInTime",
6032            json!({
6033                "TableArn": table_arn,
6034                "S3Bucket": "my-bucket"
6035            }),
6036        );
6037        let resp = svc.export_table_to_point_in_time(&req).unwrap();
6038        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6039        let export_arn = body["ExportDescription"]["ExportArn"]
6040            .as_str()
6041            .unwrap()
6042            .to_string();
6043        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6044
6045        // Describe
6046        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6047        let resp = svc.describe_export(&req).unwrap();
6048        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6049        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6050
6051        // List
6052        let req = make_request("ListExports", json!({}));
6053        let resp = svc.list_exports(&req).unwrap();
6054        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6055        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6056    }
6057
6058    #[test]
6059    fn import_lifecycle() {
6060        let svc = make_service();
6061
6062        let req = make_request(
6063            "ImportTable",
6064            json!({
6065                "InputFormat": "DYNAMODB_JSON",
6066                "S3BucketSource": { "S3Bucket": "import-bucket" },
6067                "TableCreationParameters": {
6068                    "TableName": "imported-table",
6069                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6070                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6071                }
6072            }),
6073        );
6074        let resp = svc.import_table(&req).unwrap();
6075        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6076        let import_arn = body["ImportTableDescription"]["ImportArn"]
6077            .as_str()
6078            .unwrap()
6079            .to_string();
6080        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6081
6082        // Describe import
6083        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6084        let resp = svc.describe_import(&req).unwrap();
6085        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6086        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6087
6088        // List imports
6089        let req = make_request("ListImports", json!({}));
6090        let resp = svc.list_imports(&req).unwrap();
6091        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6092        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6093
6094        // Verify the table was created
6095        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6096        let resp = svc.describe_table(&req).unwrap();
6097        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6098        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6099    }
6100
6101    #[test]
6102    fn backup_restore_preserves_items() {
6103        let svc = make_service();
6104        create_test_table(&svc);
6105
6106        // Put 3 items
6107        for i in 1..=3 {
6108            let req = make_request(
6109                "PutItem",
6110                json!({
6111                    "TableName": "test-table",
6112                    "Item": {
6113                        "pk": { "S": format!("key{i}") },
6114                        "data": { "S": format!("value{i}") }
6115                    }
6116                }),
6117            );
6118            svc.put_item(&req).unwrap();
6119        }
6120
6121        // Create backup
6122        let req = make_request(
6123            "CreateBackup",
6124            json!({
6125                "TableName": "test-table",
6126                "BackupName": "my-backup"
6127            }),
6128        );
6129        let resp = svc.create_backup(&req).unwrap();
6130        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6131        let backup_arn = body["BackupDetails"]["BackupArn"]
6132            .as_str()
6133            .unwrap()
6134            .to_string();
6135
6136        // Delete all items from the original table
6137        for i in 1..=3 {
6138            let req = make_request(
6139                "DeleteItem",
6140                json!({
6141                    "TableName": "test-table",
6142                    "Key": { "pk": { "S": format!("key{i}") } }
6143                }),
6144            );
6145            svc.delete_item(&req).unwrap();
6146        }
6147
6148        // Verify original table is empty
6149        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6150        let resp = svc.scan(&req).unwrap();
6151        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6152        assert_eq!(body["Count"], 0);
6153
6154        // Restore from backup
6155        let req = make_request(
6156            "RestoreTableFromBackup",
6157            json!({
6158                "BackupArn": backup_arn,
6159                "TargetTableName": "restored-table"
6160            }),
6161        );
6162        svc.restore_table_from_backup(&req).unwrap();
6163
6164        // Scan restored table — should have 3 items
6165        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6166        let resp = svc.scan(&req).unwrap();
6167        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6168        assert_eq!(body["Count"], 3);
6169        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6170    }
6171
6172    #[test]
6173    fn global_table_replicates_writes() {
6174        let svc = make_service();
6175        create_test_table(&svc);
6176
6177        // Create global table with replicas
6178        let req = make_request(
6179            "CreateGlobalTable",
6180            json!({
6181                "GlobalTableName": "test-table",
6182                "ReplicationGroup": [
6183                    { "RegionName": "us-east-1" },
6184                    { "RegionName": "eu-west-1" }
6185                ]
6186            }),
6187        );
6188        let resp = svc.create_global_table(&req).unwrap();
6189        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6190        assert_eq!(
6191            body["GlobalTableDescription"]["GlobalTableStatus"],
6192            "ACTIVE"
6193        );
6194
6195        // Put an item
6196        let req = make_request(
6197            "PutItem",
6198            json!({
6199                "TableName": "test-table",
6200                "Item": {
6201                    "pk": { "S": "replicated-key" },
6202                    "data": { "S": "replicated-value" }
6203                }
6204            }),
6205        );
6206        svc.put_item(&req).unwrap();
6207
6208        // Verify the item is readable (since all replicas share the same table)
6209        let req = make_request(
6210            "GetItem",
6211            json!({
6212                "TableName": "test-table",
6213                "Key": { "pk": { "S": "replicated-key" } }
6214            }),
6215        );
6216        let resp = svc.get_item(&req).unwrap();
6217        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6218        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6219        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6220    }
6221
6222    #[test]
6223    fn contributor_insights_tracks_access() {
6224        let svc = make_service();
6225        create_test_table(&svc);
6226
6227        // Enable contributor insights
6228        let req = make_request(
6229            "UpdateContributorInsights",
6230            json!({
6231                "TableName": "test-table",
6232                "ContributorInsightsAction": "ENABLE"
6233            }),
6234        );
6235        svc.update_contributor_insights(&req).unwrap();
6236
6237        // Put items with different partition keys
6238        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6239            let req = make_request(
6240                "PutItem",
6241                json!({
6242                    "TableName": "test-table",
6243                    "Item": {
6244                        "pk": { "S": key },
6245                        "data": { "S": "value" }
6246                    }
6247                }),
6248            );
6249            svc.put_item(&req).unwrap();
6250        }
6251
6252        // Get items (to also track read access)
6253        for _ in 0..3 {
6254            let req = make_request(
6255                "GetItem",
6256                json!({
6257                    "TableName": "test-table",
6258                    "Key": { "pk": { "S": "alpha" } }
6259                }),
6260            );
6261            svc.get_item(&req).unwrap();
6262        }
6263
6264        // Describe contributor insights — should show top contributors
6265        let req = make_request(
6266            "DescribeContributorInsights",
6267            json!({ "TableName": "test-table" }),
6268        );
6269        let resp = svc.describe_contributor_insights(&req).unwrap();
6270        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6271        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6272
6273        let contributors = body["TopContributors"].as_array().unwrap();
6274        assert!(
6275            !contributors.is_empty(),
6276            "TopContributors should not be empty"
6277        );
6278
6279        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
6280        // alpha should be the top contributor
6281        let top = &contributors[0];
6282        assert!(top["Count"].as_u64().unwrap() > 0);
6283
6284        // Verify the rule list is populated
6285        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6286        assert!(!rules.is_empty());
6287    }
6288
6289    #[test]
6290    fn contributor_insights_not_tracked_when_disabled() {
6291        let svc = make_service();
6292        create_test_table(&svc);
6293
6294        // Put items without enabling insights
6295        let req = make_request(
6296            "PutItem",
6297            json!({
6298                "TableName": "test-table",
6299                "Item": {
6300                    "pk": { "S": "key1" },
6301                    "data": { "S": "value" }
6302                }
6303            }),
6304        );
6305        svc.put_item(&req).unwrap();
6306
6307        // Describe — should show empty contributors
6308        let req = make_request(
6309            "DescribeContributorInsights",
6310            json!({ "TableName": "test-table" }),
6311        );
6312        let resp = svc.describe_contributor_insights(&req).unwrap();
6313        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6314        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6315
6316        let contributors = body["TopContributors"].as_array().unwrap();
6317        assert!(contributors.is_empty());
6318    }
6319
6320    #[test]
6321    fn contributor_insights_disabled_table_no_counters_after_scan() {
6322        let svc = make_service();
6323        create_test_table(&svc);
6324
6325        // Put items
6326        for key in &["alpha", "beta"] {
6327            let req = make_request(
6328                "PutItem",
6329                json!({
6330                    "TableName": "test-table",
6331                    "Item": { "pk": { "S": key } }
6332                }),
6333            );
6334            svc.put_item(&req).unwrap();
6335        }
6336
6337        // Enable insights, then scan, then disable, then check counters are cleared
6338        let req = make_request(
6339            "UpdateContributorInsights",
6340            json!({
6341                "TableName": "test-table",
6342                "ContributorInsightsAction": "ENABLE"
6343            }),
6344        );
6345        svc.update_contributor_insights(&req).unwrap();
6346
6347        // Scan to trigger counter collection
6348        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6349        svc.scan(&req).unwrap();
6350
6351        // Verify counters were collected
6352        let req = make_request(
6353            "DescribeContributorInsights",
6354            json!({ "TableName": "test-table" }),
6355        );
6356        let resp = svc.describe_contributor_insights(&req).unwrap();
6357        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6358        let contributors = body["TopContributors"].as_array().unwrap();
6359        assert!(
6360            !contributors.is_empty(),
6361            "counters should be non-empty while enabled"
6362        );
6363
6364        // Disable insights (this clears counters)
6365        let req = make_request(
6366            "UpdateContributorInsights",
6367            json!({
6368                "TableName": "test-table",
6369                "ContributorInsightsAction": "DISABLE"
6370            }),
6371        );
6372        svc.update_contributor_insights(&req).unwrap();
6373
6374        // Scan again -- should NOT accumulate counters since insights is disabled
6375        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6376        svc.scan(&req).unwrap();
6377
6378        // Verify counters are still empty
6379        let req = make_request(
6380            "DescribeContributorInsights",
6381            json!({ "TableName": "test-table" }),
6382        );
6383        let resp = svc.describe_contributor_insights(&req).unwrap();
6384        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6385        let contributors = body["TopContributors"].as_array().unwrap();
6386        assert!(
6387            contributors.is_empty(),
6388            "counters should be empty after disabling insights"
6389        );
6390    }
6391
6392    #[test]
6393    fn scan_pagination_with_limit() {
6394        let svc = make_service();
6395        create_test_table(&svc);
6396
6397        // Insert 5 items
6398        for i in 0..5 {
6399            let req = make_request(
6400                "PutItem",
6401                json!({
6402                    "TableName": "test-table",
6403                    "Item": {
6404                        "pk": { "S": format!("item{i}") },
6405                        "data": { "S": format!("value{i}") }
6406                    }
6407                }),
6408            );
6409            svc.put_item(&req).unwrap();
6410        }
6411
6412        // Scan with limit=2
6413        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6414        let resp = svc.scan(&req).unwrap();
6415        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6416        assert_eq!(body["Count"], 2);
6417        assert!(
6418            body["LastEvaluatedKey"].is_object(),
6419            "should have LastEvaluatedKey when limit < total items"
6420        );
6421        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6422
6423        // Page through all items
6424        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6425        let mut lek = body["LastEvaluatedKey"].clone();
6426
6427        while lek.is_object() {
6428            let req = make_request(
6429                "Scan",
6430                json!({
6431                    "TableName": "test-table",
6432                    "Limit": 2,
6433                    "ExclusiveStartKey": lek
6434                }),
6435            );
6436            let resp = svc.scan(&req).unwrap();
6437            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6438            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6439            lek = body["LastEvaluatedKey"].clone();
6440        }
6441
6442        assert_eq!(
6443            all_items.len(),
6444            5,
6445            "should retrieve all 5 items via pagination"
6446        );
6447    }
6448
6449    #[test]
6450    fn scan_no_pagination_when_all_fit() {
6451        let svc = make_service();
6452        create_test_table(&svc);
6453
6454        for i in 0..3 {
6455            let req = make_request(
6456                "PutItem",
6457                json!({
6458                    "TableName": "test-table",
6459                    "Item": {
6460                        "pk": { "S": format!("item{i}") }
6461                    }
6462                }),
6463            );
6464            svc.put_item(&req).unwrap();
6465        }
6466
6467        // Scan with limit > item count
6468        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6469        let resp = svc.scan(&req).unwrap();
6470        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6471        assert_eq!(body["Count"], 3);
6472        assert!(
6473            body["LastEvaluatedKey"].is_null(),
6474            "should not have LastEvaluatedKey when all items fit"
6475        );
6476
6477        // Scan without limit
6478        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6479        let resp = svc.scan(&req).unwrap();
6480        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6481        assert_eq!(body["Count"], 3);
6482        assert!(body["LastEvaluatedKey"].is_null());
6483    }
6484
6485    fn create_composite_table(svc: &DynamoDbService) {
6486        let req = make_request(
6487            "CreateTable",
6488            json!({
6489                "TableName": "composite-table",
6490                "KeySchema": [
6491                    { "AttributeName": "pk", "KeyType": "HASH" },
6492                    { "AttributeName": "sk", "KeyType": "RANGE" }
6493                ],
6494                "AttributeDefinitions": [
6495                    { "AttributeName": "pk", "AttributeType": "S" },
6496                    { "AttributeName": "sk", "AttributeType": "S" }
6497                ],
6498                "BillingMode": "PAY_PER_REQUEST"
6499            }),
6500        );
6501        svc.create_table(&req).unwrap();
6502    }
6503
6504    #[test]
6505    fn query_pagination_with_composite_key() {
6506        let svc = make_service();
6507        create_composite_table(&svc);
6508
6509        // Insert 5 items under the same partition key
6510        for i in 0..5 {
6511            let req = make_request(
6512                "PutItem",
6513                json!({
6514                    "TableName": "composite-table",
6515                    "Item": {
6516                        "pk": { "S": "user1" },
6517                        "sk": { "S": format!("item{i:03}") },
6518                        "data": { "S": format!("value{i}") }
6519                    }
6520                }),
6521            );
6522            svc.put_item(&req).unwrap();
6523        }
6524
6525        // Query with limit=2
6526        let req = make_request(
6527            "Query",
6528            json!({
6529                "TableName": "composite-table",
6530                "KeyConditionExpression": "pk = :pk",
6531                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6532                "Limit": 2
6533            }),
6534        );
6535        let resp = svc.query(&req).unwrap();
6536        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6537        assert_eq!(body["Count"], 2);
6538        assert!(body["LastEvaluatedKey"].is_object());
6539        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6540        assert!(body["LastEvaluatedKey"]["sk"].is_object());
6541
6542        // Page through all items
6543        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6544        let mut lek = body["LastEvaluatedKey"].clone();
6545
6546        while lek.is_object() {
6547            let req = make_request(
6548                "Query",
6549                json!({
6550                    "TableName": "composite-table",
6551                    "KeyConditionExpression": "pk = :pk",
6552                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6553                    "Limit": 2,
6554                    "ExclusiveStartKey": lek
6555                }),
6556            );
6557            let resp = svc.query(&req).unwrap();
6558            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6559            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6560            lek = body["LastEvaluatedKey"].clone();
6561        }
6562
6563        assert_eq!(
6564            all_items.len(),
6565            5,
6566            "should retrieve all 5 items via pagination"
6567        );
6568
6569        // Verify items came back sorted by sort key
6570        let sks: Vec<String> = all_items
6571            .iter()
6572            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6573            .collect();
6574        let mut sorted = sks.clone();
6575        sorted.sort();
6576        assert_eq!(sks, sorted, "items should be sorted by sort key");
6577    }
6578
6579    #[test]
6580    fn query_no_pagination_when_all_fit() {
6581        let svc = make_service();
6582        create_composite_table(&svc);
6583
6584        for i in 0..2 {
6585            let req = make_request(
6586                "PutItem",
6587                json!({
6588                    "TableName": "composite-table",
6589                    "Item": {
6590                        "pk": { "S": "user1" },
6591                        "sk": { "S": format!("item{i}") }
6592                    }
6593                }),
6594            );
6595            svc.put_item(&req).unwrap();
6596        }
6597
6598        let req = make_request(
6599            "Query",
6600            json!({
6601                "TableName": "composite-table",
6602                "KeyConditionExpression": "pk = :pk",
6603                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6604                "Limit": 10
6605            }),
6606        );
6607        let resp = svc.query(&req).unwrap();
6608        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6609        assert_eq!(body["Count"], 2);
6610        assert!(
6611            body["LastEvaluatedKey"].is_null(),
6612            "should not have LastEvaluatedKey when all items fit"
6613        );
6614    }
6615
6616    fn create_gsi_table(svc: &DynamoDbService) {
6617        let req = make_request(
6618            "CreateTable",
6619            json!({
6620                "TableName": "gsi-table",
6621                "KeySchema": [
6622                    { "AttributeName": "pk", "KeyType": "HASH" }
6623                ],
6624                "AttributeDefinitions": [
6625                    { "AttributeName": "pk", "AttributeType": "S" },
6626                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
6627                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
6628                ],
6629                "BillingMode": "PAY_PER_REQUEST",
6630                "GlobalSecondaryIndexes": [
6631                    {
6632                        "IndexName": "gsi-index",
6633                        "KeySchema": [
6634                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6635                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6636                        ],
6637                        "Projection": { "ProjectionType": "ALL" }
6638                    }
6639                ]
6640            }),
6641        );
6642        svc.create_table(&req).unwrap();
6643    }
6644
6645    #[test]
6646    fn gsi_query_last_evaluated_key_includes_table_pk() {
6647        let svc = make_service();
6648        create_gsi_table(&svc);
6649
6650        // Insert 3 items with the SAME GSI key but different table PKs
6651        for i in 0..3 {
6652            let req = make_request(
6653                "PutItem",
6654                json!({
6655                    "TableName": "gsi-table",
6656                    "Item": {
6657                        "pk": { "S": format!("item{i}") },
6658                        "gsi_pk": { "S": "shared" },
6659                        "gsi_sk": { "S": "sort" }
6660                    }
6661                }),
6662            );
6663            svc.put_item(&req).unwrap();
6664        }
6665
6666        // Query GSI with Limit=1 to trigger pagination
6667        let req = make_request(
6668            "Query",
6669            json!({
6670                "TableName": "gsi-table",
6671                "IndexName": "gsi-index",
6672                "KeyConditionExpression": "gsi_pk = :v",
6673                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6674                "Limit": 1
6675            }),
6676        );
6677        let resp = svc.query(&req).unwrap();
6678        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6679        assert_eq!(body["Count"], 1);
6680        let lek = &body["LastEvaluatedKey"];
6681        assert!(lek.is_object(), "should have LastEvaluatedKey");
6682        // Must contain the index keys
6683        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6684        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6685        // Must also contain the table PK
6686        assert!(
6687            lek["pk"].is_object(),
6688            "LEK must contain table PK for GSI queries"
6689        );
6690    }
6691
6692    #[test]
6693    fn gsi_query_pagination_returns_all_items() {
6694        let svc = make_service();
6695        create_gsi_table(&svc);
6696
6697        // Insert 4 items with the SAME GSI key but different table PKs
6698        for i in 0..4 {
6699            let req = make_request(
6700                "PutItem",
6701                json!({
6702                    "TableName": "gsi-table",
6703                    "Item": {
6704                        "pk": { "S": format!("item{i:03}") },
6705                        "gsi_pk": { "S": "shared" },
6706                        "gsi_sk": { "S": "sort" }
6707                    }
6708                }),
6709            );
6710            svc.put_item(&req).unwrap();
6711        }
6712
6713        // Paginate through all items with Limit=2
6714        let mut all_pks = Vec::new();
6715        let mut lek: Option<Value> = None;
6716
6717        loop {
6718            let mut query = json!({
6719                "TableName": "gsi-table",
6720                "IndexName": "gsi-index",
6721                "KeyConditionExpression": "gsi_pk = :v",
6722                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6723                "Limit": 2
6724            });
6725            if let Some(ref start_key) = lek {
6726                query["ExclusiveStartKey"] = start_key.clone();
6727            }
6728
6729            let req = make_request("Query", query);
6730            let resp = svc.query(&req).unwrap();
6731            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6732
6733            for item in body["Items"].as_array().unwrap() {
6734                let pk = item["pk"]["S"].as_str().unwrap().to_string();
6735                all_pks.push(pk);
6736            }
6737
6738            if body["LastEvaluatedKey"].is_object() {
6739                lek = Some(body["LastEvaluatedKey"].clone());
6740            } else {
6741                break;
6742            }
6743        }
6744
6745        all_pks.sort();
6746        assert_eq!(
6747            all_pks,
6748            vec!["item000", "item001", "item002", "item003"],
6749            "pagination should return all items without duplicates"
6750        );
6751    }
6752}