Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use crate::state::{
12    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
13    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
14    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
15    ReplicaDescription, SharedDynamoDbState,
16};
17
18pub struct DynamoDbService {
19    state: SharedDynamoDbState,
20}
21
22impl DynamoDbService {
23    pub fn new(state: SharedDynamoDbState) -> Self {
24        Self { state }
25    }
26
27    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
28        serde_json::from_slice(&req.body).map_err(|e| {
29            AwsServiceError::aws_error(
30                StatusCode::BAD_REQUEST,
31                "SerializationException",
32                format!("Invalid JSON: {e}"),
33            )
34        })
35    }
36
37    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
38        Ok(AwsResponse::json(
39            StatusCode::OK,
40            serde_json::to_vec(&body).unwrap(),
41        ))
42    }
43
44    // ── Table Operations ────────────────────────────────────────────────
45
46    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
47        let body = Self::parse_body(req)?;
48
49        let table_name = body["TableName"]
50            .as_str()
51            .ok_or_else(|| {
52                AwsServiceError::aws_error(
53                    StatusCode::BAD_REQUEST,
54                    "ValidationException",
55                    "TableName is required",
56                )
57            })?
58            .to_string();
59
60        let key_schema = parse_key_schema(&body["KeySchema"])?;
61        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
62
63        // Validate that key schema attributes are defined
64        for ks in &key_schema {
65            if !attribute_definitions
66                .iter()
67                .any(|ad| ad.attribute_name == ks.attribute_name)
68            {
69                return Err(AwsServiceError::aws_error(
70                    StatusCode::BAD_REQUEST,
71                    "ValidationException",
72                    format!(
73                        "One or more parameter values were invalid: \
74                         Some index key attributes are not defined in AttributeDefinitions. \
75                         Keys: [{}], AttributeDefinitions: [{}]",
76                        ks.attribute_name,
77                        attribute_definitions
78                            .iter()
79                            .map(|ad| ad.attribute_name.as_str())
80                            .collect::<Vec<_>>()
81                            .join(", ")
82                    ),
83                ));
84            }
85        }
86
87        let billing_mode = body["BillingMode"]
88            .as_str()
89            .unwrap_or("PROVISIONED")
90            .to_string();
91
92        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
93            ProvisionedThroughput {
94                read_capacity_units: 0,
95                write_capacity_units: 0,
96            }
97        } else {
98            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
99        };
100
101        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
102        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
103        let tags = parse_tags(&body["Tags"]);
104
105        let mut state = self.state.write();
106
107        if state.tables.contains_key(&table_name) {
108            return Err(AwsServiceError::aws_error(
109                StatusCode::BAD_REQUEST,
110                "ResourceInUseException",
111                format!("Table already exists: {table_name}"),
112            ));
113        }
114
115        let arn = format!(
116            "arn:aws:dynamodb:{}:{}:table/{}",
117            state.region, state.account_id, table_name
118        );
119        let now = Utc::now();
120
121        let table = DynamoTable {
122            name: table_name.clone(),
123            arn: arn.clone(),
124            key_schema: key_schema.clone(),
125            attribute_definitions: attribute_definitions.clone(),
126            provisioned_throughput: provisioned_throughput.clone(),
127            items: Vec::new(),
128            gsi: gsi.clone(),
129            lsi: lsi.clone(),
130            tags,
131            created_at: now,
132            status: "ACTIVE".to_string(),
133            item_count: 0,
134            size_bytes: 0,
135            billing_mode: billing_mode.clone(),
136            ttl_attribute: None,
137            ttl_enabled: false,
138            resource_policy: None,
139            pitr_enabled: false,
140            kinesis_destinations: Vec::new(),
141            contributor_insights_status: "DISABLED".to_string(),
142        };
143
144        state.tables.insert(table_name, table);
145
146        let table_desc = build_table_description_json(
147            &arn,
148            &key_schema,
149            &attribute_definitions,
150            &provisioned_throughput,
151            &gsi,
152            &lsi,
153            &billing_mode,
154            now,
155            0,
156            0,
157            "ACTIVE",
158        );
159
160        Self::ok_json(json!({ "TableDescription": table_desc }))
161    }
162
163    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
164        let body = Self::parse_body(req)?;
165        let table_name = require_str(&body, "TableName")?;
166
167        let mut state = self.state.write();
168        let table = state.tables.remove(table_name).ok_or_else(|| {
169            AwsServiceError::aws_error(
170                StatusCode::BAD_REQUEST,
171                "ResourceNotFoundException",
172                format!("Requested resource not found: Table: {table_name} not found"),
173            )
174        })?;
175
176        let table_desc = build_table_description_json(
177            &table.arn,
178            &table.key_schema,
179            &table.attribute_definitions,
180            &table.provisioned_throughput,
181            &table.gsi,
182            &table.lsi,
183            &table.billing_mode,
184            table.created_at,
185            table.item_count,
186            table.size_bytes,
187            "DELETING",
188        );
189
190        Self::ok_json(json!({ "TableDescription": table_desc }))
191    }
192
193    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        let body = Self::parse_body(req)?;
195        let table_name = require_str(&body, "TableName")?;
196
197        let state = self.state.read();
198        let table = get_table(&state.tables, table_name)?;
199
200        let table_desc = build_table_description_json(
201            &table.arn,
202            &table.key_schema,
203            &table.attribute_definitions,
204            &table.provisioned_throughput,
205            &table.gsi,
206            &table.lsi,
207            &table.billing_mode,
208            table.created_at,
209            table.item_count,
210            table.size_bytes,
211            &table.status,
212        );
213
214        Self::ok_json(json!({ "Table": table_desc }))
215    }
216
217    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
218        let body = Self::parse_body(req)?;
219
220        validate_optional_string_length(
221            "exclusiveStartTableName",
222            body["ExclusiveStartTableName"].as_str(),
223            3,
224            255,
225        )?;
226        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
227
228        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
229        let exclusive_start = body["ExclusiveStartTableName"]
230            .as_str()
231            .map(|s| s.to_string());
232
233        let state = self.state.read();
234        let mut names: Vec<&String> = state.tables.keys().collect();
235        names.sort();
236
237        let start_idx = match &exclusive_start {
238            Some(start) => names
239                .iter()
240                .position(|n| n.as_str() > start.as_str())
241                .unwrap_or(names.len()),
242            None => 0,
243        };
244
245        let page: Vec<&str> = names
246            .iter()
247            .skip(start_idx)
248            .take(limit)
249            .map(|n| n.as_str())
250            .collect();
251
252        let mut result = json!({ "TableNames": page });
253
254        if start_idx + limit < names.len() {
255            if let Some(last) = page.last() {
256                result["LastEvaluatedTableName"] = json!(last);
257            }
258        }
259
260        Self::ok_json(result)
261    }
262
263    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
264        let body = Self::parse_body(req)?;
265        let table_name = require_str(&body, "TableName")?;
266
267        let mut state = self.state.write();
268        let table = state.tables.get_mut(table_name).ok_or_else(|| {
269            AwsServiceError::aws_error(
270                StatusCode::BAD_REQUEST,
271                "ResourceNotFoundException",
272                format!("Requested resource not found: Table: {table_name} not found"),
273            )
274        })?;
275
276        if let Some(pt) = body.get("ProvisionedThroughput") {
277            if let Ok(throughput) = parse_provisioned_throughput(pt) {
278                table.provisioned_throughput = throughput;
279            }
280        }
281
282        if let Some(bm) = body["BillingMode"].as_str() {
283            table.billing_mode = bm.to_string();
284        }
285
286        let table_desc = build_table_description_json(
287            &table.arn,
288            &table.key_schema,
289            &table.attribute_definitions,
290            &table.provisioned_throughput,
291            &table.gsi,
292            &table.lsi,
293            &table.billing_mode,
294            table.created_at,
295            table.item_count,
296            table.size_bytes,
297            &table.status,
298        );
299
300        Self::ok_json(json!({ "TableDescription": table_desc }))
301    }
302
303    // ── Item Operations ─────────────────────────────────────────────────
304
305    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
306        let body = Self::parse_body(req)?;
307        let table_name = require_str(&body, "TableName")?;
308        let item = require_object(&body, "Item")?;
309
310        let mut state = self.state.write();
311        let table = get_table_mut(&mut state.tables, table_name)?;
312
313        validate_key_in_item(table, &item)?;
314
315        let condition = body["ConditionExpression"].as_str();
316        let expr_attr_names = parse_expression_attribute_names(&body);
317        let expr_attr_values = parse_expression_attribute_values(&body);
318
319        let key = extract_key(table, &item);
320        let existing_idx = table.find_item_index(&key);
321
322        if let Some(cond) = condition {
323            let existing = existing_idx.map(|i| &table.items[i]);
324            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
325        }
326
327        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
328        let old_item = if return_values == "ALL_OLD" {
329            existing_idx.map(|i| table.items[i].clone())
330        } else {
331            None
332        };
333
334        if let Some(idx) = existing_idx {
335            table.items[idx] = item;
336        } else {
337            table.items.push(item);
338        }
339
340        table.recalculate_stats();
341
342        let mut result = json!({});
343        if let Some(old) = old_item {
344            result["Attributes"] = json!(old);
345        }
346
347        Self::ok_json(result)
348    }
349
350    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
351        let body = Self::parse_body(req)?;
352        let table_name = require_str(&body, "TableName")?;
353        let key = require_object(&body, "Key")?;
354
355        let state = self.state.read();
356        let table = get_table(&state.tables, table_name)?;
357
358        let result = match table.find_item_index(&key) {
359            Some(idx) => {
360                let item = &table.items[idx];
361                let projected = project_item(item, &body);
362                json!({ "Item": projected })
363            }
364            None => json!({}),
365        };
366
367        Self::ok_json(result)
368    }
369
370    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
371        let body = Self::parse_body(req)?;
372
373        validate_optional_enum_value(
374            "conditionalOperator",
375            &body["ConditionalOperator"],
376            &["AND", "OR"],
377        )?;
378        validate_optional_enum_value(
379            "returnConsumedCapacity",
380            &body["ReturnConsumedCapacity"],
381            &["INDEXES", "TOTAL", "NONE"],
382        )?;
383        validate_optional_enum_value(
384            "returnValues",
385            &body["ReturnValues"],
386            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
387        )?;
388        validate_optional_enum_value(
389            "returnItemCollectionMetrics",
390            &body["ReturnItemCollectionMetrics"],
391            &["SIZE", "NONE"],
392        )?;
393        validate_optional_enum_value(
394            "returnValuesOnConditionCheckFailure",
395            &body["ReturnValuesOnConditionCheckFailure"],
396            &["ALL_OLD", "NONE"],
397        )?;
398
399        let table_name = require_str(&body, "TableName")?;
400        let key = require_object(&body, "Key")?;
401
402        let mut state = self.state.write();
403        let table = get_table_mut(&mut state.tables, table_name)?;
404
405        let condition = body["ConditionExpression"].as_str();
406        let expr_attr_names = parse_expression_attribute_names(&body);
407        let expr_attr_values = parse_expression_attribute_values(&body);
408
409        let existing_idx = table.find_item_index(&key);
410
411        if let Some(cond) = condition {
412            let existing = existing_idx.map(|i| &table.items[i]);
413            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
414        }
415
416        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
417
418        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
419        let return_icm = body["ReturnItemCollectionMetrics"]
420            .as_str()
421            .unwrap_or("NONE");
422
423        let mut result = json!({});
424
425        if let Some(idx) = existing_idx {
426            if return_values == "ALL_OLD" {
427                result["Attributes"] = json!(table.items[idx]);
428            }
429            table.items.remove(idx);
430            table.recalculate_stats();
431        }
432
433        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
434            result["ConsumedCapacity"] = json!({
435                "TableName": table_name,
436                "CapacityUnits": 1.0,
437            });
438        }
439
440        if return_icm == "SIZE" {
441            result["ItemCollectionMetrics"] = json!({});
442        }
443
444        Self::ok_json(result)
445    }
446
447    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448        let body = Self::parse_body(req)?;
449        let table_name = require_str(&body, "TableName")?;
450        let key = require_object(&body, "Key")?;
451
452        let mut state = self.state.write();
453        let table = get_table_mut(&mut state.tables, table_name)?;
454
455        validate_key_attributes_in_key(table, &key)?;
456
457        let condition = body["ConditionExpression"].as_str();
458        let expr_attr_names = parse_expression_attribute_names(&body);
459        let expr_attr_values = parse_expression_attribute_values(&body);
460        let update_expression = body["UpdateExpression"].as_str();
461
462        let existing_idx = table.find_item_index(&key);
463
464        if let Some(cond) = condition {
465            let existing = existing_idx.map(|i| &table.items[i]);
466            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
467        }
468
469        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
470
471        let idx = match existing_idx {
472            Some(i) => i,
473            None => {
474                let mut new_item = HashMap::new();
475                for (k, v) in &key {
476                    new_item.insert(k.clone(), v.clone());
477                }
478                table.items.push(new_item);
479                table.items.len() - 1
480            }
481        };
482
483        let old_item = if return_values == "ALL_OLD" {
484            Some(table.items[idx].clone())
485        } else {
486            None
487        };
488
489        if let Some(expr) = update_expression {
490            apply_update_expression(
491                &mut table.items[idx],
492                expr,
493                &expr_attr_names,
494                &expr_attr_values,
495            )?;
496        }
497
498        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
499            Some(table.items[idx].clone())
500        } else {
501            None
502        };
503
504        table.recalculate_stats();
505
506        let mut result = json!({});
507        if let Some(old) = old_item {
508            result["Attributes"] = json!(old);
509        } else if let Some(new) = new_item {
510            result["Attributes"] = json!(new);
511        }
512
513        Self::ok_json(result)
514    }
515
516    // ── Query & Scan ────────────────────────────────────────────────────
517
518    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
519        let body = Self::parse_body(req)?;
520        let table_name = require_str(&body, "TableName")?;
521
522        let state = self.state.read();
523        let table = get_table(&state.tables, table_name)?;
524
525        let expr_attr_names = parse_expression_attribute_names(&body);
526        let expr_attr_values = parse_expression_attribute_values(&body);
527
528        let key_condition = body["KeyConditionExpression"].as_str();
529        let filter_expression = body["FilterExpression"].as_str();
530        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
531        let limit = body["Limit"].as_i64().map(|l| l as usize);
532        let index_name = body["IndexName"].as_str();
533
534        let (items_to_scan, hash_key_name, range_key_name): (
535            &[HashMap<String, AttributeValue>],
536            String,
537            Option<String>,
538        ) = if let Some(idx_name) = index_name {
539            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
540                let hk = gsi
541                    .key_schema
542                    .iter()
543                    .find(|k| k.key_type == "HASH")
544                    .map(|k| k.attribute_name.clone())
545                    .unwrap_or_default();
546                let rk = gsi
547                    .key_schema
548                    .iter()
549                    .find(|k| k.key_type == "RANGE")
550                    .map(|k| k.attribute_name.clone());
551                (&table.items, hk, rk)
552            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
553                let hk = lsi
554                    .key_schema
555                    .iter()
556                    .find(|k| k.key_type == "HASH")
557                    .map(|k| k.attribute_name.clone())
558                    .unwrap_or_default();
559                let rk = lsi
560                    .key_schema
561                    .iter()
562                    .find(|k| k.key_type == "RANGE")
563                    .map(|k| k.attribute_name.clone());
564                (&table.items, hk, rk)
565            } else {
566                return Err(AwsServiceError::aws_error(
567                    StatusCode::BAD_REQUEST,
568                    "ValidationException",
569                    format!("The table does not have the specified index: {idx_name}"),
570                ));
571            }
572        } else {
573            (
574                &table.items[..],
575                table.hash_key_name().to_string(),
576                table.range_key_name().map(|s| s.to_string()),
577            )
578        };
579
580        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
581            .iter()
582            .filter(|item| {
583                if let Some(kc) = key_condition {
584                    evaluate_key_condition(
585                        kc,
586                        item,
587                        &hash_key_name,
588                        range_key_name.as_deref(),
589                        &expr_attr_names,
590                        &expr_attr_values,
591                    )
592                } else {
593                    true
594                }
595            })
596            .collect();
597
598        if let Some(ref rk) = range_key_name {
599            matched.sort_by(|a, b| {
600                let av = a.get(rk.as_str());
601                let bv = b.get(rk.as_str());
602                compare_attribute_values(av, bv)
603            });
604            if !scan_forward {
605                matched.reverse();
606            }
607        }
608
609        if let Some(filter) = filter_expression {
610            matched.retain(|item| {
611                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
612            });
613        }
614
615        let scanned_count = matched.len();
616
617        if let Some(lim) = limit {
618            matched.truncate(lim);
619        }
620
621        let items: Vec<Value> = matched
622            .iter()
623            .map(|item| {
624                let projected = project_item(item, &body);
625                json!(projected)
626            })
627            .collect();
628
629        Self::ok_json(json!({
630            "Items": items,
631            "Count": items.len(),
632            "ScannedCount": scanned_count,
633        }))
634    }
635
636    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637        let body = Self::parse_body(req)?;
638        let table_name = require_str(&body, "TableName")?;
639
640        let state = self.state.read();
641        let table = get_table(&state.tables, table_name)?;
642
643        let expr_attr_names = parse_expression_attribute_names(&body);
644        let expr_attr_values = parse_expression_attribute_values(&body);
645        let filter_expression = body["FilterExpression"].as_str();
646        let limit = body["Limit"].as_i64().map(|l| l as usize);
647
648        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
649        let scanned_count = matched.len();
650
651        if let Some(filter) = filter_expression {
652            matched.retain(|item| {
653                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
654            });
655        }
656
657        if let Some(lim) = limit {
658            matched.truncate(lim);
659        }
660
661        let items: Vec<Value> = matched
662            .iter()
663            .map(|item| {
664                let projected = project_item(item, &body);
665                json!(projected)
666            })
667            .collect();
668
669        Self::ok_json(json!({
670            "Items": items,
671            "Count": items.len(),
672            "ScannedCount": scanned_count,
673        }))
674    }
675
676    // ── Batch Operations ────────────────────────────────────────────────
677
678    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679        let body = Self::parse_body(req)?;
680
681        validate_optional_enum_value(
682            "returnConsumedCapacity",
683            &body["ReturnConsumedCapacity"],
684            &["INDEXES", "TOTAL", "NONE"],
685        )?;
686
687        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
688
689        let request_items = body["RequestItems"]
690            .as_object()
691            .ok_or_else(|| {
692                AwsServiceError::aws_error(
693                    StatusCode::BAD_REQUEST,
694                    "ValidationException",
695                    "RequestItems is required",
696                )
697            })?
698            .clone();
699
700        let state = self.state.read();
701        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
702        let mut consumed_capacity: Vec<Value> = Vec::new();
703
704        for (table_name, params) in &request_items {
705            let table = get_table(&state.tables, table_name)?;
706            let keys = params["Keys"].as_array().ok_or_else(|| {
707                AwsServiceError::aws_error(
708                    StatusCode::BAD_REQUEST,
709                    "ValidationException",
710                    "Keys is required",
711                )
712            })?;
713
714            let mut items = Vec::new();
715            for key_val in keys {
716                let key: HashMap<String, AttributeValue> =
717                    serde_json::from_value(key_val.clone()).unwrap_or_default();
718                if let Some(idx) = table.find_item_index(&key) {
719                    items.push(json!(table.items[idx]));
720                }
721            }
722            responses.insert(table_name.clone(), items);
723
724            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
725                consumed_capacity.push(json!({
726                    "TableName": table_name,
727                    "CapacityUnits": 1.0,
728                }));
729            }
730        }
731
732        let mut result = json!({
733            "Responses": responses,
734            "UnprocessedKeys": {},
735        });
736
737        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
738            result["ConsumedCapacity"] = json!(consumed_capacity);
739        }
740
741        Self::ok_json(result)
742    }
743
744    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
745        let body = Self::parse_body(req)?;
746
747        validate_optional_enum_value(
748            "returnConsumedCapacity",
749            &body["ReturnConsumedCapacity"],
750            &["INDEXES", "TOTAL", "NONE"],
751        )?;
752        validate_optional_enum_value(
753            "returnItemCollectionMetrics",
754            &body["ReturnItemCollectionMetrics"],
755            &["SIZE", "NONE"],
756        )?;
757
758        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
759        let return_icm = body["ReturnItemCollectionMetrics"]
760            .as_str()
761            .unwrap_or("NONE");
762
763        let request_items = body["RequestItems"]
764            .as_object()
765            .ok_or_else(|| {
766                AwsServiceError::aws_error(
767                    StatusCode::BAD_REQUEST,
768                    "ValidationException",
769                    "RequestItems is required",
770                )
771            })?
772            .clone();
773
774        let mut state = self.state.write();
775        let mut consumed_capacity: Vec<Value> = Vec::new();
776        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
777
778        for (table_name, requests) in &request_items {
779            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
780                AwsServiceError::aws_error(
781                    StatusCode::BAD_REQUEST,
782                    "ResourceNotFoundException",
783                    format!("Requested resource not found: Table: {table_name} not found"),
784                )
785            })?;
786
787            let reqs = requests.as_array().ok_or_else(|| {
788                AwsServiceError::aws_error(
789                    StatusCode::BAD_REQUEST,
790                    "ValidationException",
791                    "Request list must be an array",
792                )
793            })?;
794
795            for request in reqs {
796                if let Some(put_req) = request.get("PutRequest") {
797                    let item: HashMap<String, AttributeValue> =
798                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
799                    let key = extract_key(table, &item);
800                    if let Some(idx) = table.find_item_index(&key) {
801                        table.items[idx] = item;
802                    } else {
803                        table.items.push(item);
804                    }
805                } else if let Some(del_req) = request.get("DeleteRequest") {
806                    let key: HashMap<String, AttributeValue> =
807                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
808                    if let Some(idx) = table.find_item_index(&key) {
809                        table.items.remove(idx);
810                    }
811                }
812            }
813
814            table.recalculate_stats();
815
816            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
817                consumed_capacity.push(json!({
818                    "TableName": table_name,
819                    "CapacityUnits": 1.0,
820                }));
821            }
822
823            if return_icm == "SIZE" {
824                item_collection_metrics.insert(table_name.clone(), vec![]);
825            }
826        }
827
828        let mut result = json!({
829            "UnprocessedItems": {},
830        });
831
832        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
833            result["ConsumedCapacity"] = json!(consumed_capacity);
834        }
835
836        if return_icm == "SIZE" {
837            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
838        }
839
840        Self::ok_json(result)
841    }
842
843    // ── Tags ────────────────────────────────────────────────────────────
844
845    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
846        let body = Self::parse_body(req)?;
847        let resource_arn = require_str(&body, "ResourceArn")?;
848        let tags_arr = body["Tags"].as_array().ok_or_else(|| {
849            AwsServiceError::aws_error(
850                StatusCode::BAD_REQUEST,
851                "ValidationException",
852                "Tags is required",
853            )
854        })?;
855
856        let mut state = self.state.write();
857        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
858
859        for tag in tags_arr {
860            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
861                table.tags.insert(k.to_string(), v.to_string());
862            }
863        }
864
865        Self::ok_json(json!({}))
866    }
867
868    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
869        let body = Self::parse_body(req)?;
870        let resource_arn = require_str(&body, "ResourceArn")?;
871        let tag_keys = body["TagKeys"].as_array().ok_or_else(|| {
872            AwsServiceError::aws_error(
873                StatusCode::BAD_REQUEST,
874                "ValidationException",
875                "TagKeys is required",
876            )
877        })?;
878
879        let mut state = self.state.write();
880        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
881
882        for key in tag_keys {
883            if let Some(k) = key.as_str() {
884                table.tags.remove(k);
885            }
886        }
887
888        Self::ok_json(json!({}))
889    }
890
891    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892        let body = Self::parse_body(req)?;
893        let resource_arn = require_str(&body, "ResourceArn")?;
894
895        let state = self.state.read();
896        let table = find_table_by_arn(&state.tables, resource_arn)?;
897
898        let tags: Vec<Value> = table
899            .tags
900            .iter()
901            .map(|(k, v)| json!({"Key": k, "Value": v}))
902            .collect();
903
904        Self::ok_json(json!({ "Tags": tags }))
905    }
906
907    // ── Transactions ────────────────────────────────────────────────────
908
909    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
910        let body = Self::parse_body(req)?;
911        validate_optional_enum_value(
912            "returnConsumedCapacity",
913            &body["ReturnConsumedCapacity"],
914            &["INDEXES", "TOTAL", "NONE"],
915        )?;
916        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
917            AwsServiceError::aws_error(
918                StatusCode::BAD_REQUEST,
919                "ValidationException",
920                "TransactItems is required",
921            )
922        })?;
923
924        let state = self.state.read();
925        let mut responses: Vec<Value> = Vec::new();
926
927        for ti in transact_items {
928            let get = &ti["Get"];
929            let table_name = get["TableName"].as_str().ok_or_else(|| {
930                AwsServiceError::aws_error(
931                    StatusCode::BAD_REQUEST,
932                    "ValidationException",
933                    "TableName is required in Get",
934                )
935            })?;
936            let key: HashMap<String, AttributeValue> =
937                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
938
939            let table = get_table(&state.tables, table_name)?;
940            match table.find_item_index(&key) {
941                Some(idx) => {
942                    responses.push(json!({ "Item": table.items[idx] }));
943                }
944                None => {
945                    responses.push(json!({}));
946                }
947            }
948        }
949
950        Self::ok_json(json!({ "Responses": responses }))
951    }
952
953    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
954        let body = Self::parse_body(req)?;
955        validate_optional_string_length(
956            "clientRequestToken",
957            body["ClientRequestToken"].as_str(),
958            1,
959            36,
960        )?;
961        validate_optional_enum_value(
962            "returnConsumedCapacity",
963            &body["ReturnConsumedCapacity"],
964            &["INDEXES", "TOTAL", "NONE"],
965        )?;
966        validate_optional_enum_value(
967            "returnItemCollectionMetrics",
968            &body["ReturnItemCollectionMetrics"],
969            &["SIZE", "NONE"],
970        )?;
971        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
972            AwsServiceError::aws_error(
973                StatusCode::BAD_REQUEST,
974                "ValidationException",
975                "TransactItems is required",
976            )
977        })?;
978
979        let mut state = self.state.write();
980
981        // First pass: validate all conditions
982        let mut cancellation_reasons: Vec<Value> = Vec::new();
983        let mut any_failed = false;
984
985        for ti in transact_items {
986            if let Some(put) = ti.get("Put") {
987                let table_name = put["TableName"].as_str().unwrap_or_default();
988                let item: HashMap<String, AttributeValue> =
989                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
990                let condition = put["ConditionExpression"].as_str();
991
992                if let Some(cond) = condition {
993                    let table = get_table(&state.tables, table_name)?;
994                    let expr_attr_names = parse_expression_attribute_names(put);
995                    let expr_attr_values = parse_expression_attribute_values(put);
996                    let key = extract_key(table, &item);
997                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
998                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
999                        .is_err()
1000                    {
1001                        cancellation_reasons.push(json!({
1002                            "Code": "ConditionalCheckFailed",
1003                            "Message": "The conditional request failed"
1004                        }));
1005                        any_failed = true;
1006                        continue;
1007                    }
1008                }
1009                cancellation_reasons.push(json!({ "Code": "None" }));
1010            } else if let Some(delete) = ti.get("Delete") {
1011                let table_name = delete["TableName"].as_str().unwrap_or_default();
1012                let key: HashMap<String, AttributeValue> =
1013                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1014                let condition = delete["ConditionExpression"].as_str();
1015
1016                if let Some(cond) = condition {
1017                    let table = get_table(&state.tables, table_name)?;
1018                    let expr_attr_names = parse_expression_attribute_names(delete);
1019                    let expr_attr_values = parse_expression_attribute_values(delete);
1020                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1021                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1022                        .is_err()
1023                    {
1024                        cancellation_reasons.push(json!({
1025                            "Code": "ConditionalCheckFailed",
1026                            "Message": "The conditional request failed"
1027                        }));
1028                        any_failed = true;
1029                        continue;
1030                    }
1031                }
1032                cancellation_reasons.push(json!({ "Code": "None" }));
1033            } else if let Some(update) = ti.get("Update") {
1034                let table_name = update["TableName"].as_str().unwrap_or_default();
1035                let key: HashMap<String, AttributeValue> =
1036                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1037                let condition = update["ConditionExpression"].as_str();
1038
1039                if let Some(cond) = condition {
1040                    let table = get_table(&state.tables, table_name)?;
1041                    let expr_attr_names = parse_expression_attribute_names(update);
1042                    let expr_attr_values = parse_expression_attribute_values(update);
1043                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1044                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1045                        .is_err()
1046                    {
1047                        cancellation_reasons.push(json!({
1048                            "Code": "ConditionalCheckFailed",
1049                            "Message": "The conditional request failed"
1050                        }));
1051                        any_failed = true;
1052                        continue;
1053                    }
1054                }
1055                cancellation_reasons.push(json!({ "Code": "None" }));
1056            } else if let Some(check) = ti.get("ConditionCheck") {
1057                let table_name = check["TableName"].as_str().unwrap_or_default();
1058                let key: HashMap<String, AttributeValue> =
1059                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1060                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1061
1062                let table = get_table(&state.tables, table_name)?;
1063                let expr_attr_names = parse_expression_attribute_names(check);
1064                let expr_attr_values = parse_expression_attribute_values(check);
1065                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1066                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1067                {
1068                    cancellation_reasons.push(json!({
1069                        "Code": "ConditionalCheckFailed",
1070                        "Message": "The conditional request failed"
1071                    }));
1072                    any_failed = true;
1073                    continue;
1074                }
1075                cancellation_reasons.push(json!({ "Code": "None" }));
1076            } else {
1077                cancellation_reasons.push(json!({ "Code": "None" }));
1078            }
1079        }
1080
1081        if any_failed {
1082            let error_body = json!({
1083                "__type": "TransactionCanceledException",
1084                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1085                "CancellationReasons": cancellation_reasons
1086            });
1087            return Ok(AwsResponse::json(
1088                StatusCode::BAD_REQUEST,
1089                serde_json::to_vec(&error_body).unwrap(),
1090            ));
1091        }
1092
1093        // Second pass: apply all writes
1094        for ti in transact_items {
1095            if let Some(put) = ti.get("Put") {
1096                let table_name = put["TableName"].as_str().unwrap_or_default();
1097                let item: HashMap<String, AttributeValue> =
1098                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1099                let table = get_table_mut(&mut state.tables, table_name)?;
1100                let key = extract_key(table, &item);
1101                if let Some(idx) = table.find_item_index(&key) {
1102                    table.items[idx] = item;
1103                } else {
1104                    table.items.push(item);
1105                }
1106                table.recalculate_stats();
1107            } else if let Some(delete) = ti.get("Delete") {
1108                let table_name = delete["TableName"].as_str().unwrap_or_default();
1109                let key: HashMap<String, AttributeValue> =
1110                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1111                let table = get_table_mut(&mut state.tables, table_name)?;
1112                if let Some(idx) = table.find_item_index(&key) {
1113                    table.items.remove(idx);
1114                }
1115                table.recalculate_stats();
1116            } else if let Some(update) = ti.get("Update") {
1117                let table_name = update["TableName"].as_str().unwrap_or_default();
1118                let key: HashMap<String, AttributeValue> =
1119                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1120                let update_expression = update["UpdateExpression"].as_str();
1121                let expr_attr_names = parse_expression_attribute_names(update);
1122                let expr_attr_values = parse_expression_attribute_values(update);
1123
1124                let table = get_table_mut(&mut state.tables, table_name)?;
1125                let idx = match table.find_item_index(&key) {
1126                    Some(i) => i,
1127                    None => {
1128                        let mut new_item = HashMap::new();
1129                        for (k, v) in &key {
1130                            new_item.insert(k.clone(), v.clone());
1131                        }
1132                        table.items.push(new_item);
1133                        table.items.len() - 1
1134                    }
1135                };
1136
1137                if let Some(expr) = update_expression {
1138                    apply_update_expression(
1139                        &mut table.items[idx],
1140                        expr,
1141                        &expr_attr_names,
1142                        &expr_attr_values,
1143                    )?;
1144                }
1145                table.recalculate_stats();
1146            }
1147            // ConditionCheck: no write needed
1148        }
1149
1150        Self::ok_json(json!({}))
1151    }
1152
1153    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1154        let body = Self::parse_body(req)?;
1155        let statement = require_str(&body, "Statement")?;
1156        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1157
1158        execute_partiql_statement(&self.state, statement, &parameters)
1159    }
1160
1161    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1162        let body = Self::parse_body(req)?;
1163        validate_optional_enum_value(
1164            "returnConsumedCapacity",
1165            &body["ReturnConsumedCapacity"],
1166            &["INDEXES", "TOTAL", "NONE"],
1167        )?;
1168        let statements = body["Statements"].as_array().ok_or_else(|| {
1169            AwsServiceError::aws_error(
1170                StatusCode::BAD_REQUEST,
1171                "ValidationException",
1172                "Statements is required",
1173            )
1174        })?;
1175
1176        let mut responses: Vec<Value> = Vec::new();
1177        for stmt_obj in statements {
1178            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1179            let parameters = stmt_obj["Parameters"]
1180                .as_array()
1181                .cloned()
1182                .unwrap_or_default();
1183
1184            match execute_partiql_statement(&self.state, statement, &parameters) {
1185                Ok(resp) => {
1186                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1187                    responses.push(resp_body);
1188                }
1189                Err(e) => {
1190                    responses.push(json!({
1191                        "Error": {
1192                            "Code": "ValidationException",
1193                            "Message": e.to_string()
1194                        }
1195                    }));
1196                }
1197            }
1198        }
1199
1200        Self::ok_json(json!({ "Responses": responses }))
1201    }
1202
1203    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1204        let body = Self::parse_body(req)?;
1205        validate_optional_string_length(
1206            "clientRequestToken",
1207            body["ClientRequestToken"].as_str(),
1208            1,
1209            36,
1210        )?;
1211        validate_optional_enum_value(
1212            "returnConsumedCapacity",
1213            &body["ReturnConsumedCapacity"],
1214            &["INDEXES", "TOTAL", "NONE"],
1215        )?;
1216        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1217            AwsServiceError::aws_error(
1218                StatusCode::BAD_REQUEST,
1219                "ValidationException",
1220                "TransactStatements is required",
1221            )
1222        })?;
1223
1224        // Collect all results; if any fail, return TransactionCanceledException
1225        let mut results: Vec<Result<Value, String>> = Vec::new();
1226        for stmt_obj in transact_statements {
1227            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1228            let parameters = stmt_obj["Parameters"]
1229                .as_array()
1230                .cloned()
1231                .unwrap_or_default();
1232
1233            match execute_partiql_statement(&self.state, statement, &parameters) {
1234                Ok(resp) => {
1235                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1236                    results.push(Ok(resp_body));
1237                }
1238                Err(e) => {
1239                    results.push(Err(e.to_string()));
1240                }
1241            }
1242        }
1243
1244        let any_failed = results.iter().any(|r| r.is_err());
1245        if any_failed {
1246            let reasons: Vec<Value> = results
1247                .iter()
1248                .map(|r| match r {
1249                    Ok(_) => json!({ "Code": "None" }),
1250                    Err(msg) => json!({
1251                        "Code": "ValidationException",
1252                        "Message": msg
1253                    }),
1254                })
1255                .collect();
1256            let error_body = json!({
1257                "__type": "TransactionCanceledException",
1258                "message": "Transaction cancelled due to validation errors",
1259                "CancellationReasons": reasons
1260            });
1261            return Ok(AwsResponse::json(
1262                StatusCode::BAD_REQUEST,
1263                serde_json::to_vec(&error_body).unwrap(),
1264            ));
1265        }
1266
1267        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1268        Self::ok_json(json!({ "Responses": responses }))
1269    }
1270
1271    // ── TTL ─────────────────────────────────────────────────────────────
1272
1273    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274        let body = Self::parse_body(req)?;
1275        let table_name = require_str(&body, "TableName")?;
1276        let spec = &body["TimeToLiveSpecification"];
1277        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1278            AwsServiceError::aws_error(
1279                StatusCode::BAD_REQUEST,
1280                "ValidationException",
1281                "TimeToLiveSpecification.AttributeName is required",
1282            )
1283        })?;
1284        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1285            AwsServiceError::aws_error(
1286                StatusCode::BAD_REQUEST,
1287                "ValidationException",
1288                "TimeToLiveSpecification.Enabled is required",
1289            )
1290        })?;
1291
1292        let mut state = self.state.write();
1293        let table = get_table_mut(&mut state.tables, table_name)?;
1294
1295        if enabled {
1296            table.ttl_attribute = Some(attr_name.to_string());
1297            table.ttl_enabled = true;
1298        } else {
1299            table.ttl_enabled = false;
1300        }
1301
1302        Self::ok_json(json!({
1303            "TimeToLiveSpecification": {
1304                "AttributeName": attr_name,
1305                "Enabled": enabled
1306            }
1307        }))
1308    }
1309
1310    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1311        let body = Self::parse_body(req)?;
1312        let table_name = require_str(&body, "TableName")?;
1313
1314        let state = self.state.read();
1315        let table = get_table(&state.tables, table_name)?;
1316
1317        let status = if table.ttl_enabled {
1318            "ENABLED"
1319        } else {
1320            "DISABLED"
1321        };
1322
1323        let mut desc = json!({
1324            "TimeToLiveDescription": {
1325                "TimeToLiveStatus": status
1326            }
1327        });
1328
1329        if let Some(ref attr) = table.ttl_attribute {
1330            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1331        }
1332
1333        Self::ok_json(desc)
1334    }
1335
1336    // ── Resource Policies ───────────────────────────────────────────────
1337
1338    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1339        let body = Self::parse_body(req)?;
1340        let resource_arn = require_str(&body, "ResourceArn")?;
1341        let policy = require_str(&body, "Policy")?;
1342
1343        let mut state = self.state.write();
1344        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1345        table.resource_policy = Some(policy.to_string());
1346
1347        let revision_id = uuid::Uuid::new_v4().to_string();
1348        Self::ok_json(json!({ "RevisionId": revision_id }))
1349    }
1350
1351    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352        let body = Self::parse_body(req)?;
1353        let resource_arn = require_str(&body, "ResourceArn")?;
1354
1355        let state = self.state.read();
1356        let table = find_table_by_arn(&state.tables, resource_arn)?;
1357
1358        match &table.resource_policy {
1359            Some(policy) => {
1360                let revision_id = uuid::Uuid::new_v4().to_string();
1361                Self::ok_json(json!({
1362                    "Policy": policy,
1363                    "RevisionId": revision_id
1364                }))
1365            }
1366            None => Self::ok_json(json!({ "Policy": null })),
1367        }
1368    }
1369
1370    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371        let body = Self::parse_body(req)?;
1372        let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374        let mut state = self.state.write();
1375        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1376        table.resource_policy = None;
1377
1378        Self::ok_json(json!({}))
1379    }
1380
1381    // ── Stubs ──────────────────────────────────────────────────────────
1382
1383    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384        Self::ok_json(json!({
1385            "Endpoints": [{
1386                "Address": "dynamodb.us-east-1.amazonaws.com",
1387                "CachePeriodInMinutes": 1440
1388            }]
1389        }))
1390    }
1391
1392    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1393        Self::ok_json(json!({
1394            "AccountMaxReadCapacityUnits": 80000,
1395            "AccountMaxWriteCapacityUnits": 80000,
1396            "TableMaxReadCapacityUnits": 40000,
1397            "TableMaxWriteCapacityUnits": 40000
1398        }))
1399    }
1400
1401    // ── Backups ────────────────────────────────────────────────────────
1402
1403    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1404        let body = Self::parse_body(req)?;
1405        let table_name = require_str(&body, "TableName")?;
1406        let backup_name = require_str(&body, "BackupName")?;
1407
1408        let mut state = self.state.write();
1409        let table = get_table(&state.tables, table_name)?;
1410
1411        let backup_arn = format!(
1412            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1413            state.region,
1414            state.account_id,
1415            table_name,
1416            Utc::now().format("%Y%m%d%H%M%S")
1417        );
1418        let now = Utc::now();
1419
1420        let backup = BackupDescription {
1421            backup_arn: backup_arn.clone(),
1422            backup_name: backup_name.to_string(),
1423            table_name: table_name.to_string(),
1424            table_arn: table.arn.clone(),
1425            backup_status: "AVAILABLE".to_string(),
1426            backup_type: "USER".to_string(),
1427            backup_creation_date: now,
1428            key_schema: table.key_schema.clone(),
1429            attribute_definitions: table.attribute_definitions.clone(),
1430            provisioned_throughput: table.provisioned_throughput.clone(),
1431            billing_mode: table.billing_mode.clone(),
1432            item_count: table.item_count,
1433            size_bytes: table.size_bytes,
1434        };
1435
1436        state.backups.insert(backup_arn.clone(), backup);
1437
1438        Self::ok_json(json!({
1439            "BackupDetails": {
1440                "BackupArn": backup_arn,
1441                "BackupName": backup_name,
1442                "BackupStatus": "AVAILABLE",
1443                "BackupType": "USER",
1444                "BackupCreationDateTime": now.timestamp() as f64,
1445                "BackupSizeBytes": 0
1446            }
1447        }))
1448    }
1449
1450    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451        let body = Self::parse_body(req)?;
1452        let backup_arn = require_str(&body, "BackupArn")?;
1453
1454        let mut state = self.state.write();
1455        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1456            AwsServiceError::aws_error(
1457                StatusCode::BAD_REQUEST,
1458                "BackupNotFoundException",
1459                format!("Backup not found: {backup_arn}"),
1460            )
1461        })?;
1462
1463        Self::ok_json(json!({
1464            "BackupDescription": {
1465                "BackupDetails": {
1466                    "BackupArn": backup.backup_arn,
1467                    "BackupName": backup.backup_name,
1468                    "BackupStatus": "DELETED",
1469                    "BackupType": backup.backup_type,
1470                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1471                    "BackupSizeBytes": backup.size_bytes
1472                },
1473                "SourceTableDetails": {
1474                    "TableName": backup.table_name,
1475                    "TableArn": backup.table_arn,
1476                    "TableId": uuid::Uuid::new_v4().to_string(),
1477                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1478                        "AttributeName": ks.attribute_name,
1479                        "KeyType": ks.key_type
1480                    })).collect::<Vec<_>>(),
1481                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1482                    "ProvisionedThroughput": {
1483                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1484                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1485                    },
1486                    "ItemCount": backup.item_count,
1487                    "BillingMode": backup.billing_mode,
1488                    "TableSizeBytes": backup.size_bytes
1489                },
1490                "SourceTableFeatureDetails": {}
1491            }
1492        }))
1493    }
1494
1495    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1496        let body = Self::parse_body(req)?;
1497        let backup_arn = require_str(&body, "BackupArn")?;
1498
1499        let state = self.state.read();
1500        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1501            AwsServiceError::aws_error(
1502                StatusCode::BAD_REQUEST,
1503                "BackupNotFoundException",
1504                format!("Backup not found: {backup_arn}"),
1505            )
1506        })?;
1507
1508        Self::ok_json(json!({
1509            "BackupDescription": {
1510                "BackupDetails": {
1511                    "BackupArn": backup.backup_arn,
1512                    "BackupName": backup.backup_name,
1513                    "BackupStatus": backup.backup_status,
1514                    "BackupType": backup.backup_type,
1515                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1516                    "BackupSizeBytes": backup.size_bytes
1517                },
1518                "SourceTableDetails": {
1519                    "TableName": backup.table_name,
1520                    "TableArn": backup.table_arn,
1521                    "TableId": uuid::Uuid::new_v4().to_string(),
1522                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1523                        "AttributeName": ks.attribute_name,
1524                        "KeyType": ks.key_type
1525                    })).collect::<Vec<_>>(),
1526                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1527                    "ProvisionedThroughput": {
1528                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1529                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1530                    },
1531                    "ItemCount": backup.item_count,
1532                    "BillingMode": backup.billing_mode,
1533                    "TableSizeBytes": backup.size_bytes
1534                },
1535                "SourceTableFeatureDetails": {}
1536            }
1537        }))
1538    }
1539
1540    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1541        let body = Self::parse_body(req)?;
1542        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
1543        validate_optional_string_length(
1544            "exclusiveStartBackupArn",
1545            body["ExclusiveStartBackupArn"].as_str(),
1546            37,
1547            1024,
1548        )?;
1549        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1550        validate_optional_enum_value(
1551            "backupType",
1552            &body["BackupType"],
1553            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
1554        )?;
1555        let table_name = body["TableName"].as_str();
1556
1557        let state = self.state.read();
1558        let summaries: Vec<Value> = state
1559            .backups
1560            .values()
1561            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1562            .map(|b| {
1563                json!({
1564                    "TableName": b.table_name,
1565                    "TableArn": b.table_arn,
1566                    "BackupArn": b.backup_arn,
1567                    "BackupName": b.backup_name,
1568                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1569                    "BackupStatus": b.backup_status,
1570                    "BackupType": b.backup_type,
1571                    "BackupSizeBytes": b.size_bytes
1572                })
1573            })
1574            .collect();
1575
1576        Self::ok_json(json!({
1577            "BackupSummaries": summaries
1578        }))
1579    }
1580
1581    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1582        let body = Self::parse_body(req)?;
1583        let backup_arn = require_str(&body, "BackupArn")?;
1584        let target_table_name = require_str(&body, "TargetTableName")?;
1585
1586        let mut state = self.state.write();
1587        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1588            AwsServiceError::aws_error(
1589                StatusCode::BAD_REQUEST,
1590                "BackupNotFoundException",
1591                format!("Backup not found: {backup_arn}"),
1592            )
1593        })?;
1594
1595        if state.tables.contains_key(target_table_name) {
1596            return Err(AwsServiceError::aws_error(
1597                StatusCode::BAD_REQUEST,
1598                "TableAlreadyExistsException",
1599                format!("Table already exists: {target_table_name}"),
1600            ));
1601        }
1602
1603        let now = Utc::now();
1604        let arn = format!(
1605            "arn:aws:dynamodb:{}:{}:table/{}",
1606            state.region, state.account_id, target_table_name
1607        );
1608
1609        let table = DynamoTable {
1610            name: target_table_name.to_string(),
1611            arn: arn.clone(),
1612            key_schema: backup.key_schema.clone(),
1613            attribute_definitions: backup.attribute_definitions.clone(),
1614            provisioned_throughput: backup.provisioned_throughput.clone(),
1615            items: Vec::new(),
1616            gsi: Vec::new(),
1617            lsi: Vec::new(),
1618            tags: HashMap::new(),
1619            created_at: now,
1620            status: "ACTIVE".to_string(),
1621            item_count: 0,
1622            size_bytes: 0,
1623            billing_mode: backup.billing_mode.clone(),
1624            ttl_attribute: None,
1625            ttl_enabled: false,
1626            resource_policy: None,
1627            pitr_enabled: false,
1628            kinesis_destinations: Vec::new(),
1629            contributor_insights_status: "DISABLED".to_string(),
1630        };
1631
1632        let desc = build_table_description(&table);
1633        state.tables.insert(target_table_name.to_string(), table);
1634
1635        Self::ok_json(json!({
1636            "TableDescription": desc
1637        }))
1638    }
1639
1640    fn restore_table_to_point_in_time(
1641        &self,
1642        req: &AwsRequest,
1643    ) -> Result<AwsResponse, AwsServiceError> {
1644        let body = Self::parse_body(req)?;
1645        let target_table_name = require_str(&body, "TargetTableName")?;
1646        let source_table_name = body["SourceTableName"].as_str();
1647        let source_table_arn = body["SourceTableArn"].as_str();
1648
1649        let mut state = self.state.write();
1650
1651        // Resolve source table
1652        let source = if let Some(name) = source_table_name {
1653            get_table(&state.tables, name)?.clone()
1654        } else if let Some(arn) = source_table_arn {
1655            find_table_by_arn(&state.tables, arn)?.clone()
1656        } else {
1657            return Err(AwsServiceError::aws_error(
1658                StatusCode::BAD_REQUEST,
1659                "ValidationException",
1660                "SourceTableName or SourceTableArn is required",
1661            ));
1662        };
1663
1664        if state.tables.contains_key(target_table_name) {
1665            return Err(AwsServiceError::aws_error(
1666                StatusCode::BAD_REQUEST,
1667                "TableAlreadyExistsException",
1668                format!("Table already exists: {target_table_name}"),
1669            ));
1670        }
1671
1672        let now = Utc::now();
1673        let arn = format!(
1674            "arn:aws:dynamodb:{}:{}:table/{}",
1675            state.region, state.account_id, target_table_name
1676        );
1677
1678        let table = DynamoTable {
1679            name: target_table_name.to_string(),
1680            arn: arn.clone(),
1681            key_schema: source.key_schema.clone(),
1682            attribute_definitions: source.attribute_definitions.clone(),
1683            provisioned_throughput: source.provisioned_throughput.clone(),
1684            items: Vec::new(),
1685            gsi: Vec::new(),
1686            lsi: Vec::new(),
1687            tags: HashMap::new(),
1688            created_at: now,
1689            status: "ACTIVE".to_string(),
1690            item_count: 0,
1691            size_bytes: 0,
1692            billing_mode: source.billing_mode.clone(),
1693            ttl_attribute: None,
1694            ttl_enabled: false,
1695            resource_policy: None,
1696            pitr_enabled: false,
1697            kinesis_destinations: Vec::new(),
1698            contributor_insights_status: "DISABLED".to_string(),
1699        };
1700
1701        let desc = build_table_description(&table);
1702        state.tables.insert(target_table_name.to_string(), table);
1703
1704        Self::ok_json(json!({
1705            "TableDescription": desc
1706        }))
1707    }
1708
1709    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1710        let body = Self::parse_body(req)?;
1711        let table_name = require_str(&body, "TableName")?;
1712
1713        let pitr_spec = body["PointInTimeRecoverySpecification"]
1714            .as_object()
1715            .ok_or_else(|| {
1716                AwsServiceError::aws_error(
1717                    StatusCode::BAD_REQUEST,
1718                    "ValidationException",
1719                    "PointInTimeRecoverySpecification is required",
1720                )
1721            })?;
1722        let enabled = pitr_spec
1723            .get("PointInTimeRecoveryEnabled")
1724            .and_then(|v| v.as_bool())
1725            .unwrap_or(false);
1726
1727        let mut state = self.state.write();
1728        let table = get_table_mut(&mut state.tables, table_name)?;
1729        table.pitr_enabled = enabled;
1730
1731        let status = if enabled { "ENABLED" } else { "DISABLED" };
1732        Self::ok_json(json!({
1733            "ContinuousBackupsDescription": {
1734                "ContinuousBackupsStatus": status,
1735                "PointInTimeRecoveryDescription": {
1736                    "PointInTimeRecoveryStatus": status,
1737                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1738                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1739                }
1740            }
1741        }))
1742    }
1743
1744    fn describe_continuous_backups(
1745        &self,
1746        req: &AwsRequest,
1747    ) -> Result<AwsResponse, AwsServiceError> {
1748        let body = Self::parse_body(req)?;
1749        let table_name = require_str(&body, "TableName")?;
1750
1751        let state = self.state.read();
1752        let table = get_table(&state.tables, table_name)?;
1753
1754        let status = if table.pitr_enabled {
1755            "ENABLED"
1756        } else {
1757            "DISABLED"
1758        };
1759        Self::ok_json(json!({
1760            "ContinuousBackupsDescription": {
1761                "ContinuousBackupsStatus": status,
1762                "PointInTimeRecoveryDescription": {
1763                    "PointInTimeRecoveryStatus": status,
1764                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1765                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1766                }
1767            }
1768        }))
1769    }
1770
1771    // ── Global Tables ──────────────────────────────────────────────────
1772
1773    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1774        let body = Self::parse_body(req)?;
1775        let global_table_name = require_str(&body, "GlobalTableName")?;
1776        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1777
1778        let replication_group = body["ReplicationGroup"]
1779            .as_array()
1780            .ok_or_else(|| {
1781                AwsServiceError::aws_error(
1782                    StatusCode::BAD_REQUEST,
1783                    "ValidationException",
1784                    "ReplicationGroup is required",
1785                )
1786            })?
1787            .iter()
1788            .filter_map(|r| {
1789                r["RegionName"].as_str().map(|rn| ReplicaDescription {
1790                    region_name: rn.to_string(),
1791                    replica_status: "ACTIVE".to_string(),
1792                })
1793            })
1794            .collect::<Vec<_>>();
1795
1796        let mut state = self.state.write();
1797
1798        if state.global_tables.contains_key(global_table_name) {
1799            return Err(AwsServiceError::aws_error(
1800                StatusCode::BAD_REQUEST,
1801                "GlobalTableAlreadyExistsException",
1802                format!("Global table already exists: {global_table_name}"),
1803            ));
1804        }
1805
1806        let arn = format!(
1807            "arn:aws:dynamodb::{}:global-table/{}",
1808            state.account_id, global_table_name
1809        );
1810        let now = Utc::now();
1811
1812        let gt = GlobalTableDescription {
1813            global_table_name: global_table_name.to_string(),
1814            global_table_arn: arn.clone(),
1815            global_table_status: "ACTIVE".to_string(),
1816            creation_date: now,
1817            replication_group: replication_group.clone(),
1818        };
1819
1820        state
1821            .global_tables
1822            .insert(global_table_name.to_string(), gt);
1823
1824        Self::ok_json(json!({
1825            "GlobalTableDescription": {
1826                "GlobalTableName": global_table_name,
1827                "GlobalTableArn": arn,
1828                "GlobalTableStatus": "ACTIVE",
1829                "CreationDateTime": now.timestamp() as f64,
1830                "ReplicationGroup": replication_group.iter().map(|r| json!({
1831                    "RegionName": r.region_name,
1832                    "ReplicaStatus": r.replica_status
1833                })).collect::<Vec<_>>()
1834            }
1835        }))
1836    }
1837
1838    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1839        let body = Self::parse_body(req)?;
1840        let global_table_name = require_str(&body, "GlobalTableName")?;
1841        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1842
1843        let state = self.state.read();
1844        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1845            AwsServiceError::aws_error(
1846                StatusCode::BAD_REQUEST,
1847                "GlobalTableNotFoundException",
1848                format!("Global table not found: {global_table_name}"),
1849            )
1850        })?;
1851
1852        Self::ok_json(json!({
1853            "GlobalTableDescription": {
1854                "GlobalTableName": gt.global_table_name,
1855                "GlobalTableArn": gt.global_table_arn,
1856                "GlobalTableStatus": gt.global_table_status,
1857                "CreationDateTime": gt.creation_date.timestamp() as f64,
1858                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1859                    "RegionName": r.region_name,
1860                    "ReplicaStatus": r.replica_status
1861                })).collect::<Vec<_>>()
1862            }
1863        }))
1864    }
1865
1866    fn describe_global_table_settings(
1867        &self,
1868        req: &AwsRequest,
1869    ) -> Result<AwsResponse, AwsServiceError> {
1870        let body = Self::parse_body(req)?;
1871        let global_table_name = require_str(&body, "GlobalTableName")?;
1872        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1873
1874        let state = self.state.read();
1875        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1876            AwsServiceError::aws_error(
1877                StatusCode::BAD_REQUEST,
1878                "GlobalTableNotFoundException",
1879                format!("Global table not found: {global_table_name}"),
1880            )
1881        })?;
1882
1883        let replica_settings: Vec<Value> = gt
1884            .replication_group
1885            .iter()
1886            .map(|r| {
1887                json!({
1888                    "RegionName": r.region_name,
1889                    "ReplicaStatus": r.replica_status,
1890                    "ReplicaProvisionedReadCapacityUnits": 0,
1891                    "ReplicaProvisionedWriteCapacityUnits": 0
1892                })
1893            })
1894            .collect();
1895
1896        Self::ok_json(json!({
1897            "GlobalTableName": gt.global_table_name,
1898            "ReplicaSettings": replica_settings
1899        }))
1900    }
1901
1902    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1903        let body = Self::parse_body(req)?;
1904        validate_optional_string_length(
1905            "exclusiveStartGlobalTableName",
1906            body["ExclusiveStartGlobalTableName"].as_str(),
1907            3,
1908            255,
1909        )?;
1910        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
1911        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1912
1913        let state = self.state.read();
1914        let tables: Vec<Value> = state
1915            .global_tables
1916            .values()
1917            .take(limit)
1918            .map(|gt| {
1919                json!({
1920                    "GlobalTableName": gt.global_table_name,
1921                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1922                        "RegionName": r.region_name
1923                    })).collect::<Vec<_>>()
1924                })
1925            })
1926            .collect();
1927
1928        Self::ok_json(json!({
1929            "GlobalTables": tables
1930        }))
1931    }
1932
1933    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1934        let body = Self::parse_body(req)?;
1935        let global_table_name = require_str(&body, "GlobalTableName")?;
1936        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1937        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
1938
1939        let mut state = self.state.write();
1940        let gt = state
1941            .global_tables
1942            .get_mut(global_table_name)
1943            .ok_or_else(|| {
1944                AwsServiceError::aws_error(
1945                    StatusCode::BAD_REQUEST,
1946                    "GlobalTableNotFoundException",
1947                    format!("Global table not found: {global_table_name}"),
1948                )
1949            })?;
1950
1951        if let Some(updates) = body["ReplicaUpdates"].as_array() {
1952            for update in updates {
1953                if let Some(create) = update["Create"].as_object() {
1954                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
1955                        gt.replication_group.push(ReplicaDescription {
1956                            region_name: region.to_string(),
1957                            replica_status: "ACTIVE".to_string(),
1958                        });
1959                    }
1960                }
1961                if let Some(delete) = update["Delete"].as_object() {
1962                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
1963                        gt.replication_group.retain(|r| r.region_name != region);
1964                    }
1965                }
1966            }
1967        }
1968
1969        Self::ok_json(json!({
1970            "GlobalTableDescription": {
1971                "GlobalTableName": gt.global_table_name,
1972                "GlobalTableArn": gt.global_table_arn,
1973                "GlobalTableStatus": gt.global_table_status,
1974                "CreationDateTime": gt.creation_date.timestamp() as f64,
1975                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1976                    "RegionName": r.region_name,
1977                    "ReplicaStatus": r.replica_status
1978                })).collect::<Vec<_>>()
1979            }
1980        }))
1981    }
1982
1983    fn update_global_table_settings(
1984        &self,
1985        req: &AwsRequest,
1986    ) -> Result<AwsResponse, AwsServiceError> {
1987        let body = Self::parse_body(req)?;
1988        let global_table_name = require_str(&body, "GlobalTableName")?;
1989        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1990        validate_optional_enum_value(
1991            "globalTableBillingMode",
1992            &body["GlobalTableBillingMode"],
1993            &["PROVISIONED", "PAY_PER_REQUEST"],
1994        )?;
1995        validate_optional_range_i64(
1996            "globalTableProvisionedWriteCapacityUnits",
1997            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
1998            1,
1999            i64::MAX,
2000        )?;
2001
2002        let state = self.state.read();
2003        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2004            AwsServiceError::aws_error(
2005                StatusCode::BAD_REQUEST,
2006                "GlobalTableNotFoundException",
2007                format!("Global table not found: {global_table_name}"),
2008            )
2009        })?;
2010
2011        let replica_settings: Vec<Value> = gt
2012            .replication_group
2013            .iter()
2014            .map(|r| {
2015                json!({
2016                    "RegionName": r.region_name,
2017                    "ReplicaStatus": r.replica_status,
2018                    "ReplicaProvisionedReadCapacityUnits": 0,
2019                    "ReplicaProvisionedWriteCapacityUnits": 0
2020                })
2021            })
2022            .collect();
2023
2024        Self::ok_json(json!({
2025            "GlobalTableName": gt.global_table_name,
2026            "ReplicaSettings": replica_settings
2027        }))
2028    }
2029
2030    fn describe_table_replica_auto_scaling(
2031        &self,
2032        req: &AwsRequest,
2033    ) -> Result<AwsResponse, AwsServiceError> {
2034        let body = Self::parse_body(req)?;
2035        let table_name = require_str(&body, "TableName")?;
2036
2037        let state = self.state.read();
2038        let table = get_table(&state.tables, table_name)?;
2039
2040        Self::ok_json(json!({
2041            "TableAutoScalingDescription": {
2042                "TableName": table.name,
2043                "TableStatus": table.status,
2044                "Replicas": []
2045            }
2046        }))
2047    }
2048
2049    fn update_table_replica_auto_scaling(
2050        &self,
2051        req: &AwsRequest,
2052    ) -> Result<AwsResponse, AwsServiceError> {
2053        let body = Self::parse_body(req)?;
2054        let table_name = require_str(&body, "TableName")?;
2055
2056        let state = self.state.read();
2057        let table = get_table(&state.tables, table_name)?;
2058
2059        Self::ok_json(json!({
2060            "TableAutoScalingDescription": {
2061                "TableName": table.name,
2062                "TableStatus": table.status,
2063                "Replicas": []
2064            }
2065        }))
2066    }
2067
2068    // ── Kinesis Streaming ──────────────────────────────────────────────
2069
2070    fn enable_kinesis_streaming_destination(
2071        &self,
2072        req: &AwsRequest,
2073    ) -> Result<AwsResponse, AwsServiceError> {
2074        let body = Self::parse_body(req)?;
2075        let table_name = require_str(&body, "TableName")?;
2076        let stream_arn = require_str(&body, "StreamArn")?;
2077        let precision = body["EnableKinesisStreamingConfiguration"]
2078            ["ApproximateCreationDateTimePrecision"]
2079            .as_str()
2080            .unwrap_or("MILLISECOND");
2081
2082        let mut state = self.state.write();
2083        let table = get_table_mut(&mut state.tables, table_name)?;
2084
2085        table.kinesis_destinations.push(KinesisDestination {
2086            stream_arn: stream_arn.to_string(),
2087            destination_status: "ACTIVE".to_string(),
2088            approximate_creation_date_time_precision: precision.to_string(),
2089        });
2090
2091        Self::ok_json(json!({
2092            "TableName": table_name,
2093            "StreamArn": stream_arn,
2094            "DestinationStatus": "ACTIVE",
2095            "EnableKinesisStreamingConfiguration": {
2096                "ApproximateCreationDateTimePrecision": precision
2097            }
2098        }))
2099    }
2100
2101    fn disable_kinesis_streaming_destination(
2102        &self,
2103        req: &AwsRequest,
2104    ) -> Result<AwsResponse, AwsServiceError> {
2105        let body = Self::parse_body(req)?;
2106        let table_name = require_str(&body, "TableName")?;
2107        let stream_arn = require_str(&body, "StreamArn")?;
2108
2109        let mut state = self.state.write();
2110        let table = get_table_mut(&mut state.tables, table_name)?;
2111
2112        if let Some(dest) = table
2113            .kinesis_destinations
2114            .iter_mut()
2115            .find(|d| d.stream_arn == stream_arn)
2116        {
2117            dest.destination_status = "DISABLED".to_string();
2118        }
2119
2120        Self::ok_json(json!({
2121            "TableName": table_name,
2122            "StreamArn": stream_arn,
2123            "DestinationStatus": "DISABLED"
2124        }))
2125    }
2126
2127    fn describe_kinesis_streaming_destination(
2128        &self,
2129        req: &AwsRequest,
2130    ) -> Result<AwsResponse, AwsServiceError> {
2131        let body = Self::parse_body(req)?;
2132        let table_name = require_str(&body, "TableName")?;
2133
2134        let state = self.state.read();
2135        let table = get_table(&state.tables, table_name)?;
2136
2137        let destinations: Vec<Value> = table
2138            .kinesis_destinations
2139            .iter()
2140            .map(|d| {
2141                json!({
2142                    "StreamArn": d.stream_arn,
2143                    "DestinationStatus": d.destination_status,
2144                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2145                })
2146            })
2147            .collect();
2148
2149        Self::ok_json(json!({
2150            "TableName": table_name,
2151            "KinesisDataStreamDestinations": destinations
2152        }))
2153    }
2154
2155    fn update_kinesis_streaming_destination(
2156        &self,
2157        req: &AwsRequest,
2158    ) -> Result<AwsResponse, AwsServiceError> {
2159        let body = Self::parse_body(req)?;
2160        let table_name = require_str(&body, "TableName")?;
2161        let stream_arn = require_str(&body, "StreamArn")?;
2162        let precision = body["UpdateKinesisStreamingConfiguration"]
2163            ["ApproximateCreationDateTimePrecision"]
2164            .as_str()
2165            .unwrap_or("MILLISECOND");
2166
2167        let mut state = self.state.write();
2168        let table = get_table_mut(&mut state.tables, table_name)?;
2169
2170        if let Some(dest) = table
2171            .kinesis_destinations
2172            .iter_mut()
2173            .find(|d| d.stream_arn == stream_arn)
2174        {
2175            dest.approximate_creation_date_time_precision = precision.to_string();
2176        }
2177
2178        Self::ok_json(json!({
2179            "TableName": table_name,
2180            "StreamArn": stream_arn,
2181            "DestinationStatus": "ACTIVE",
2182            "UpdateKinesisStreamingConfiguration": {
2183                "ApproximateCreationDateTimePrecision": precision
2184            }
2185        }))
2186    }
2187
2188    // ── Contributor Insights ───────────────────────────────────────────
2189
2190    fn describe_contributor_insights(
2191        &self,
2192        req: &AwsRequest,
2193    ) -> Result<AwsResponse, AwsServiceError> {
2194        let body = Self::parse_body(req)?;
2195        let table_name = require_str(&body, "TableName")?;
2196        let index_name = body["IndexName"].as_str();
2197
2198        let state = self.state.read();
2199        let table = get_table(&state.tables, table_name)?;
2200
2201        let mut result = json!({
2202            "TableName": table_name,
2203            "ContributorInsightsStatus": table.contributor_insights_status,
2204            "ContributorInsightsRuleList": []
2205        });
2206        if let Some(idx) = index_name {
2207            result["IndexName"] = json!(idx);
2208        }
2209
2210        Self::ok_json(result)
2211    }
2212
2213    fn update_contributor_insights(
2214        &self,
2215        req: &AwsRequest,
2216    ) -> Result<AwsResponse, AwsServiceError> {
2217        let body = Self::parse_body(req)?;
2218        let table_name = require_str(&body, "TableName")?;
2219        let action = require_str(&body, "ContributorInsightsAction")?;
2220        let index_name = body["IndexName"].as_str();
2221
2222        let mut state = self.state.write();
2223        let table = get_table_mut(&mut state.tables, table_name)?;
2224
2225        let status = match action {
2226            "ENABLE" => "ENABLED",
2227            "DISABLE" => "DISABLED",
2228            _ => {
2229                return Err(AwsServiceError::aws_error(
2230                    StatusCode::BAD_REQUEST,
2231                    "ValidationException",
2232                    format!("Invalid ContributorInsightsAction: {action}"),
2233                ))
2234            }
2235        };
2236        table.contributor_insights_status = status.to_string();
2237
2238        let mut result = json!({
2239            "TableName": table_name,
2240            "ContributorInsightsStatus": status
2241        });
2242        if let Some(idx) = index_name {
2243            result["IndexName"] = json!(idx);
2244        }
2245
2246        Self::ok_json(result)
2247    }
2248
2249    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2250        let body = Self::parse_body(req)?;
2251        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2252        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2253        let table_name = body["TableName"].as_str();
2254
2255        let state = self.state.read();
2256        let summaries: Vec<Value> = state
2257            .tables
2258            .values()
2259            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2260            .map(|t| {
2261                json!({
2262                    "TableName": t.name,
2263                    "ContributorInsightsStatus": t.contributor_insights_status
2264                })
2265            })
2266            .collect();
2267
2268        Self::ok_json(json!({
2269            "ContributorInsightsSummaries": summaries
2270        }))
2271    }
2272
2273    // ── Import/Export ──────────────────────────────────────────────────
2274
2275    fn export_table_to_point_in_time(
2276        &self,
2277        req: &AwsRequest,
2278    ) -> Result<AwsResponse, AwsServiceError> {
2279        let body = Self::parse_body(req)?;
2280        let table_arn = require_str(&body, "TableArn")?;
2281        let s3_bucket = require_str(&body, "S3Bucket")?;
2282        let s3_prefix = body["S3Prefix"].as_str();
2283        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2284
2285        let state = self.state.read();
2286        // Verify table exists
2287        find_table_by_arn(&state.tables, table_arn)?;
2288
2289        let now = Utc::now();
2290        let export_arn = format!(
2291            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2292            state.region,
2293            state.account_id,
2294            table_arn.rsplit('/').next().unwrap_or("unknown"),
2295            uuid::Uuid::new_v4()
2296        );
2297
2298        let export = ExportDescription {
2299            export_arn: export_arn.clone(),
2300            export_status: "COMPLETED".to_string(),
2301            table_arn: table_arn.to_string(),
2302            s3_bucket: s3_bucket.to_string(),
2303            s3_prefix: s3_prefix.map(|s| s.to_string()),
2304            export_format: export_format.to_string(),
2305            start_time: now,
2306            end_time: now,
2307            export_time: now,
2308            item_count: 0,
2309            billed_size_bytes: 0,
2310        };
2311
2312        drop(state);
2313        let mut state = self.state.write();
2314        state.exports.insert(export_arn.clone(), export);
2315
2316        Self::ok_json(json!({
2317            "ExportDescription": {
2318                "ExportArn": export_arn,
2319                "ExportStatus": "COMPLETED",
2320                "TableArn": table_arn,
2321                "S3Bucket": s3_bucket,
2322                "S3Prefix": s3_prefix,
2323                "ExportFormat": export_format,
2324                "StartTime": now.timestamp() as f64,
2325                "EndTime": now.timestamp() as f64,
2326                "ExportTime": now.timestamp() as f64,
2327                "ItemCount": 0,
2328                "BilledSizeBytes": 0
2329            }
2330        }))
2331    }
2332
2333    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2334        let body = Self::parse_body(req)?;
2335        let export_arn = require_str(&body, "ExportArn")?;
2336
2337        let state = self.state.read();
2338        let export = state.exports.get(export_arn).ok_or_else(|| {
2339            AwsServiceError::aws_error(
2340                StatusCode::BAD_REQUEST,
2341                "ExportNotFoundException",
2342                format!("Export not found: {export_arn}"),
2343            )
2344        })?;
2345
2346        Self::ok_json(json!({
2347            "ExportDescription": {
2348                "ExportArn": export.export_arn,
2349                "ExportStatus": export.export_status,
2350                "TableArn": export.table_arn,
2351                "S3Bucket": export.s3_bucket,
2352                "S3Prefix": export.s3_prefix,
2353                "ExportFormat": export.export_format,
2354                "StartTime": export.start_time.timestamp() as f64,
2355                "EndTime": export.end_time.timestamp() as f64,
2356                "ExportTime": export.export_time.timestamp() as f64,
2357                "ItemCount": export.item_count,
2358                "BilledSizeBytes": export.billed_size_bytes
2359            }
2360        }))
2361    }
2362
2363    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2364        let body = Self::parse_body(req)?;
2365        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2366        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2367        let table_arn = body["TableArn"].as_str();
2368
2369        let state = self.state.read();
2370        let summaries: Vec<Value> = state
2371            .exports
2372            .values()
2373            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2374            .map(|e| {
2375                json!({
2376                    "ExportArn": e.export_arn,
2377                    "ExportStatus": e.export_status,
2378                    "TableArn": e.table_arn
2379                })
2380            })
2381            .collect();
2382
2383        Self::ok_json(json!({
2384            "ExportSummaries": summaries
2385        }))
2386    }
2387
2388    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2389        let body = Self::parse_body(req)?;
2390        let input_format = require_str(&body, "InputFormat")?;
2391        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2392            AwsServiceError::aws_error(
2393                StatusCode::BAD_REQUEST,
2394                "ValidationException",
2395                "S3BucketSource is required",
2396            )
2397        })?;
2398        let s3_bucket = s3_source
2399            .get("S3Bucket")
2400            .and_then(|v| v.as_str())
2401            .unwrap_or("");
2402
2403        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2404            AwsServiceError::aws_error(
2405                StatusCode::BAD_REQUEST,
2406                "ValidationException",
2407                "TableCreationParameters is required",
2408            )
2409        })?;
2410        let table_name = table_params
2411            .get("TableName")
2412            .and_then(|v| v.as_str())
2413            .ok_or_else(|| {
2414                AwsServiceError::aws_error(
2415                    StatusCode::BAD_REQUEST,
2416                    "ValidationException",
2417                    "TableCreationParameters.TableName is required",
2418                )
2419            })?;
2420
2421        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
2422        let attribute_definitions = parse_attribute_definitions(
2423            table_params
2424                .get("AttributeDefinitions")
2425                .unwrap_or(&Value::Null),
2426        )?;
2427
2428        let mut state = self.state.write();
2429
2430        if state.tables.contains_key(table_name) {
2431            return Err(AwsServiceError::aws_error(
2432                StatusCode::BAD_REQUEST,
2433                "ResourceInUseException",
2434                format!("Table already exists: {table_name}"),
2435            ));
2436        }
2437
2438        let now = Utc::now();
2439        let table_arn = format!(
2440            "arn:aws:dynamodb:{}:{}:table/{}",
2441            state.region, state.account_id, table_name
2442        );
2443        let import_arn = format!(
2444            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
2445            state.region,
2446            state.account_id,
2447            table_name,
2448            uuid::Uuid::new_v4()
2449        );
2450
2451        let table = DynamoTable {
2452            name: table_name.to_string(),
2453            arn: table_arn.clone(),
2454            key_schema,
2455            attribute_definitions,
2456            provisioned_throughput: ProvisionedThroughput {
2457                read_capacity_units: 0,
2458                write_capacity_units: 0,
2459            },
2460            items: Vec::new(),
2461            gsi: Vec::new(),
2462            lsi: Vec::new(),
2463            tags: HashMap::new(),
2464            created_at: now,
2465            status: "ACTIVE".to_string(),
2466            item_count: 0,
2467            size_bytes: 0,
2468            billing_mode: "PAY_PER_REQUEST".to_string(),
2469            ttl_attribute: None,
2470            ttl_enabled: false,
2471            resource_policy: None,
2472            pitr_enabled: false,
2473            kinesis_destinations: Vec::new(),
2474            contributor_insights_status: "DISABLED".to_string(),
2475        };
2476        state.tables.insert(table_name.to_string(), table);
2477
2478        let import_desc = ImportDescription {
2479            import_arn: import_arn.clone(),
2480            import_status: "COMPLETED".to_string(),
2481            table_arn: table_arn.clone(),
2482            table_name: table_name.to_string(),
2483            s3_bucket_source: s3_bucket.to_string(),
2484            input_format: input_format.to_string(),
2485            start_time: now,
2486            end_time: now,
2487            processed_item_count: 0,
2488            processed_size_bytes: 0,
2489        };
2490        state.imports.insert(import_arn.clone(), import_desc);
2491
2492        let table_ref = state.tables.get(table_name).unwrap();
2493        let ks: Vec<Value> = table_ref
2494            .key_schema
2495            .iter()
2496            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2497            .collect();
2498        let ad: Vec<Value> = table_ref
2499            .attribute_definitions
2500            .iter()
2501            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2502            .collect();
2503
2504        Self::ok_json(json!({
2505            "ImportTableDescription": {
2506                "ImportArn": import_arn,
2507                "ImportStatus": "COMPLETED",
2508                "TableArn": table_arn,
2509                "TableId": uuid::Uuid::new_v4().to_string(),
2510                "S3BucketSource": {
2511                    "S3Bucket": s3_bucket
2512                },
2513                "InputFormat": input_format,
2514                "TableCreationParameters": {
2515                    "TableName": table_name,
2516                    "KeySchema": ks,
2517                    "AttributeDefinitions": ad
2518                },
2519                "StartTime": now.timestamp() as f64,
2520                "EndTime": now.timestamp() as f64,
2521                "ProcessedItemCount": 0,
2522                "ProcessedSizeBytes": 0
2523            }
2524        }))
2525    }
2526
2527    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2528        let body = Self::parse_body(req)?;
2529        let import_arn = require_str(&body, "ImportArn")?;
2530
2531        let state = self.state.read();
2532        let import = state.imports.get(import_arn).ok_or_else(|| {
2533            AwsServiceError::aws_error(
2534                StatusCode::BAD_REQUEST,
2535                "ImportNotFoundException",
2536                format!("Import not found: {import_arn}"),
2537            )
2538        })?;
2539
2540        Self::ok_json(json!({
2541            "ImportTableDescription": {
2542                "ImportArn": import.import_arn,
2543                "ImportStatus": import.import_status,
2544                "TableArn": import.table_arn,
2545                "S3BucketSource": {
2546                    "S3Bucket": import.s3_bucket_source
2547                },
2548                "InputFormat": import.input_format,
2549                "StartTime": import.start_time.timestamp() as f64,
2550                "EndTime": import.end_time.timestamp() as f64,
2551                "ProcessedItemCount": import.processed_item_count,
2552                "ProcessedSizeBytes": import.processed_size_bytes
2553            }
2554        }))
2555    }
2556
2557    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2558        let body = Self::parse_body(req)?;
2559        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2560        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
2561        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
2562        let table_arn = body["TableArn"].as_str();
2563
2564        let state = self.state.read();
2565        let summaries: Vec<Value> = state
2566            .imports
2567            .values()
2568            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2569            .map(|i| {
2570                json!({
2571                    "ImportArn": i.import_arn,
2572                    "ImportStatus": i.import_status,
2573                    "TableArn": i.table_arn
2574                })
2575            })
2576            .collect();
2577
2578        Self::ok_json(json!({
2579            "ImportSummaryList": summaries
2580        }))
2581    }
2582}
2583
2584#[async_trait]
2585impl AwsService for DynamoDbService {
2586    fn service_name(&self) -> &str {
2587        "dynamodb"
2588    }
2589
2590    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2591        match req.action.as_str() {
2592            "CreateTable" => self.create_table(&req),
2593            "DeleteTable" => self.delete_table(&req),
2594            "DescribeTable" => self.describe_table(&req),
2595            "ListTables" => self.list_tables(&req),
2596            "UpdateTable" => self.update_table(&req),
2597            "PutItem" => self.put_item(&req),
2598            "GetItem" => self.get_item(&req),
2599            "DeleteItem" => self.delete_item(&req),
2600            "UpdateItem" => self.update_item(&req),
2601            "Query" => self.query(&req),
2602            "Scan" => self.scan(&req),
2603            "BatchGetItem" => self.batch_get_item(&req),
2604            "BatchWriteItem" => self.batch_write_item(&req),
2605            "TagResource" => self.tag_resource(&req),
2606            "UntagResource" => self.untag_resource(&req),
2607            "ListTagsOfResource" => self.list_tags_of_resource(&req),
2608            "TransactGetItems" => self.transact_get_items(&req),
2609            "TransactWriteItems" => self.transact_write_items(&req),
2610            "ExecuteStatement" => self.execute_statement(&req),
2611            "BatchExecuteStatement" => self.batch_execute_statement(&req),
2612            "ExecuteTransaction" => self.execute_transaction(&req),
2613            "UpdateTimeToLive" => self.update_time_to_live(&req),
2614            "DescribeTimeToLive" => self.describe_time_to_live(&req),
2615            "PutResourcePolicy" => self.put_resource_policy(&req),
2616            "GetResourcePolicy" => self.get_resource_policy(&req),
2617            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
2618            // Stubs
2619            "DescribeEndpoints" => self.describe_endpoints(&req),
2620            "DescribeLimits" => self.describe_limits(&req),
2621            // Backups
2622            "CreateBackup" => self.create_backup(&req),
2623            "DeleteBackup" => self.delete_backup(&req),
2624            "DescribeBackup" => self.describe_backup(&req),
2625            "ListBackups" => self.list_backups(&req),
2626            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
2627            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
2628            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
2629            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
2630            // Global tables
2631            "CreateGlobalTable" => self.create_global_table(&req),
2632            "DescribeGlobalTable" => self.describe_global_table(&req),
2633            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
2634            "ListGlobalTables" => self.list_global_tables(&req),
2635            "UpdateGlobalTable" => self.update_global_table(&req),
2636            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
2637            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
2638            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
2639            // Kinesis streaming
2640            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
2641            "DisableKinesisStreamingDestination" => {
2642                self.disable_kinesis_streaming_destination(&req)
2643            }
2644            "DescribeKinesisStreamingDestination" => {
2645                self.describe_kinesis_streaming_destination(&req)
2646            }
2647            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
2648            // Contributor insights
2649            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
2650            "UpdateContributorInsights" => self.update_contributor_insights(&req),
2651            "ListContributorInsights" => self.list_contributor_insights(&req),
2652            // Import/Export
2653            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
2654            "DescribeExport" => self.describe_export(&req),
2655            "ListExports" => self.list_exports(&req),
2656            "ImportTable" => self.import_table(&req),
2657            "DescribeImport" => self.describe_import(&req),
2658            "ListImports" => self.list_imports(&req),
2659            _ => Err(AwsServiceError::action_not_implemented(
2660                "dynamodb",
2661                &req.action,
2662            )),
2663        }
2664    }
2665
2666    fn supported_actions(&self) -> &[&str] {
2667        &[
2668            "CreateTable",
2669            "DeleteTable",
2670            "DescribeTable",
2671            "ListTables",
2672            "UpdateTable",
2673            "PutItem",
2674            "GetItem",
2675            "DeleteItem",
2676            "UpdateItem",
2677            "Query",
2678            "Scan",
2679            "BatchGetItem",
2680            "BatchWriteItem",
2681            "TagResource",
2682            "UntagResource",
2683            "ListTagsOfResource",
2684            "TransactGetItems",
2685            "TransactWriteItems",
2686            "ExecuteStatement",
2687            "BatchExecuteStatement",
2688            "ExecuteTransaction",
2689            "UpdateTimeToLive",
2690            "DescribeTimeToLive",
2691            "PutResourcePolicy",
2692            "GetResourcePolicy",
2693            "DeleteResourcePolicy",
2694            "DescribeEndpoints",
2695            "DescribeLimits",
2696            "CreateBackup",
2697            "DeleteBackup",
2698            "DescribeBackup",
2699            "ListBackups",
2700            "RestoreTableFromBackup",
2701            "RestoreTableToPointInTime",
2702            "UpdateContinuousBackups",
2703            "DescribeContinuousBackups",
2704            "CreateGlobalTable",
2705            "DescribeGlobalTable",
2706            "DescribeGlobalTableSettings",
2707            "ListGlobalTables",
2708            "UpdateGlobalTable",
2709            "UpdateGlobalTableSettings",
2710            "DescribeTableReplicaAutoScaling",
2711            "UpdateTableReplicaAutoScaling",
2712            "EnableKinesisStreamingDestination",
2713            "DisableKinesisStreamingDestination",
2714            "DescribeKinesisStreamingDestination",
2715            "UpdateKinesisStreamingDestination",
2716            "DescribeContributorInsights",
2717            "UpdateContributorInsights",
2718            "ListContributorInsights",
2719            "ExportTableToPointInTime",
2720            "DescribeExport",
2721            "ListExports",
2722            "ImportTable",
2723            "DescribeImport",
2724            "ListImports",
2725        ]
2726    }
2727}
2728
2729// ── Helper functions ────────────────────────────────────────────────────
2730
2731fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
2732    body[field].as_str().ok_or_else(|| {
2733        AwsServiceError::aws_error(
2734            StatusCode::BAD_REQUEST,
2735            "ValidationException",
2736            format!("{field} is required"),
2737        )
2738    })
2739}
2740
2741fn require_object(
2742    body: &Value,
2743    field: &str,
2744) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2745    let obj = body[field].as_object().ok_or_else(|| {
2746        AwsServiceError::aws_error(
2747            StatusCode::BAD_REQUEST,
2748            "ValidationException",
2749            format!("{field} is required"),
2750        )
2751    })?;
2752    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2753}
2754
2755fn get_table<'a>(
2756    tables: &'a HashMap<String, DynamoTable>,
2757    name: &str,
2758) -> Result<&'a DynamoTable, AwsServiceError> {
2759    tables.get(name).ok_or_else(|| {
2760        AwsServiceError::aws_error(
2761            StatusCode::BAD_REQUEST,
2762            "ResourceNotFoundException",
2763            format!("Requested resource not found: Table: {name} not found"),
2764        )
2765    })
2766}
2767
2768fn get_table_mut<'a>(
2769    tables: &'a mut HashMap<String, DynamoTable>,
2770    name: &str,
2771) -> Result<&'a mut DynamoTable, AwsServiceError> {
2772    tables.get_mut(name).ok_or_else(|| {
2773        AwsServiceError::aws_error(
2774            StatusCode::BAD_REQUEST,
2775            "ResourceNotFoundException",
2776            format!("Requested resource not found: Table: {name} not found"),
2777        )
2778    })
2779}
2780
2781fn find_table_by_arn<'a>(
2782    tables: &'a HashMap<String, DynamoTable>,
2783    arn: &str,
2784) -> Result<&'a DynamoTable, AwsServiceError> {
2785    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
2786        AwsServiceError::aws_error(
2787            StatusCode::BAD_REQUEST,
2788            "ResourceNotFoundException",
2789            format!("Requested resource not found: {arn}"),
2790        )
2791    })
2792}
2793
2794fn find_table_by_arn_mut<'a>(
2795    tables: &'a mut HashMap<String, DynamoTable>,
2796    arn: &str,
2797) -> Result<&'a mut DynamoTable, AwsServiceError> {
2798    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
2799        AwsServiceError::aws_error(
2800            StatusCode::BAD_REQUEST,
2801            "ResourceNotFoundException",
2802            format!("Requested resource not found: {arn}"),
2803        )
2804    })
2805}
2806
2807fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
2808    let arr = val.as_array().ok_or_else(|| {
2809        AwsServiceError::aws_error(
2810            StatusCode::BAD_REQUEST,
2811            "ValidationException",
2812            "KeySchema is required",
2813        )
2814    })?;
2815    Ok(arr
2816        .iter()
2817        .map(|elem| KeySchemaElement {
2818            attribute_name: elem["AttributeName"]
2819                .as_str()
2820                .unwrap_or_default()
2821                .to_string(),
2822            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
2823        })
2824        .collect())
2825}
2826
2827fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
2828    let arr = val.as_array().ok_or_else(|| {
2829        AwsServiceError::aws_error(
2830            StatusCode::BAD_REQUEST,
2831            "ValidationException",
2832            "AttributeDefinitions is required",
2833        )
2834    })?;
2835    Ok(arr
2836        .iter()
2837        .map(|elem| AttributeDefinition {
2838            attribute_name: elem["AttributeName"]
2839                .as_str()
2840                .unwrap_or_default()
2841                .to_string(),
2842            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
2843        })
2844        .collect())
2845}
2846
2847fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
2848    Ok(ProvisionedThroughput {
2849        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
2850        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
2851    })
2852}
2853
2854fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
2855    let Some(arr) = val.as_array() else {
2856        return Vec::new();
2857    };
2858    arr.iter()
2859        .filter_map(|g| {
2860            Some(GlobalSecondaryIndex {
2861                index_name: g["IndexName"].as_str()?.to_string(),
2862                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
2863                projection: parse_projection(&g["Projection"]),
2864                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
2865                    .ok(),
2866            })
2867        })
2868        .collect()
2869}
2870
2871fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
2872    let Some(arr) = val.as_array() else {
2873        return Vec::new();
2874    };
2875    arr.iter()
2876        .filter_map(|l| {
2877            Some(LocalSecondaryIndex {
2878                index_name: l["IndexName"].as_str()?.to_string(),
2879                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
2880                projection: parse_projection(&l["Projection"]),
2881            })
2882        })
2883        .collect()
2884}
2885
2886fn parse_projection(val: &Value) -> Projection {
2887    Projection {
2888        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
2889        non_key_attributes: val["NonKeyAttributes"]
2890            .as_array()
2891            .map(|arr| {
2892                arr.iter()
2893                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2894                    .collect()
2895            })
2896            .unwrap_or_default(),
2897    }
2898}
2899
2900fn parse_tags(val: &Value) -> HashMap<String, String> {
2901    let mut tags = HashMap::new();
2902    if let Some(arr) = val.as_array() {
2903        for tag in arr {
2904            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2905                tags.insert(k.to_string(), v.to_string());
2906            }
2907        }
2908    }
2909    tags
2910}
2911
2912fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
2913    let mut names = HashMap::new();
2914    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
2915        for (k, v) in obj {
2916            if let Some(s) = v.as_str() {
2917                names.insert(k.clone(), s.to_string());
2918            }
2919        }
2920    }
2921    names
2922}
2923
2924fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
2925    let mut values = HashMap::new();
2926    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
2927        for (k, v) in obj {
2928            values.insert(k.clone(), v.clone());
2929        }
2930    }
2931    values
2932}
2933
2934fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
2935    if name.starts_with('#') {
2936        expr_attr_names
2937            .get(name)
2938            .cloned()
2939            .unwrap_or_else(|| name.to_string())
2940    } else {
2941        name.to_string()
2942    }
2943}
2944
2945fn extract_key(
2946    table: &DynamoTable,
2947    item: &HashMap<String, AttributeValue>,
2948) -> HashMap<String, AttributeValue> {
2949    let mut key = HashMap::new();
2950    let hash_key = table.hash_key_name();
2951    if let Some(v) = item.get(hash_key) {
2952        key.insert(hash_key.to_string(), v.clone());
2953    }
2954    if let Some(range_key) = table.range_key_name() {
2955        if let Some(v) = item.get(range_key) {
2956            key.insert(range_key.to_string(), v.clone());
2957        }
2958    }
2959    key
2960}
2961
2962fn validate_key_in_item(
2963    table: &DynamoTable,
2964    item: &HashMap<String, AttributeValue>,
2965) -> Result<(), AwsServiceError> {
2966    let hash_key = table.hash_key_name();
2967    if !item.contains_key(hash_key) {
2968        return Err(AwsServiceError::aws_error(
2969            StatusCode::BAD_REQUEST,
2970            "ValidationException",
2971            format!("Missing the key {hash_key} in the item"),
2972        ));
2973    }
2974    if let Some(range_key) = table.range_key_name() {
2975        if !item.contains_key(range_key) {
2976            return Err(AwsServiceError::aws_error(
2977                StatusCode::BAD_REQUEST,
2978                "ValidationException",
2979                format!("Missing the key {range_key} in the item"),
2980            ));
2981        }
2982    }
2983    Ok(())
2984}
2985
2986fn validate_key_attributes_in_key(
2987    table: &DynamoTable,
2988    key: &HashMap<String, AttributeValue>,
2989) -> Result<(), AwsServiceError> {
2990    let hash_key = table.hash_key_name();
2991    if !key.contains_key(hash_key) {
2992        return Err(AwsServiceError::aws_error(
2993            StatusCode::BAD_REQUEST,
2994            "ValidationException",
2995            format!("Missing the key {hash_key} in the item"),
2996        ));
2997    }
2998    Ok(())
2999}
3000
3001fn project_item(
3002    item: &HashMap<String, AttributeValue>,
3003    body: &Value,
3004) -> HashMap<String, AttributeValue> {
3005    let projection = body["ProjectionExpression"].as_str();
3006    match projection {
3007        Some(proj) if !proj.is_empty() => {
3008            let expr_attr_names = parse_expression_attribute_names(body);
3009            let attrs: Vec<String> = proj
3010                .split(',')
3011                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3012                .collect();
3013            let mut result = HashMap::new();
3014            for attr in &attrs {
3015                if let Some(v) = resolve_nested_path(item, attr) {
3016                    insert_nested_value(&mut result, attr, v);
3017                }
3018            }
3019            result
3020        }
3021        _ => item.clone(),
3022    }
3023}
3024
3025/// Resolve expression attribute names within each segment of a projection path.
3026/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3027fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3028    // Split on dots, resolve each part, rejoin
3029    let mut result = String::new();
3030    for (i, segment) in path.split('.').enumerate() {
3031        if i > 0 {
3032            result.push('.');
3033        }
3034        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3035        if let Some(bracket_pos) = segment.find('[') {
3036            let key_part = &segment[..bracket_pos];
3037            let index_part = &segment[bracket_pos..];
3038            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3039            result.push_str(index_part);
3040        } else {
3041            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3042        }
3043    }
3044    result
3045}
3046
3047/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3048fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3049    let segments = parse_path_segments(path);
3050    if segments.is_empty() {
3051        return None;
3052    }
3053
3054    let first = &segments[0];
3055    let top_key = match first {
3056        PathSegment::Key(k) => k.as_str(),
3057        _ => return None,
3058    };
3059
3060    let mut current = item.get(top_key)?.clone();
3061
3062    for segment in &segments[1..] {
3063        match segment {
3064            PathSegment::Key(k) => {
3065                // Navigate into a Map: {"M": {"key": ...}}
3066                current = current.get("M")?.get(k)?.clone();
3067            }
3068            PathSegment::Index(idx) => {
3069                // Navigate into a List: {"L": [...]}
3070                current = current.get("L")?.get(*idx)?.clone();
3071            }
3072        }
3073    }
3074
3075    Some(current)
3076}
3077
3078#[derive(Debug)]
3079enum PathSegment {
3080    Key(String),
3081    Index(usize),
3082}
3083
3084/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
3085fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3086    let mut segments = Vec::new();
3087    let mut current = String::new();
3088
3089    let chars: Vec<char> = path.chars().collect();
3090    let mut i = 0;
3091    while i < chars.len() {
3092        match chars[i] {
3093            '.' => {
3094                if !current.is_empty() {
3095                    segments.push(PathSegment::Key(current.clone()));
3096                    current.clear();
3097                }
3098            }
3099            '[' => {
3100                if !current.is_empty() {
3101                    segments.push(PathSegment::Key(current.clone()));
3102                    current.clear();
3103                }
3104                i += 1;
3105                let mut num = String::new();
3106                while i < chars.len() && chars[i] != ']' {
3107                    num.push(chars[i]);
3108                    i += 1;
3109                }
3110                if let Ok(idx) = num.parse::<usize>() {
3111                    segments.push(PathSegment::Index(idx));
3112                }
3113                // skip ']'
3114            }
3115            c => {
3116                current.push(c);
3117            }
3118        }
3119        i += 1;
3120    }
3121    if !current.is_empty() {
3122        segments.push(PathSegment::Key(current));
3123    }
3124    segments
3125}
3126
3127/// Insert a value at a nested path in the result HashMap.
3128/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3129fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3130    // Simple case: no nesting
3131    if !path.contains('.') && !path.contains('[') {
3132        result.insert(path.to_string(), value);
3133        return;
3134    }
3135
3136    let segments = parse_path_segments(path);
3137    if segments.is_empty() {
3138        return;
3139    }
3140
3141    let top_key = match &segments[0] {
3142        PathSegment::Key(k) => k.clone(),
3143        _ => return,
3144    };
3145
3146    if segments.len() == 1 {
3147        result.insert(top_key, value);
3148        return;
3149    }
3150
3151    // For nested paths, wrap the value back into the nested structure
3152    let wrapped = wrap_value_in_path(&segments[1..], value);
3153    // Merge into existing value if present
3154    let existing = result.remove(&top_key);
3155    let merged = match existing {
3156        Some(existing) => merge_attribute_values(existing, wrapped),
3157        None => wrapped,
3158    };
3159    result.insert(top_key, merged);
3160}
3161
3162/// Wrap a value in the nested path structure.
3163fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3164    if segments.is_empty() {
3165        return value;
3166    }
3167    let inner = wrap_value_in_path(&segments[1..], value);
3168    match &segments[0] {
3169        PathSegment::Key(k) => {
3170            json!({"M": {k.clone(): inner}})
3171        }
3172        PathSegment::Index(idx) => {
3173            let mut arr = vec![Value::Null; idx + 1];
3174            arr[*idx] = inner;
3175            json!({"L": arr})
3176        }
3177    }
3178}
3179
3180/// Merge two attribute values (for overlapping projections).
3181fn merge_attribute_values(a: Value, b: Value) -> Value {
3182    if let (Some(a_map), Some(b_map)) = (
3183        a.get("M").and_then(|v| v.as_object()),
3184        b.get("M").and_then(|v| v.as_object()),
3185    ) {
3186        let mut merged = a_map.clone();
3187        for (k, v) in b_map {
3188            if let Some(existing) = merged.get(k) {
3189                merged.insert(
3190                    k.clone(),
3191                    merge_attribute_values(existing.clone(), v.clone()),
3192                );
3193            } else {
3194                merged.insert(k.clone(), v.clone());
3195            }
3196        }
3197        json!({"M": merged})
3198    } else {
3199        b
3200    }
3201}
3202
3203fn evaluate_condition(
3204    condition: &str,
3205    existing: Option<&HashMap<String, AttributeValue>>,
3206    expr_attr_names: &HashMap<String, String>,
3207    expr_attr_values: &HashMap<String, Value>,
3208) -> Result<(), AwsServiceError> {
3209    let cond = condition.trim();
3210
3211    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3212        let attr = resolve_attr_name(inner, expr_attr_names);
3213        match existing {
3214            Some(item) if item.contains_key(&attr) => {
3215                return Err(AwsServiceError::aws_error(
3216                    StatusCode::BAD_REQUEST,
3217                    "ConditionalCheckFailedException",
3218                    "The conditional request failed",
3219                ));
3220            }
3221            _ => return Ok(()),
3222        }
3223    }
3224
3225    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3226        let attr = resolve_attr_name(inner, expr_attr_names);
3227        match existing {
3228            Some(item) if item.contains_key(&attr) => return Ok(()),
3229            _ => {
3230                return Err(AwsServiceError::aws_error(
3231                    StatusCode::BAD_REQUEST,
3232                    "ConditionalCheckFailedException",
3233                    "The conditional request failed",
3234                ));
3235            }
3236        }
3237    }
3238
3239    if let Some((left, op, right)) = parse_simple_comparison(cond) {
3240        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3241        let expected = expr_attr_values.get(right.trim());
3242        let actual = existing.and_then(|item| item.get(&attr_name));
3243
3244        let result = match op {
3245            "=" => actual == expected,
3246            "<>" => actual != expected,
3247            _ => true,
3248        };
3249
3250        if !result {
3251            return Err(AwsServiceError::aws_error(
3252                StatusCode::BAD_REQUEST,
3253                "ConditionalCheckFailedException",
3254                "The conditional request failed",
3255            ));
3256        }
3257    }
3258
3259    Ok(())
3260}
3261
3262fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
3263    let prefix = format!("{func_name}(");
3264    if let Some(rest) = expr.strip_prefix(&prefix) {
3265        if let Some(inner) = rest.strip_suffix(')') {
3266            return Some(inner.trim());
3267        }
3268    }
3269    None
3270}
3271
3272fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
3273    for op in &["<>", "=", "<", ">", "<=", ">="] {
3274        if let Some(pos) = expr.find(op) {
3275            let left = &expr[..pos];
3276            let right = &expr[pos + op.len()..];
3277            return Some((left, op, right));
3278        }
3279    }
3280    None
3281}
3282
3283fn evaluate_key_condition(
3284    expr: &str,
3285    item: &HashMap<String, AttributeValue>,
3286    hash_key_name: &str,
3287    _range_key_name: Option<&str>,
3288    expr_attr_names: &HashMap<String, String>,
3289    expr_attr_values: &HashMap<String, Value>,
3290) -> bool {
3291    let parts: Vec<&str> = split_on_and(expr);
3292    for part in &parts {
3293        let part = part.trim();
3294        if !evaluate_single_key_condition(
3295            part,
3296            item,
3297            hash_key_name,
3298            expr_attr_names,
3299            expr_attr_values,
3300        ) {
3301            return false;
3302        }
3303    }
3304    true
3305}
3306
3307fn split_on_and(expr: &str) -> Vec<&str> {
3308    let mut parts = Vec::new();
3309    let mut start = 0;
3310    let len = expr.len();
3311    let mut i = 0;
3312    let mut depth = 0;
3313    while i < len {
3314        let ch = expr.as_bytes()[i];
3315        if ch == b'(' {
3316            depth += 1;
3317        } else if ch == b')' {
3318            if depth > 0 {
3319                depth -= 1;
3320            }
3321        } else if depth == 0 && i + 5 <= len && expr[i..i + 5].eq_ignore_ascii_case(" AND ") {
3322            parts.push(&expr[start..i]);
3323            start = i + 5;
3324            i = start;
3325            continue;
3326        }
3327        i += 1;
3328    }
3329    parts.push(&expr[start..]);
3330    parts
3331}
3332
3333fn split_on_or(expr: &str) -> Vec<&str> {
3334    let mut parts = Vec::new();
3335    let mut start = 0;
3336    let len = expr.len();
3337    let mut i = 0;
3338    let mut depth = 0;
3339    while i < len {
3340        let ch = expr.as_bytes()[i];
3341        if ch == b'(' {
3342            depth += 1;
3343        } else if ch == b')' {
3344            if depth > 0 {
3345                depth -= 1;
3346            }
3347        } else if depth == 0 && i + 4 <= len && expr[i..i + 4].eq_ignore_ascii_case(" OR ") {
3348            parts.push(&expr[start..i]);
3349            start = i + 4;
3350            i = start;
3351            continue;
3352        }
3353        i += 1;
3354    }
3355    parts.push(&expr[start..]);
3356    parts
3357}
3358
3359fn evaluate_single_key_condition(
3360    part: &str,
3361    item: &HashMap<String, AttributeValue>,
3362    _hash_key_name: &str,
3363    expr_attr_names: &HashMap<String, String>,
3364    expr_attr_values: &HashMap<String, Value>,
3365) -> bool {
3366    let part = part.trim();
3367
3368    // begins_with(attr, :val)
3369    if let Some(rest) = part
3370        .strip_prefix("begins_with(")
3371        .or_else(|| part.strip_prefix("begins_with ("))
3372    {
3373        if let Some(inner) = rest.strip_suffix(')') {
3374            let mut split = inner.splitn(2, ',');
3375            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3376                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3377                let val_ref = val_ref.trim();
3378                let expected = expr_attr_values.get(val_ref);
3379                let actual = item.get(&attr_name);
3380                return match (actual, expected) {
3381                    (Some(a), Some(e)) => {
3382                        let a_str = extract_string_value(a);
3383                        let e_str = extract_string_value(e);
3384                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3385                    }
3386                    _ => false,
3387                };
3388            }
3389        }
3390        return false;
3391    }
3392
3393    // BETWEEN
3394    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
3395        let attr_part = part[..between_pos].trim();
3396        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
3397        let range_part = &part[between_pos + 7..];
3398        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
3399            let lo_ref = range_part[..and_pos].trim();
3400            let hi_ref = range_part[and_pos + 5..].trim();
3401            let lo = expr_attr_values.get(lo_ref);
3402            let hi = expr_attr_values.get(hi_ref);
3403            let actual = item.get(&attr_name);
3404            return match (actual, lo, hi) {
3405                (Some(a), Some(l), Some(h)) => {
3406                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
3407                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
3408                }
3409                _ => false,
3410            };
3411        }
3412    }
3413
3414    // Simple comparison: attr <op> :val
3415    for op in &["<=", ">=", "<>", "=", "<", ">"] {
3416        if let Some(pos) = part.find(op) {
3417            let left = part[..pos].trim();
3418            let right = part[pos + op.len()..].trim();
3419            let attr_name = resolve_attr_name(left, expr_attr_names);
3420            let expected = expr_attr_values.get(right);
3421            let actual = item.get(&attr_name);
3422
3423            return match *op {
3424                "=" => actual == expected,
3425                "<>" => actual != expected,
3426                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
3427                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
3428                "<=" => {
3429                    let cmp = compare_attribute_values(actual, expected);
3430                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
3431                }
3432                ">=" => {
3433                    let cmp = compare_attribute_values(actual, expected);
3434                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
3435                }
3436                _ => true,
3437            };
3438        }
3439    }
3440
3441    true
3442}
3443
3444fn extract_string_value(val: &Value) -> Option<String> {
3445    val.get("S")
3446        .and_then(|v| v.as_str())
3447        .map(|s| s.to_string())
3448        .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3449}
3450
3451fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3452    match (a, b) {
3453        (None, None) => std::cmp::Ordering::Equal,
3454        (None, Some(_)) => std::cmp::Ordering::Less,
3455        (Some(_), None) => std::cmp::Ordering::Greater,
3456        (Some(a), Some(b)) => {
3457            let a_type = attribute_type_and_value(a);
3458            let b_type = attribute_type_and_value(b);
3459            match (a_type, b_type) {
3460                (Some(("S", a_val)), Some(("S", b_val))) => {
3461                    let a_str = a_val.as_str().unwrap_or("");
3462                    let b_str = b_val.as_str().unwrap_or("");
3463                    a_str.cmp(b_str)
3464                }
3465                (Some(("N", a_val)), Some(("N", b_val))) => {
3466                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3467                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3468                    a_num
3469                        .partial_cmp(&b_num)
3470                        .unwrap_or(std::cmp::Ordering::Equal)
3471                }
3472                (Some(("B", a_val)), Some(("B", b_val))) => {
3473                    let a_str = a_val.as_str().unwrap_or("");
3474                    let b_str = b_val.as_str().unwrap_or("");
3475                    a_str.cmp(b_str)
3476                }
3477                _ => std::cmp::Ordering::Equal,
3478            }
3479        }
3480    }
3481}
3482
3483fn evaluate_filter_expression(
3484    expr: &str,
3485    item: &HashMap<String, AttributeValue>,
3486    expr_attr_names: &HashMap<String, String>,
3487    expr_attr_values: &HashMap<String, Value>,
3488) -> bool {
3489    let trimmed = expr.trim();
3490
3491    // Split on OR first (lower precedence), respecting parentheses
3492    let or_parts = split_on_or(trimmed);
3493    if or_parts.len() > 1 {
3494        return or_parts.iter().any(|part| {
3495            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3496        });
3497    }
3498
3499    // Then split on AND (higher precedence), respecting parentheses
3500    let and_parts = split_on_and(trimmed);
3501    if and_parts.len() > 1 {
3502        return and_parts.iter().all(|part| {
3503            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3504        });
3505    }
3506
3507    // Strip outer parentheses if present
3508    let stripped = strip_outer_parens(trimmed);
3509    if stripped != trimmed {
3510        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3511    }
3512
3513    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3514}
3515
3516/// Strip matching outer parentheses from an expression.
3517fn strip_outer_parens(expr: &str) -> &str {
3518    let trimmed = expr.trim();
3519    if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
3520        return trimmed;
3521    }
3522    // Verify the outer parens actually match each other
3523    let inner = &trimmed[1..trimmed.len() - 1];
3524    let mut depth = 0;
3525    for ch in inner.bytes() {
3526        match ch {
3527            b'(' => depth += 1,
3528            b')' => {
3529                if depth == 0 {
3530                    return trimmed; // closing paren matches something inside, not the outer one
3531                }
3532                depth -= 1;
3533            }
3534            _ => {}
3535        }
3536    }
3537    if depth == 0 {
3538        inner
3539    } else {
3540        trimmed
3541    }
3542}
3543
3544fn evaluate_single_filter_condition(
3545    part: &str,
3546    item: &HashMap<String, AttributeValue>,
3547    expr_attr_names: &HashMap<String, String>,
3548    expr_attr_values: &HashMap<String, Value>,
3549) -> bool {
3550    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
3551        let attr = resolve_attr_name(inner, expr_attr_names);
3552        return item.contains_key(&attr);
3553    }
3554
3555    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
3556        let attr = resolve_attr_name(inner, expr_attr_names);
3557        return !item.contains_key(&attr);
3558    }
3559
3560    if let Some(rest) = part
3561        .strip_prefix("begins_with(")
3562        .or_else(|| part.strip_prefix("begins_with ("))
3563    {
3564        if let Some(inner) = rest.strip_suffix(')') {
3565            let mut split = inner.splitn(2, ',');
3566            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3567                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3568                let expected = expr_attr_values.get(val_ref.trim());
3569                let actual = item.get(&attr_name);
3570                return match (actual, expected) {
3571                    (Some(a), Some(e)) => {
3572                        let a_str = extract_string_value(a);
3573                        let e_str = extract_string_value(e);
3574                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3575                    }
3576                    _ => false,
3577                };
3578            }
3579        }
3580    }
3581
3582    if let Some(rest) = part
3583        .strip_prefix("contains(")
3584        .or_else(|| part.strip_prefix("contains ("))
3585    {
3586        if let Some(inner) = rest.strip_suffix(')') {
3587            let mut split = inner.splitn(2, ',');
3588            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3589                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3590                let expected = expr_attr_values.get(val_ref.trim());
3591                let actual = item.get(&attr_name);
3592                return match (actual, expected) {
3593                    (Some(a), Some(e)) => {
3594                        let a_str = extract_string_value(a);
3595                        let e_str = extract_string_value(e);
3596                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
3597                    }
3598                    _ => false,
3599                };
3600            }
3601        }
3602    }
3603
3604    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
3605}
3606
3607fn apply_update_expression(
3608    item: &mut HashMap<String, AttributeValue>,
3609    expr: &str,
3610    expr_attr_names: &HashMap<String, String>,
3611    expr_attr_values: &HashMap<String, Value>,
3612) -> Result<(), AwsServiceError> {
3613    let clauses = parse_update_clauses(expr);
3614    for (action, assignments) in &clauses {
3615        match action.to_ascii_uppercase().as_str() {
3616            "SET" => {
3617                for assignment in assignments {
3618                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3619                }
3620            }
3621            "REMOVE" => {
3622                for attr_ref in assignments {
3623                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3624                    item.remove(&attr);
3625                }
3626            }
3627            "ADD" => {
3628                for assignment in assignments {
3629                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3630                }
3631            }
3632            "DELETE" => {
3633                for assignment in assignments {
3634                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3635                }
3636            }
3637            _ => {}
3638        }
3639    }
3640    Ok(())
3641}
3642
3643fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
3644    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
3645    let upper = expr.to_ascii_uppercase();
3646    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
3647    let mut positions: Vec<(usize, &str)> = Vec::new();
3648
3649    for kw in &keywords {
3650        let mut search_from = 0;
3651        while let Some(pos) = upper[search_from..].find(kw) {
3652            let abs_pos = search_from + pos;
3653            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
3654            let after_pos = abs_pos + kw.len();
3655            let after_ok =
3656                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
3657            if before_ok && after_ok {
3658                positions.push((abs_pos, kw));
3659            }
3660            search_from = abs_pos + kw.len();
3661        }
3662    }
3663
3664    positions.sort_by_key(|(pos, _)| *pos);
3665
3666    for (i, &(pos, kw)) in positions.iter().enumerate() {
3667        let start = pos + kw.len();
3668        let end = if i + 1 < positions.len() {
3669            positions[i + 1].0
3670        } else {
3671            expr.len()
3672        };
3673        let content = expr[start..end].trim();
3674        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
3675        clauses.push((kw.to_string(), assignments));
3676    }
3677
3678    clauses
3679}
3680
3681fn apply_set_assignment(
3682    item: &mut HashMap<String, AttributeValue>,
3683    assignment: &str,
3684    expr_attr_names: &HashMap<String, String>,
3685    expr_attr_values: &HashMap<String, Value>,
3686) -> Result<(), AwsServiceError> {
3687    let Some((left, right)) = assignment.split_once('=') else {
3688        return Ok(());
3689    };
3690
3691    let attr = resolve_attr_name(left.trim(), expr_attr_names);
3692    let right = right.trim();
3693
3694    // if_not_exists(attr, :val)
3695    if let Some(rest) = right
3696        .strip_prefix("if_not_exists(")
3697        .or_else(|| right.strip_prefix("if_not_exists ("))
3698    {
3699        if let Some(inner) = rest.strip_suffix(')') {
3700            let mut split = inner.splitn(2, ',');
3701            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
3702                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
3703                if !item.contains_key(&check_name) {
3704                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
3705                        item.insert(attr, val.clone());
3706                    }
3707                }
3708                return Ok(());
3709            }
3710        }
3711    }
3712
3713    // list_append(a, b)
3714    if let Some(rest) = right
3715        .strip_prefix("list_append(")
3716        .or_else(|| right.strip_prefix("list_append ("))
3717    {
3718        if let Some(inner) = rest.strip_suffix(')') {
3719            let mut split = inner.splitn(2, ',');
3720            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
3721                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
3722                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
3723
3724                let mut merged = Vec::new();
3725                if let Some(Value::Object(obj)) = &a_val {
3726                    if let Some(Value::Array(arr)) = obj.get("L") {
3727                        merged.extend(arr.clone());
3728                    }
3729                }
3730                if let Some(Value::Object(obj)) = &b_val {
3731                    if let Some(Value::Array(arr)) = obj.get("L") {
3732                        merged.extend(arr.clone());
3733                    }
3734                }
3735
3736                item.insert(attr, json!({"L": merged}));
3737                return Ok(());
3738            }
3739        }
3740    }
3741
3742    // Arithmetic: attr + :val or attr - :val
3743    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
3744        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
3745        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
3746
3747        let left_num = extract_number(&left_val).unwrap_or(0.0);
3748        let right_num = extract_number(&right_val).unwrap_or(0.0);
3749
3750        let result = if is_add {
3751            left_num + right_num
3752        } else {
3753            left_num - right_num
3754        };
3755
3756        let num_str = if result == result.trunc() {
3757            format!("{}", result as i64)
3758        } else {
3759            format!("{result}")
3760        };
3761
3762        item.insert(attr, json!({"N": num_str}));
3763        return Ok(());
3764    }
3765
3766    // Simple assignment
3767    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
3768    if let Some(v) = val {
3769        item.insert(attr, v);
3770    }
3771
3772    Ok(())
3773}
3774
3775fn resolve_value(
3776    reference: &str,
3777    item: &HashMap<String, AttributeValue>,
3778    expr_attr_names: &HashMap<String, String>,
3779    expr_attr_values: &HashMap<String, Value>,
3780) -> Option<Value> {
3781    let reference = reference.trim();
3782    if reference.starts_with(':') {
3783        expr_attr_values.get(reference).cloned()
3784    } else {
3785        let attr_name = resolve_attr_name(reference, expr_attr_names);
3786        item.get(&attr_name).cloned()
3787    }
3788}
3789
3790fn extract_number(val: &Option<Value>) -> Option<f64> {
3791    val.as_ref()
3792        .and_then(|v| v.get("N"))
3793        .and_then(|n| n.as_str())
3794        .and_then(|s| s.parse().ok())
3795}
3796
3797fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
3798    let mut depth = 0;
3799    for (i, c) in expr.char_indices() {
3800        match c {
3801            '(' => depth += 1,
3802            ')' => depth -= 1,
3803            '+' if depth == 0 && i > 0 => {
3804                return Some((&expr[..i], &expr[i + 1..], true));
3805            }
3806            '-' if depth == 0 && i > 0 => {
3807                return Some((&expr[..i], &expr[i + 1..], false));
3808            }
3809            _ => {}
3810        }
3811    }
3812    None
3813}
3814
3815fn apply_add_assignment(
3816    item: &mut HashMap<String, AttributeValue>,
3817    assignment: &str,
3818    expr_attr_names: &HashMap<String, String>,
3819    expr_attr_values: &HashMap<String, Value>,
3820) -> Result<(), AwsServiceError> {
3821    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3822    if parts.len() != 2 {
3823        return Ok(());
3824    }
3825
3826    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3827    let val_ref = parts[1].trim();
3828    let add_val = expr_attr_values.get(val_ref);
3829
3830    if let Some(add_val) = add_val {
3831        if let Some(existing) = item.get(&attr) {
3832            if let (Some(existing_num), Some(add_num)) = (
3833                extract_number(&Some(existing.clone())),
3834                extract_number(&Some(add_val.clone())),
3835            ) {
3836                let result = existing_num + add_num;
3837                let num_str = if result == result.trunc() {
3838                    format!("{}", result as i64)
3839                } else {
3840                    format!("{result}")
3841                };
3842                item.insert(attr, json!({"N": num_str}));
3843            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
3844                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
3845                    let mut merged: Vec<Value> = existing_set.clone();
3846                    for v in add_set {
3847                        if !merged.contains(v) {
3848                            merged.push(v.clone());
3849                        }
3850                    }
3851                    item.insert(attr, json!({"SS": merged}));
3852                }
3853            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
3854                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
3855                    let mut merged: Vec<Value> = existing_set.clone();
3856                    for v in add_set {
3857                        if !merged.contains(v) {
3858                            merged.push(v.clone());
3859                        }
3860                    }
3861                    item.insert(attr, json!({"NS": merged}));
3862                }
3863            }
3864        } else {
3865            item.insert(attr, add_val.clone());
3866        }
3867    }
3868
3869    Ok(())
3870}
3871
3872fn apply_delete_assignment(
3873    item: &mut HashMap<String, AttributeValue>,
3874    assignment: &str,
3875    expr_attr_names: &HashMap<String, String>,
3876    expr_attr_values: &HashMap<String, Value>,
3877) -> Result<(), AwsServiceError> {
3878    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3879    if parts.len() != 2 {
3880        return Ok(());
3881    }
3882
3883    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3884    let val_ref = parts[1].trim();
3885    let del_val = expr_attr_values.get(val_ref);
3886
3887    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
3888        if let (Some(existing_set), Some(del_set)) = (
3889            existing.get("SS").and_then(|v| v.as_array()),
3890            del_val.get("SS").and_then(|v| v.as_array()),
3891        ) {
3892            let filtered: Vec<Value> = existing_set
3893                .iter()
3894                .filter(|v| !del_set.contains(v))
3895                .cloned()
3896                .collect();
3897            if filtered.is_empty() {
3898                item.remove(&attr);
3899            } else {
3900                item.insert(attr, json!({"SS": filtered}));
3901            }
3902        } else if let (Some(existing_set), Some(del_set)) = (
3903            existing.get("NS").and_then(|v| v.as_array()),
3904            del_val.get("NS").and_then(|v| v.as_array()),
3905        ) {
3906            let filtered: Vec<Value> = existing_set
3907                .iter()
3908                .filter(|v| !del_set.contains(v))
3909                .cloned()
3910                .collect();
3911            if filtered.is_empty() {
3912                item.remove(&attr);
3913            } else {
3914                item.insert(attr, json!({"NS": filtered}));
3915            }
3916        }
3917    }
3918
3919    Ok(())
3920}
3921
3922#[allow(clippy::too_many_arguments)]
3923fn build_table_description_json(
3924    arn: &str,
3925    key_schema: &[KeySchemaElement],
3926    attribute_definitions: &[AttributeDefinition],
3927    provisioned_throughput: &ProvisionedThroughput,
3928    gsi: &[GlobalSecondaryIndex],
3929    lsi: &[LocalSecondaryIndex],
3930    billing_mode: &str,
3931    created_at: chrono::DateTime<chrono::Utc>,
3932    item_count: i64,
3933    size_bytes: i64,
3934    status: &str,
3935) -> Value {
3936    let table_name = arn.rsplit('/').next().unwrap_or("");
3937    let creation_timestamp =
3938        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
3939
3940    let ks: Vec<Value> = key_schema
3941        .iter()
3942        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3943        .collect();
3944
3945    let ad: Vec<Value> = attribute_definitions
3946        .iter()
3947        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3948        .collect();
3949
3950    let mut desc = json!({
3951        "TableName": table_name,
3952        "TableArn": arn,
3953        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
3954        "TableStatus": status,
3955        "KeySchema": ks,
3956        "AttributeDefinitions": ad,
3957        "CreationDateTime": creation_timestamp,
3958        "ItemCount": item_count,
3959        "TableSizeBytes": size_bytes,
3960        "BillingModeSummary": { "BillingMode": billing_mode },
3961    });
3962
3963    if billing_mode != "PAY_PER_REQUEST" {
3964        desc["ProvisionedThroughput"] = json!({
3965            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
3966            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
3967            "NumberOfDecreasesToday": 0,
3968        });
3969    } else {
3970        desc["ProvisionedThroughput"] = json!({
3971            "ReadCapacityUnits": 0,
3972            "WriteCapacityUnits": 0,
3973            "NumberOfDecreasesToday": 0,
3974        });
3975    }
3976
3977    if !gsi.is_empty() {
3978        let gsi_json: Vec<Value> = gsi
3979            .iter()
3980            .map(|g| {
3981                let gks: Vec<Value> = g
3982                    .key_schema
3983                    .iter()
3984                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3985                    .collect();
3986                let mut idx = json!({
3987                    "IndexName": g.index_name,
3988                    "KeySchema": gks,
3989                    "Projection": { "ProjectionType": g.projection.projection_type },
3990                    "IndexStatus": "ACTIVE",
3991                    "IndexArn": format!("{arn}/index/{}", g.index_name),
3992                    "ItemCount": 0,
3993                    "IndexSizeBytes": 0,
3994                });
3995                if !g.projection.non_key_attributes.is_empty() {
3996                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
3997                }
3998                if let Some(ref pt) = g.provisioned_throughput {
3999                    idx["ProvisionedThroughput"] = json!({
4000                        "ReadCapacityUnits": pt.read_capacity_units,
4001                        "WriteCapacityUnits": pt.write_capacity_units,
4002                        "NumberOfDecreasesToday": 0,
4003                    });
4004                }
4005                idx
4006            })
4007            .collect();
4008        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4009    }
4010
4011    if !lsi.is_empty() {
4012        let lsi_json: Vec<Value> = lsi
4013            .iter()
4014            .map(|l| {
4015                let lks: Vec<Value> = l
4016                    .key_schema
4017                    .iter()
4018                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4019                    .collect();
4020                let mut idx = json!({
4021                    "IndexName": l.index_name,
4022                    "KeySchema": lks,
4023                    "Projection": { "ProjectionType": l.projection.projection_type },
4024                    "IndexArn": format!("{arn}/index/{}", l.index_name),
4025                    "ItemCount": 0,
4026                    "IndexSizeBytes": 0,
4027                });
4028                if !l.projection.non_key_attributes.is_empty() {
4029                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4030                }
4031                idx
4032            })
4033            .collect();
4034        desc["LocalSecondaryIndexes"] = json!(lsi_json);
4035    }
4036
4037    desc
4038}
4039
4040fn build_table_description(table: &DynamoTable) -> Value {
4041    build_table_description_json(
4042        &table.arn,
4043        &table.key_schema,
4044        &table.attribute_definitions,
4045        &table.provisioned_throughput,
4046        &table.gsi,
4047        &table.lsi,
4048        &table.billing_mode,
4049        table.created_at,
4050        table.item_count,
4051        table.size_bytes,
4052        &table.status,
4053    )
4054}
4055
4056fn execute_partiql_statement(
4057    state: &SharedDynamoDbState,
4058    statement: &str,
4059    parameters: &[Value],
4060) -> Result<AwsResponse, AwsServiceError> {
4061    let trimmed = statement.trim();
4062    let upper = trimmed.to_ascii_uppercase();
4063
4064    if upper.starts_with("SELECT") {
4065        execute_partiql_select(state, trimmed, parameters)
4066    } else if upper.starts_with("INSERT") {
4067        execute_partiql_insert(state, trimmed, parameters)
4068    } else if upper.starts_with("UPDATE") {
4069        execute_partiql_update(state, trimmed, parameters)
4070    } else if upper.starts_with("DELETE") {
4071        execute_partiql_delete(state, trimmed, parameters)
4072    } else {
4073        Err(AwsServiceError::aws_error(
4074            StatusCode::BAD_REQUEST,
4075            "ValidationException",
4076            format!("Unsupported PartiQL statement: {trimmed}"),
4077        ))
4078    }
4079}
4080
4081/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
4082fn execute_partiql_select(
4083    state: &SharedDynamoDbState,
4084    statement: &str,
4085    parameters: &[Value],
4086) -> Result<AwsResponse, AwsServiceError> {
4087    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
4088    let upper = statement.to_ascii_uppercase();
4089    let from_pos = upper.find("FROM").ok_or_else(|| {
4090        AwsServiceError::aws_error(
4091            StatusCode::BAD_REQUEST,
4092            "ValidationException",
4093            "Invalid SELECT statement: missing FROM",
4094        )
4095    })?;
4096
4097    let after_from = statement[from_pos + 4..].trim();
4098    let (table_name, rest) = parse_partiql_table_name(after_from);
4099
4100    let state = state.read();
4101    let table = get_table(&state.tables, &table_name)?;
4102
4103    let rest_upper = rest.trim().to_ascii_uppercase();
4104    if rest_upper.starts_with("WHERE") {
4105        let where_clause = rest.trim()[5..].trim();
4106        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4107        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4108        DynamoDbService::ok_json(json!({ "Items": items }))
4109    } else {
4110        // No WHERE, return all items
4111        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4112        DynamoDbService::ok_json(json!({ "Items": items }))
4113    }
4114}
4115
4116fn execute_partiql_insert(
4117    state: &SharedDynamoDbState,
4118    statement: &str,
4119    parameters: &[Value],
4120) -> Result<AwsResponse, AwsServiceError> {
4121    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
4122    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
4123    let upper = statement.to_ascii_uppercase();
4124    let into_pos = upper.find("INTO").ok_or_else(|| {
4125        AwsServiceError::aws_error(
4126            StatusCode::BAD_REQUEST,
4127            "ValidationException",
4128            "Invalid INSERT statement: missing INTO",
4129        )
4130    })?;
4131
4132    let after_into = statement[into_pos + 4..].trim();
4133    let (table_name, rest) = parse_partiql_table_name(after_into);
4134
4135    let rest_upper = rest.trim().to_ascii_uppercase();
4136    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4137        AwsServiceError::aws_error(
4138            StatusCode::BAD_REQUEST,
4139            "ValidationException",
4140            "Invalid INSERT statement: missing VALUE",
4141        )
4142    })?;
4143
4144    let value_str = rest.trim()[value_pos + 5..].trim();
4145    let item = parse_partiql_value_object(value_str, parameters)?;
4146
4147    let mut state = state.write();
4148    let table = get_table_mut(&mut state.tables, &table_name)?;
4149    let key = extract_key(table, &item);
4150    if table.find_item_index(&key).is_some() {
4151        // DynamoDB PartiQL INSERT fails if item exists
4152        return Err(AwsServiceError::aws_error(
4153            StatusCode::BAD_REQUEST,
4154            "DuplicateItemException",
4155            "Duplicate primary key exists in table",
4156        ));
4157    } else {
4158        table.items.push(item);
4159    }
4160    table.recalculate_stats();
4161
4162    DynamoDbService::ok_json(json!({}))
4163}
4164
4165fn execute_partiql_update(
4166    state: &SharedDynamoDbState,
4167    statement: &str,
4168    parameters: &[Value],
4169) -> Result<AwsResponse, AwsServiceError> {
4170    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
4171    // or: UPDATE "tablename" SET attr=? WHERE pk=?
4172    let after_update = statement[6..].trim(); // skip "UPDATE"
4173    let (table_name, rest) = parse_partiql_table_name(after_update);
4174
4175    let rest_upper = rest.trim().to_ascii_uppercase();
4176    let set_pos = rest_upper.find("SET").ok_or_else(|| {
4177        AwsServiceError::aws_error(
4178            StatusCode::BAD_REQUEST,
4179            "ValidationException",
4180            "Invalid UPDATE statement: missing SET",
4181        )
4182    })?;
4183
4184    let after_set = rest.trim()[set_pos + 3..].trim();
4185
4186    // Split on WHERE
4187    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4188    let (set_clause, where_clause) = if let Some(wp) = where_pos {
4189        (&after_set[..wp], after_set[wp + 5..].trim())
4190    } else {
4191        (after_set, "")
4192    };
4193
4194    let mut state = state.write();
4195    let table = get_table_mut(&mut state.tables, &table_name)?;
4196
4197    let matched_indices = if !where_clause.is_empty() {
4198        find_partiql_where_indices(table, where_clause, parameters)?
4199    } else {
4200        (0..table.items.len()).collect()
4201    };
4202
4203    // Parse SET assignments: attr=value, attr2=value2
4204    let param_offset = count_params_in_str(where_clause);
4205    let assignments: Vec<&str> = set_clause.split(',').collect();
4206    for idx in &matched_indices {
4207        let mut local_offset = param_offset;
4208        for assignment in &assignments {
4209            let assignment = assignment.trim();
4210            if let Some((attr, val_str)) = assignment.split_once('=') {
4211                let attr = attr.trim().trim_matches('"');
4212                let val_str = val_str.trim();
4213                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4214                if let Some(v) = value {
4215                    table.items[*idx].insert(attr.to_string(), v);
4216                }
4217            }
4218        }
4219    }
4220    table.recalculate_stats();
4221
4222    DynamoDbService::ok_json(json!({}))
4223}
4224
4225fn execute_partiql_delete(
4226    state: &SharedDynamoDbState,
4227    statement: &str,
4228    parameters: &[Value],
4229) -> Result<AwsResponse, AwsServiceError> {
4230    // Pattern: DELETE FROM "tablename" WHERE pk='val'
4231    let upper = statement.to_ascii_uppercase();
4232    let from_pos = upper.find("FROM").ok_or_else(|| {
4233        AwsServiceError::aws_error(
4234            StatusCode::BAD_REQUEST,
4235            "ValidationException",
4236            "Invalid DELETE statement: missing FROM",
4237        )
4238    })?;
4239
4240    let after_from = statement[from_pos + 4..].trim();
4241    let (table_name, rest) = parse_partiql_table_name(after_from);
4242
4243    let rest_upper = rest.trim().to_ascii_uppercase();
4244    if !rest_upper.starts_with("WHERE") {
4245        return Err(AwsServiceError::aws_error(
4246            StatusCode::BAD_REQUEST,
4247            "ValidationException",
4248            "DELETE requires a WHERE clause",
4249        ));
4250    }
4251    let where_clause = rest.trim()[5..].trim();
4252
4253    let mut state = state.write();
4254    let table = get_table_mut(&mut state.tables, &table_name)?;
4255
4256    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4257    // Remove from highest index first to avoid invalidating lower indices
4258    indices.sort_unstable();
4259    indices.reverse();
4260    for idx in indices {
4261        table.items.remove(idx);
4262    }
4263    table.recalculate_stats();
4264
4265    DynamoDbService::ok_json(json!({}))
4266}
4267
4268/// Parse a table name that may be quoted with double quotes.
4269/// Returns (table_name, rest_of_string).
4270fn parse_partiql_table_name(s: &str) -> (String, &str) {
4271    let s = s.trim();
4272    if let Some(stripped) = s.strip_prefix('"') {
4273        // Quoted name
4274        if let Some(end) = stripped.find('"') {
4275            let name = &stripped[..end];
4276            let rest = &stripped[end + 1..];
4277            (name.to_string(), rest)
4278        } else {
4279            let end = s.find(' ').unwrap_or(s.len());
4280            (s[..end].trim_matches('"').to_string(), &s[end..])
4281        }
4282    } else {
4283        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
4284        (s[..end].to_string(), &s[end..])
4285    }
4286}
4287
4288/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
4289/// Returns matching items.
4290fn evaluate_partiql_where<'a>(
4291    table: &'a DynamoTable,
4292    where_clause: &str,
4293    parameters: &[Value],
4294) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4295    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4296    Ok(indices.iter().map(|i| &table.items[*i]).collect())
4297}
4298
4299fn find_partiql_where_indices(
4300    table: &DynamoTable,
4301    where_clause: &str,
4302    parameters: &[Value],
4303) -> Result<Vec<usize>, AwsServiceError> {
4304    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
4305    // Case-insensitive AND splitting
4306    let upper = where_clause.to_uppercase();
4307    let conditions = if upper.contains(" AND ") {
4308        // Find positions of " AND " case-insensitively and split
4309        let mut parts = Vec::new();
4310        let mut last = 0;
4311        for (i, _) in upper.match_indices(" AND ") {
4312            parts.push(where_clause[last..i].trim());
4313            last = i + 5;
4314        }
4315        parts.push(where_clause[last..].trim());
4316        parts
4317    } else {
4318        vec![where_clause.trim()]
4319    };
4320
4321    let mut param_idx = 0usize;
4322    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4323
4324    for cond in &conditions {
4325        let cond = cond.trim();
4326        if let Some((left, right)) = cond.split_once('=') {
4327            let attr = left.trim().trim_matches('"').to_string();
4328            let val_str = right.trim();
4329            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4330            if let Some(v) = value {
4331                parsed_conditions.push((attr, v));
4332            }
4333        }
4334    }
4335
4336    let mut indices = Vec::new();
4337    for (i, item) in table.items.iter().enumerate() {
4338        let all_match = parsed_conditions
4339            .iter()
4340            .all(|(attr, expected)| item.get(attr) == Some(expected));
4341        if all_match {
4342            indices.push(i);
4343        }
4344    }
4345
4346    Ok(indices)
4347}
4348
4349/// Parse a PartiQL literal value. Supports:
4350/// - 'string' -> {"S": "string"}
4351/// - 123 -> {"N": "123"}
4352/// - ? -> parameter from list
4353fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4354    let s = s.trim();
4355    if s == "?" {
4356        let idx = *param_idx;
4357        *param_idx += 1;
4358        parameters.get(idx).cloned()
4359    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4360        let inner = &s[1..s.len() - 1];
4361        Some(json!({"S": inner}))
4362    } else if let Ok(n) = s.parse::<f64>() {
4363        let num_str = if n == n.trunc() {
4364            format!("{}", n as i64)
4365        } else {
4366            format!("{n}")
4367        };
4368        Some(json!({"N": num_str}))
4369    } else {
4370        None
4371    }
4372}
4373
4374/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
4375fn parse_partiql_value_object(
4376    s: &str,
4377    parameters: &[Value],
4378) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4379    let s = s.trim();
4380    let inner = s
4381        .strip_prefix('{')
4382        .and_then(|s| s.strip_suffix('}'))
4383        .ok_or_else(|| {
4384            AwsServiceError::aws_error(
4385                StatusCode::BAD_REQUEST,
4386                "ValidationException",
4387                "Invalid VALUE: expected object literal",
4388            )
4389        })?;
4390
4391    let mut item = HashMap::new();
4392    let mut param_idx = 0usize;
4393
4394    // Simple comma-separated key:value parsing
4395    for pair in split_partiql_pairs(inner) {
4396        let pair = pair.trim();
4397        if pair.is_empty() {
4398            continue;
4399        }
4400        if let Some((key_part, val_part)) = pair.split_once(':') {
4401            let key = key_part
4402                .trim()
4403                .trim_matches('\'')
4404                .trim_matches('"')
4405                .to_string();
4406            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4407                item.insert(key, val);
4408            }
4409        }
4410    }
4411
4412    Ok(item)
4413}
4414
4415/// Split PartiQL object pairs on commas, respecting nested braces and quotes.
4416fn split_partiql_pairs(s: &str) -> Vec<&str> {
4417    let mut parts = Vec::new();
4418    let mut start = 0;
4419    let mut depth = 0;
4420    let mut in_quote = false;
4421
4422    for (i, c) in s.char_indices() {
4423        match c {
4424            '\'' if !in_quote => in_quote = true,
4425            '\'' if in_quote => in_quote = false,
4426            '{' if !in_quote => depth += 1,
4427            '}' if !in_quote => depth -= 1,
4428            ',' if !in_quote && depth == 0 => {
4429                parts.push(&s[start..i]);
4430                start = i + 1;
4431            }
4432            _ => {}
4433        }
4434    }
4435    parts.push(&s[start..]);
4436    parts
4437}
4438
4439/// Count ? parameters in a string.
4440fn count_params_in_str(s: &str) -> usize {
4441    s.chars().filter(|c| *c == '?').count()
4442}
4443
4444#[cfg(test)]
4445mod tests {
4446    use super::*;
4447    use serde_json::json;
4448
4449    #[test]
4450    fn test_parse_update_clauses_set() {
4451        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
4452        assert_eq!(clauses.len(), 1);
4453        assert_eq!(clauses[0].0, "SET");
4454        assert_eq!(clauses[0].1.len(), 2);
4455    }
4456
4457    #[test]
4458    fn test_parse_update_clauses_set_and_remove() {
4459        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
4460        assert_eq!(clauses.len(), 2);
4461        assert_eq!(clauses[0].0, "SET");
4462        assert_eq!(clauses[1].0, "REMOVE");
4463    }
4464
4465    #[test]
4466    fn test_evaluate_key_condition_simple() {
4467        let mut item = HashMap::new();
4468        item.insert("pk".to_string(), json!({"S": "user1"}));
4469        item.insert("sk".to_string(), json!({"S": "order1"}));
4470
4471        let mut expr_values = HashMap::new();
4472        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
4473
4474        assert!(evaluate_key_condition(
4475            "pk = :pk",
4476            &item,
4477            "pk",
4478            Some("sk"),
4479            &HashMap::new(),
4480            &expr_values,
4481        ));
4482    }
4483
4484    #[test]
4485    fn test_compare_attribute_values_numbers() {
4486        let a = json!({"N": "10"});
4487        let b = json!({"N": "20"});
4488        assert_eq!(
4489            compare_attribute_values(Some(&a), Some(&b)),
4490            std::cmp::Ordering::Less
4491        );
4492    }
4493
4494    #[test]
4495    fn test_compare_attribute_values_strings() {
4496        let a = json!({"S": "apple"});
4497        let b = json!({"S": "banana"});
4498        assert_eq!(
4499            compare_attribute_values(Some(&a), Some(&b)),
4500            std::cmp::Ordering::Less
4501        );
4502    }
4503
4504    #[test]
4505    fn test_split_on_and() {
4506        let parts = split_on_and("pk = :pk AND sk > :sk");
4507        assert_eq!(parts.len(), 2);
4508        assert_eq!(parts[0].trim(), "pk = :pk");
4509        assert_eq!(parts[1].trim(), "sk > :sk");
4510    }
4511
4512    #[test]
4513    fn test_split_on_and_respects_parentheses() {
4514        // Before fix: split_on_and would split inside the parens
4515        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
4516        // Should NOT split on the AND inside parentheses
4517        assert_eq!(parts.len(), 1);
4518        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
4519    }
4520
4521    #[test]
4522    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
4523        // (a AND b) OR c — should match when c is true but a is false
4524        let mut item = HashMap::new();
4525        item.insert("x".to_string(), json!({"S": "no"}));
4526        item.insert("y".to_string(), json!({"S": "no"}));
4527        item.insert("z".to_string(), json!({"S": "yes"}));
4528
4529        let mut expr_values = HashMap::new();
4530        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
4531
4532        // x=yes AND y=yes => false, but z=yes => true => overall true
4533        let result = evaluate_filter_expression(
4534            "(x = :yes AND y = :yes) OR z = :yes",
4535            &item,
4536            &HashMap::new(),
4537            &expr_values,
4538        );
4539        assert!(result, "should match because z = :yes is true");
4540
4541        // x=yes AND y=yes => false, z=yes => false => overall false
4542        let mut item2 = HashMap::new();
4543        item2.insert("x".to_string(), json!({"S": "no"}));
4544        item2.insert("y".to_string(), json!({"S": "no"}));
4545        item2.insert("z".to_string(), json!({"S": "no"}));
4546
4547        let result2 = evaluate_filter_expression(
4548            "(x = :yes AND y = :yes) OR z = :yes",
4549            &item2,
4550            &HashMap::new(),
4551            &expr_values,
4552        );
4553        assert!(!result2, "should not match because nothing is true");
4554    }
4555
4556    #[test]
4557    fn test_project_item_nested_path() {
4558        // Item with a list attribute containing maps
4559        let mut item = HashMap::new();
4560        item.insert("pk".to_string(), json!({"S": "key1"}));
4561        item.insert(
4562            "data".to_string(),
4563            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
4564        );
4565
4566        let body = json!({
4567            "ProjectionExpression": "data[0].name"
4568        });
4569
4570        let projected = project_item(&item, &body);
4571        // Should contain data[0].name = "Alice", not the entire data[0] element
4572        let name = projected
4573            .get("data")
4574            .and_then(|v| v.get("L"))
4575            .and_then(|v| v.get(0))
4576            .and_then(|v| v.get("M"))
4577            .and_then(|v| v.get("name"))
4578            .and_then(|v| v.get("S"))
4579            .and_then(|v| v.as_str());
4580        assert_eq!(name, Some("Alice"));
4581
4582        // Should NOT contain the "age" field
4583        let age = projected
4584            .get("data")
4585            .and_then(|v| v.get("L"))
4586            .and_then(|v| v.get(0))
4587            .and_then(|v| v.get("M"))
4588            .and_then(|v| v.get("age"));
4589        assert!(age.is_none(), "age should not be present in projection");
4590    }
4591
4592    #[test]
4593    fn test_resolve_nested_path_map() {
4594        let mut item = HashMap::new();
4595        item.insert(
4596            "info".to_string(),
4597            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
4598        );
4599
4600        let result = resolve_nested_path(&item, "info.address.city");
4601        assert_eq!(result, Some(json!({"S": "NYC"})));
4602    }
4603
4604    #[test]
4605    fn test_resolve_nested_path_list_then_map() {
4606        let mut item = HashMap::new();
4607        item.insert(
4608            "items".to_string(),
4609            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
4610        );
4611
4612        let result = resolve_nested_path(&item, "items[0].sku");
4613        assert_eq!(result, Some(json!({"S": "ABC"})));
4614    }
4615
4616    // -- Integration-style tests using DynamoDbService --
4617
4618    use crate::state::SharedDynamoDbState;
4619    use parking_lot::RwLock;
4620    use std::sync::Arc;
4621
4622    fn make_service() -> DynamoDbService {
4623        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
4624            "123456789012",
4625            "us-east-1",
4626        )));
4627        DynamoDbService::new(state)
4628    }
4629
4630    fn make_request(action: &str, body: Value) -> AwsRequest {
4631        AwsRequest {
4632            service: "dynamodb".to_string(),
4633            action: action.to_string(),
4634            region: "us-east-1".to_string(),
4635            account_id: "123456789012".to_string(),
4636            request_id: "test-id".to_string(),
4637            headers: http::HeaderMap::new(),
4638            query_params: HashMap::new(),
4639            body: serde_json::to_vec(&body).unwrap().into(),
4640            path_segments: vec![],
4641            raw_path: "/".to_string(),
4642            method: http::Method::POST,
4643            is_query_protocol: false,
4644            access_key_id: None,
4645        }
4646    }
4647
4648    fn create_test_table(svc: &DynamoDbService) {
4649        let req = make_request(
4650            "CreateTable",
4651            json!({
4652                "TableName": "test-table",
4653                "KeySchema": [
4654                    { "AttributeName": "pk", "KeyType": "HASH" }
4655                ],
4656                "AttributeDefinitions": [
4657                    { "AttributeName": "pk", "AttributeType": "S" }
4658                ],
4659                "BillingMode": "PAY_PER_REQUEST"
4660            }),
4661        );
4662        svc.create_table(&req).unwrap();
4663    }
4664
4665    #[test]
4666    fn delete_item_return_values_all_old() {
4667        let svc = make_service();
4668        create_test_table(&svc);
4669
4670        // Put an item
4671        let req = make_request(
4672            "PutItem",
4673            json!({
4674                "TableName": "test-table",
4675                "Item": {
4676                    "pk": { "S": "key1" },
4677                    "name": { "S": "Alice" },
4678                    "age": { "N": "30" }
4679                }
4680            }),
4681        );
4682        svc.put_item(&req).unwrap();
4683
4684        // Delete with ReturnValues=ALL_OLD
4685        let req = make_request(
4686            "DeleteItem",
4687            json!({
4688                "TableName": "test-table",
4689                "Key": { "pk": { "S": "key1" } },
4690                "ReturnValues": "ALL_OLD"
4691            }),
4692        );
4693        let resp = svc.delete_item(&req).unwrap();
4694        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4695
4696        // Verify the old item is returned
4697        let attrs = &body["Attributes"];
4698        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
4699        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
4700        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
4701
4702        // Verify the item is actually deleted
4703        let req = make_request(
4704            "GetItem",
4705            json!({
4706                "TableName": "test-table",
4707                "Key": { "pk": { "S": "key1" } }
4708            }),
4709        );
4710        let resp = svc.get_item(&req).unwrap();
4711        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4712        assert!(body.get("Item").is_none(), "item should be deleted");
4713    }
4714
4715    #[test]
4716    fn transact_get_items_returns_existing_and_missing() {
4717        let svc = make_service();
4718        create_test_table(&svc);
4719
4720        // Put one item
4721        let req = make_request(
4722            "PutItem",
4723            json!({
4724                "TableName": "test-table",
4725                "Item": {
4726                    "pk": { "S": "exists" },
4727                    "val": { "S": "hello" }
4728                }
4729            }),
4730        );
4731        svc.put_item(&req).unwrap();
4732
4733        let req = make_request(
4734            "TransactGetItems",
4735            json!({
4736                "TransactItems": [
4737                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
4738                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
4739                ]
4740            }),
4741        );
4742        let resp = svc.transact_get_items(&req).unwrap();
4743        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4744        let responses = body["Responses"].as_array().unwrap();
4745        assert_eq!(responses.len(), 2);
4746        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
4747        assert!(responses[1].get("Item").is_none());
4748    }
4749
4750    #[test]
4751    fn transact_write_items_put_and_delete() {
4752        let svc = make_service();
4753        create_test_table(&svc);
4754
4755        // Put initial item
4756        let req = make_request(
4757            "PutItem",
4758            json!({
4759                "TableName": "test-table",
4760                "Item": {
4761                    "pk": { "S": "to-delete" },
4762                    "val": { "S": "bye" }
4763                }
4764            }),
4765        );
4766        svc.put_item(&req).unwrap();
4767
4768        // TransactWrite: put new + delete existing
4769        let req = make_request(
4770            "TransactWriteItems",
4771            json!({
4772                "TransactItems": [
4773                    {
4774                        "Put": {
4775                            "TableName": "test-table",
4776                            "Item": {
4777                                "pk": { "S": "new-item" },
4778                                "val": { "S": "hi" }
4779                            }
4780                        }
4781                    },
4782                    {
4783                        "Delete": {
4784                            "TableName": "test-table",
4785                            "Key": { "pk": { "S": "to-delete" } }
4786                        }
4787                    }
4788                ]
4789            }),
4790        );
4791        let resp = svc.transact_write_items(&req).unwrap();
4792        assert_eq!(resp.status, StatusCode::OK);
4793
4794        // Verify new item exists
4795        let req = make_request(
4796            "GetItem",
4797            json!({
4798                "TableName": "test-table",
4799                "Key": { "pk": { "S": "new-item" } }
4800            }),
4801        );
4802        let resp = svc.get_item(&req).unwrap();
4803        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4804        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
4805
4806        // Verify deleted item is gone
4807        let req = make_request(
4808            "GetItem",
4809            json!({
4810                "TableName": "test-table",
4811                "Key": { "pk": { "S": "to-delete" } }
4812            }),
4813        );
4814        let resp = svc.get_item(&req).unwrap();
4815        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4816        assert!(body.get("Item").is_none());
4817    }
4818
4819    #[test]
4820    fn transact_write_items_condition_check_failure() {
4821        let svc = make_service();
4822        create_test_table(&svc);
4823
4824        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
4825        let req = make_request(
4826            "TransactWriteItems",
4827            json!({
4828                "TransactItems": [
4829                    {
4830                        "ConditionCheck": {
4831                            "TableName": "test-table",
4832                            "Key": { "pk": { "S": "nonexistent" } },
4833                            "ConditionExpression": "attribute_exists(pk)"
4834                        }
4835                    }
4836                ]
4837            }),
4838        );
4839        let resp = svc.transact_write_items(&req).unwrap();
4840        // Should be a 400 error response
4841        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
4842        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4843        assert_eq!(
4844            body["__type"].as_str().unwrap(),
4845            "TransactionCanceledException"
4846        );
4847        assert!(body["CancellationReasons"].as_array().is_some());
4848    }
4849
4850    #[test]
4851    fn update_and_describe_time_to_live() {
4852        let svc = make_service();
4853        create_test_table(&svc);
4854
4855        // Enable TTL
4856        let req = make_request(
4857            "UpdateTimeToLive",
4858            json!({
4859                "TableName": "test-table",
4860                "TimeToLiveSpecification": {
4861                    "AttributeName": "ttl",
4862                    "Enabled": true
4863                }
4864            }),
4865        );
4866        let resp = svc.update_time_to_live(&req).unwrap();
4867        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4868        assert_eq!(
4869            body["TimeToLiveSpecification"]["AttributeName"]
4870                .as_str()
4871                .unwrap(),
4872            "ttl"
4873        );
4874        assert!(body["TimeToLiveSpecification"]["Enabled"]
4875            .as_bool()
4876            .unwrap());
4877
4878        // Describe TTL
4879        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
4880        let resp = svc.describe_time_to_live(&req).unwrap();
4881        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4882        assert_eq!(
4883            body["TimeToLiveDescription"]["TimeToLiveStatus"]
4884                .as_str()
4885                .unwrap(),
4886            "ENABLED"
4887        );
4888        assert_eq!(
4889            body["TimeToLiveDescription"]["AttributeName"]
4890                .as_str()
4891                .unwrap(),
4892            "ttl"
4893        );
4894
4895        // Disable TTL
4896        let req = make_request(
4897            "UpdateTimeToLive",
4898            json!({
4899                "TableName": "test-table",
4900                "TimeToLiveSpecification": {
4901                    "AttributeName": "ttl",
4902                    "Enabled": false
4903                }
4904            }),
4905        );
4906        svc.update_time_to_live(&req).unwrap();
4907
4908        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
4909        let resp = svc.describe_time_to_live(&req).unwrap();
4910        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4911        assert_eq!(
4912            body["TimeToLiveDescription"]["TimeToLiveStatus"]
4913                .as_str()
4914                .unwrap(),
4915            "DISABLED"
4916        );
4917    }
4918
4919    #[test]
4920    fn resource_policy_lifecycle() {
4921        let svc = make_service();
4922        create_test_table(&svc);
4923
4924        let table_arn = {
4925            let state = svc.state.read();
4926            state.tables.get("test-table").unwrap().arn.clone()
4927        };
4928
4929        // Put policy
4930        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
4931        let req = make_request(
4932            "PutResourcePolicy",
4933            json!({
4934                "ResourceArn": table_arn,
4935                "Policy": policy_doc
4936            }),
4937        );
4938        let resp = svc.put_resource_policy(&req).unwrap();
4939        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4940        assert!(body["RevisionId"].as_str().is_some());
4941
4942        // Get policy
4943        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
4944        let resp = svc.get_resource_policy(&req).unwrap();
4945        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4946        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
4947
4948        // Delete policy
4949        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
4950        svc.delete_resource_policy(&req).unwrap();
4951
4952        // Get should return null now
4953        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
4954        let resp = svc.get_resource_policy(&req).unwrap();
4955        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4956        assert!(body["Policy"].is_null());
4957    }
4958
4959    #[test]
4960    fn describe_endpoints() {
4961        let svc = make_service();
4962        let req = make_request("DescribeEndpoints", json!({}));
4963        let resp = svc.describe_endpoints(&req).unwrap();
4964        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4965        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
4966    }
4967
4968    #[test]
4969    fn describe_limits() {
4970        let svc = make_service();
4971        let req = make_request("DescribeLimits", json!({}));
4972        let resp = svc.describe_limits(&req).unwrap();
4973        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4974        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
4975    }
4976
4977    #[test]
4978    fn backup_lifecycle() {
4979        let svc = make_service();
4980        create_test_table(&svc);
4981
4982        // Create backup
4983        let req = make_request(
4984            "CreateBackup",
4985            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
4986        );
4987        let resp = svc.create_backup(&req).unwrap();
4988        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4989        let backup_arn = body["BackupDetails"]["BackupArn"]
4990            .as_str()
4991            .unwrap()
4992            .to_string();
4993        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
4994
4995        // Describe backup
4996        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
4997        let resp = svc.describe_backup(&req).unwrap();
4998        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4999        assert_eq!(
5000            body["BackupDescription"]["BackupDetails"]["BackupName"],
5001            "my-backup"
5002        );
5003
5004        // List backups
5005        let req = make_request("ListBackups", json!({}));
5006        let resp = svc.list_backups(&req).unwrap();
5007        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5008        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5009
5010        // Restore from backup
5011        let req = make_request(
5012            "RestoreTableFromBackup",
5013            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5014        );
5015        svc.restore_table_from_backup(&req).unwrap();
5016
5017        // Verify restored table exists
5018        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5019        let resp = svc.describe_table(&req).unwrap();
5020        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5021        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5022
5023        // Delete backup
5024        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5025        svc.delete_backup(&req).unwrap();
5026
5027        // List should be empty
5028        let req = make_request("ListBackups", json!({}));
5029        let resp = svc.list_backups(&req).unwrap();
5030        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5031        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5032    }
5033
5034    #[test]
5035    fn continuous_backups() {
5036        let svc = make_service();
5037        create_test_table(&svc);
5038
5039        // Initially disabled
5040        let req = make_request(
5041            "DescribeContinuousBackups",
5042            json!({ "TableName": "test-table" }),
5043        );
5044        let resp = svc.describe_continuous_backups(&req).unwrap();
5045        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5046        assert_eq!(
5047            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5048                ["PointInTimeRecoveryStatus"],
5049            "DISABLED"
5050        );
5051
5052        // Enable
5053        let req = make_request(
5054            "UpdateContinuousBackups",
5055            json!({
5056                "TableName": "test-table",
5057                "PointInTimeRecoverySpecification": {
5058                    "PointInTimeRecoveryEnabled": true
5059                }
5060            }),
5061        );
5062        svc.update_continuous_backups(&req).unwrap();
5063
5064        // Verify
5065        let req = make_request(
5066            "DescribeContinuousBackups",
5067            json!({ "TableName": "test-table" }),
5068        );
5069        let resp = svc.describe_continuous_backups(&req).unwrap();
5070        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5071        assert_eq!(
5072            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5073                ["PointInTimeRecoveryStatus"],
5074            "ENABLED"
5075        );
5076    }
5077
5078    #[test]
5079    fn restore_table_to_point_in_time() {
5080        let svc = make_service();
5081        create_test_table(&svc);
5082
5083        let req = make_request(
5084            "RestoreTableToPointInTime",
5085            json!({
5086                "SourceTableName": "test-table",
5087                "TargetTableName": "pitr-restored"
5088            }),
5089        );
5090        svc.restore_table_to_point_in_time(&req).unwrap();
5091
5092        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5093        let resp = svc.describe_table(&req).unwrap();
5094        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5095        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5096    }
5097
5098    #[test]
5099    fn global_table_lifecycle() {
5100        let svc = make_service();
5101
5102        // Create global table
5103        let req = make_request(
5104            "CreateGlobalTable",
5105            json!({
5106                "GlobalTableName": "my-global",
5107                "ReplicationGroup": [
5108                    { "RegionName": "us-east-1" },
5109                    { "RegionName": "eu-west-1" }
5110                ]
5111            }),
5112        );
5113        let resp = svc.create_global_table(&req).unwrap();
5114        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5115        assert_eq!(
5116            body["GlobalTableDescription"]["GlobalTableStatus"],
5117            "ACTIVE"
5118        );
5119
5120        // Describe
5121        let req = make_request(
5122            "DescribeGlobalTable",
5123            json!({ "GlobalTableName": "my-global" }),
5124        );
5125        let resp = svc.describe_global_table(&req).unwrap();
5126        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5127        assert_eq!(
5128            body["GlobalTableDescription"]["ReplicationGroup"]
5129                .as_array()
5130                .unwrap()
5131                .len(),
5132            2
5133        );
5134
5135        // List
5136        let req = make_request("ListGlobalTables", json!({}));
5137        let resp = svc.list_global_tables(&req).unwrap();
5138        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5139        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5140
5141        // Update - add a region
5142        let req = make_request(
5143            "UpdateGlobalTable",
5144            json!({
5145                "GlobalTableName": "my-global",
5146                "ReplicaUpdates": [
5147                    { "Create": { "RegionName": "ap-southeast-1" } }
5148                ]
5149            }),
5150        );
5151        let resp = svc.update_global_table(&req).unwrap();
5152        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5153        assert_eq!(
5154            body["GlobalTableDescription"]["ReplicationGroup"]
5155                .as_array()
5156                .unwrap()
5157                .len(),
5158            3
5159        );
5160
5161        // Describe settings
5162        let req = make_request(
5163            "DescribeGlobalTableSettings",
5164            json!({ "GlobalTableName": "my-global" }),
5165        );
5166        let resp = svc.describe_global_table_settings(&req).unwrap();
5167        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5168        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5169
5170        // Update settings (no-op, just verify no error)
5171        let req = make_request(
5172            "UpdateGlobalTableSettings",
5173            json!({ "GlobalTableName": "my-global" }),
5174        );
5175        svc.update_global_table_settings(&req).unwrap();
5176    }
5177
5178    #[test]
5179    fn table_replica_auto_scaling() {
5180        let svc = make_service();
5181        create_test_table(&svc);
5182
5183        let req = make_request(
5184            "DescribeTableReplicaAutoScaling",
5185            json!({ "TableName": "test-table" }),
5186        );
5187        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5188        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5189        assert_eq!(
5190            body["TableAutoScalingDescription"]["TableName"],
5191            "test-table"
5192        );
5193
5194        let req = make_request(
5195            "UpdateTableReplicaAutoScaling",
5196            json!({ "TableName": "test-table" }),
5197        );
5198        svc.update_table_replica_auto_scaling(&req).unwrap();
5199    }
5200
5201    #[test]
5202    fn kinesis_streaming_lifecycle() {
5203        let svc = make_service();
5204        create_test_table(&svc);
5205
5206        // Enable
5207        let req = make_request(
5208            "EnableKinesisStreamingDestination",
5209            json!({
5210                "TableName": "test-table",
5211                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5212            }),
5213        );
5214        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5215        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5216        assert_eq!(body["DestinationStatus"], "ACTIVE");
5217
5218        // Describe
5219        let req = make_request(
5220            "DescribeKinesisStreamingDestination",
5221            json!({ "TableName": "test-table" }),
5222        );
5223        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5224        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5225        assert_eq!(
5226            body["KinesisDataStreamDestinations"]
5227                .as_array()
5228                .unwrap()
5229                .len(),
5230            1
5231        );
5232
5233        // Update
5234        let req = make_request(
5235            "UpdateKinesisStreamingDestination",
5236            json!({
5237                "TableName": "test-table",
5238                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5239                "UpdateKinesisStreamingConfiguration": {
5240                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
5241                }
5242            }),
5243        );
5244        svc.update_kinesis_streaming_destination(&req).unwrap();
5245
5246        // Disable
5247        let req = make_request(
5248            "DisableKinesisStreamingDestination",
5249            json!({
5250                "TableName": "test-table",
5251                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5252            }),
5253        );
5254        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5255        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5256        assert_eq!(body["DestinationStatus"], "DISABLED");
5257    }
5258
5259    #[test]
5260    fn contributor_insights_lifecycle() {
5261        let svc = make_service();
5262        create_test_table(&svc);
5263
5264        // Initially disabled
5265        let req = make_request(
5266            "DescribeContributorInsights",
5267            json!({ "TableName": "test-table" }),
5268        );
5269        let resp = svc.describe_contributor_insights(&req).unwrap();
5270        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5271        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5272
5273        // Enable
5274        let req = make_request(
5275            "UpdateContributorInsights",
5276            json!({
5277                "TableName": "test-table",
5278                "ContributorInsightsAction": "ENABLE"
5279            }),
5280        );
5281        let resp = svc.update_contributor_insights(&req).unwrap();
5282        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5283        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5284
5285        // List
5286        let req = make_request("ListContributorInsights", json!({}));
5287        let resp = svc.list_contributor_insights(&req).unwrap();
5288        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5289        assert_eq!(
5290            body["ContributorInsightsSummaries"]
5291                .as_array()
5292                .unwrap()
5293                .len(),
5294            1
5295        );
5296    }
5297
5298    #[test]
5299    fn export_lifecycle() {
5300        let svc = make_service();
5301        create_test_table(&svc);
5302
5303        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5304
5305        // Export
5306        let req = make_request(
5307            "ExportTableToPointInTime",
5308            json!({
5309                "TableArn": table_arn,
5310                "S3Bucket": "my-bucket"
5311            }),
5312        );
5313        let resp = svc.export_table_to_point_in_time(&req).unwrap();
5314        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5315        let export_arn = body["ExportDescription"]["ExportArn"]
5316            .as_str()
5317            .unwrap()
5318            .to_string();
5319        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
5320
5321        // Describe
5322        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
5323        let resp = svc.describe_export(&req).unwrap();
5324        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5325        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
5326
5327        // List
5328        let req = make_request("ListExports", json!({}));
5329        let resp = svc.list_exports(&req).unwrap();
5330        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5331        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
5332    }
5333
5334    #[test]
5335    fn import_lifecycle() {
5336        let svc = make_service();
5337
5338        let req = make_request(
5339            "ImportTable",
5340            json!({
5341                "InputFormat": "DYNAMODB_JSON",
5342                "S3BucketSource": { "S3Bucket": "import-bucket" },
5343                "TableCreationParameters": {
5344                    "TableName": "imported-table",
5345                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
5346                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
5347                }
5348            }),
5349        );
5350        let resp = svc.import_table(&req).unwrap();
5351        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5352        let import_arn = body["ImportTableDescription"]["ImportArn"]
5353            .as_str()
5354            .unwrap()
5355            .to_string();
5356        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5357
5358        // Describe import
5359        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
5360        let resp = svc.describe_import(&req).unwrap();
5361        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5362        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5363
5364        // List imports
5365        let req = make_request("ListImports", json!({}));
5366        let resp = svc.list_imports(&req).unwrap();
5367        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5368        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
5369
5370        // Verify the table was created
5371        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
5372        let resp = svc.describe_table(&req).unwrap();
5373        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5374        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5375    }
5376}