Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use crate::state::{
12    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
13    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
14    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
15    ReplicaDescription, SharedDynamoDbState,
16};
17
/// DynamoDB service handler backed by a shared state handle.
pub struct DynamoDbService {
    /// Shared state; the handlers acquire it through `read()` / `write()`.
    state: SharedDynamoDbState,
}
21
22impl DynamoDbService {
23    pub fn new(state: SharedDynamoDbState) -> Self {
24        Self { state }
25    }
26
27    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
28        serde_json::from_slice(&req.body).map_err(|e| {
29            AwsServiceError::aws_error(
30                StatusCode::BAD_REQUEST,
31                "SerializationException",
32                format!("Invalid JSON: {e}"),
33            )
34        })
35    }
36
37    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
38        Ok(AwsResponse::json(
39            StatusCode::OK,
40            serde_json::to_vec(&body).unwrap(),
41        ))
42    }
43
44    // ── Table Operations ────────────────────────────────────────────────
45
46    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
47        let body = Self::parse_body(req)?;
48
49        let table_name = body["TableName"]
50            .as_str()
51            .ok_or_else(|| {
52                AwsServiceError::aws_error(
53                    StatusCode::BAD_REQUEST,
54                    "ValidationException",
55                    "TableName is required",
56                )
57            })?
58            .to_string();
59
60        let key_schema = parse_key_schema(&body["KeySchema"])?;
61        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
62
63        // Validate that key schema attributes are defined
64        for ks in &key_schema {
65            if !attribute_definitions
66                .iter()
67                .any(|ad| ad.attribute_name == ks.attribute_name)
68            {
69                return Err(AwsServiceError::aws_error(
70                    StatusCode::BAD_REQUEST,
71                    "ValidationException",
72                    format!(
73                        "One or more parameter values were invalid: \
74                         Some index key attributes are not defined in AttributeDefinitions. \
75                         Keys: [{}], AttributeDefinitions: [{}]",
76                        ks.attribute_name,
77                        attribute_definitions
78                            .iter()
79                            .map(|ad| ad.attribute_name.as_str())
80                            .collect::<Vec<_>>()
81                            .join(", ")
82                    ),
83                ));
84            }
85        }
86
87        let billing_mode = body["BillingMode"]
88            .as_str()
89            .unwrap_or("PROVISIONED")
90            .to_string();
91
92        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
93            ProvisionedThroughput {
94                read_capacity_units: 0,
95                write_capacity_units: 0,
96            }
97        } else {
98            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
99        };
100
101        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
102        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
103        let tags = parse_tags(&body["Tags"]);
104
105        let mut state = self.state.write();
106
107        if state.tables.contains_key(&table_name) {
108            return Err(AwsServiceError::aws_error(
109                StatusCode::BAD_REQUEST,
110                "ResourceInUseException",
111                format!("Table already exists: {table_name}"),
112            ));
113        }
114
115        let arn = format!(
116            "arn:aws:dynamodb:{}:{}:table/{}",
117            state.region, state.account_id, table_name
118        );
119        let now = Utc::now();
120
121        let table = DynamoTable {
122            name: table_name.clone(),
123            arn: arn.clone(),
124            key_schema: key_schema.clone(),
125            attribute_definitions: attribute_definitions.clone(),
126            provisioned_throughput: provisioned_throughput.clone(),
127            items: Vec::new(),
128            gsi: gsi.clone(),
129            lsi: lsi.clone(),
130            tags,
131            created_at: now,
132            status: "ACTIVE".to_string(),
133            item_count: 0,
134            size_bytes: 0,
135            billing_mode: billing_mode.clone(),
136            ttl_attribute: None,
137            ttl_enabled: false,
138            resource_policy: None,
139            pitr_enabled: false,
140            kinesis_destinations: Vec::new(),
141            contributor_insights_status: "DISABLED".to_string(),
142        };
143
144        state.tables.insert(table_name, table);
145
146        let table_desc = build_table_description_json(
147            &arn,
148            &key_schema,
149            &attribute_definitions,
150            &provisioned_throughput,
151            &gsi,
152            &lsi,
153            &billing_mode,
154            now,
155            0,
156            0,
157            "ACTIVE",
158        );
159
160        Self::ok_json(json!({ "TableDescription": table_desc }))
161    }
162
163    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
164        let body = Self::parse_body(req)?;
165        let table_name = require_str(&body, "TableName")?;
166
167        let mut state = self.state.write();
168        let table = state.tables.remove(table_name).ok_or_else(|| {
169            AwsServiceError::aws_error(
170                StatusCode::BAD_REQUEST,
171                "ResourceNotFoundException",
172                format!("Requested resource not found: Table: {table_name} not found"),
173            )
174        })?;
175
176        let table_desc = build_table_description_json(
177            &table.arn,
178            &table.key_schema,
179            &table.attribute_definitions,
180            &table.provisioned_throughput,
181            &table.gsi,
182            &table.lsi,
183            &table.billing_mode,
184            table.created_at,
185            table.item_count,
186            table.size_bytes,
187            "DELETING",
188        );
189
190        Self::ok_json(json!({ "TableDescription": table_desc }))
191    }
192
193    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194        let body = Self::parse_body(req)?;
195        let table_name = require_str(&body, "TableName")?;
196
197        let state = self.state.read();
198        let table = get_table(&state.tables, table_name)?;
199
200        let table_desc = build_table_description_json(
201            &table.arn,
202            &table.key_schema,
203            &table.attribute_definitions,
204            &table.provisioned_throughput,
205            &table.gsi,
206            &table.lsi,
207            &table.billing_mode,
208            table.created_at,
209            table.item_count,
210            table.size_bytes,
211            &table.status,
212        );
213
214        Self::ok_json(json!({ "Table": table_desc }))
215    }
216
217    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
218        let body = Self::parse_body(req)?;
219
220        validate_optional_string_length(
221            "exclusiveStartTableName",
222            body["ExclusiveStartTableName"].as_str(),
223            3,
224            255,
225        )?;
226        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
227
228        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
229        let exclusive_start = body["ExclusiveStartTableName"]
230            .as_str()
231            .map(|s| s.to_string());
232
233        let state = self.state.read();
234        let mut names: Vec<&String> = state.tables.keys().collect();
235        names.sort();
236
237        let start_idx = match &exclusive_start {
238            Some(start) => names
239                .iter()
240                .position(|n| n.as_str() > start.as_str())
241                .unwrap_or(names.len()),
242            None => 0,
243        };
244
245        let page: Vec<&str> = names
246            .iter()
247            .skip(start_idx)
248            .take(limit)
249            .map(|n| n.as_str())
250            .collect();
251
252        let mut result = json!({ "TableNames": page });
253
254        if start_idx + limit < names.len() {
255            if let Some(last) = page.last() {
256                result["LastEvaluatedTableName"] = json!(last);
257            }
258        }
259
260        Self::ok_json(result)
261    }
262
263    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
264        let body = Self::parse_body(req)?;
265        let table_name = require_str(&body, "TableName")?;
266
267        let mut state = self.state.write();
268        let table = state.tables.get_mut(table_name).ok_or_else(|| {
269            AwsServiceError::aws_error(
270                StatusCode::BAD_REQUEST,
271                "ResourceNotFoundException",
272                format!("Requested resource not found: Table: {table_name} not found"),
273            )
274        })?;
275
276        if let Some(pt) = body.get("ProvisionedThroughput") {
277            if let Ok(throughput) = parse_provisioned_throughput(pt) {
278                table.provisioned_throughput = throughput;
279            }
280        }
281
282        if let Some(bm) = body["BillingMode"].as_str() {
283            table.billing_mode = bm.to_string();
284        }
285
286        let table_desc = build_table_description_json(
287            &table.arn,
288            &table.key_schema,
289            &table.attribute_definitions,
290            &table.provisioned_throughput,
291            &table.gsi,
292            &table.lsi,
293            &table.billing_mode,
294            table.created_at,
295            table.item_count,
296            table.size_bytes,
297            &table.status,
298        );
299
300        Self::ok_json(json!({ "TableDescription": table_desc }))
301    }
302
303    // ── Item Operations ─────────────────────────────────────────────────
304
305    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
306        let body = Self::parse_body(req)?;
307        let table_name = require_str(&body, "TableName")?;
308        let item = require_object(&body, "Item")?;
309
310        let mut state = self.state.write();
311        let table = get_table_mut(&mut state.tables, table_name)?;
312
313        validate_key_in_item(table, &item)?;
314
315        let condition = body["ConditionExpression"].as_str();
316        let expr_attr_names = parse_expression_attribute_names(&body);
317        let expr_attr_values = parse_expression_attribute_values(&body);
318
319        let key = extract_key(table, &item);
320        let existing_idx = table.find_item_index(&key);
321
322        if let Some(cond) = condition {
323            let existing = existing_idx.map(|i| &table.items[i]);
324            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
325        }
326
327        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
328        let old_item = if return_values == "ALL_OLD" {
329            existing_idx.map(|i| table.items[i].clone())
330        } else {
331            None
332        };
333
334        if let Some(idx) = existing_idx {
335            table.items[idx] = item;
336        } else {
337            table.items.push(item);
338        }
339
340        table.recalculate_stats();
341
342        let mut result = json!({});
343        if let Some(old) = old_item {
344            result["Attributes"] = json!(old);
345        }
346
347        Self::ok_json(result)
348    }
349
350    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
351        let body = Self::parse_body(req)?;
352        let table_name = require_str(&body, "TableName")?;
353        let key = require_object(&body, "Key")?;
354
355        let state = self.state.read();
356        let table = get_table(&state.tables, table_name)?;
357
358        let result = match table.find_item_index(&key) {
359            Some(idx) => {
360                let item = &table.items[idx];
361                let projected = project_item(item, &body);
362                json!({ "Item": projected })
363            }
364            None => json!({}),
365        };
366
367        Self::ok_json(result)
368    }
369
370    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
371        let body = Self::parse_body(req)?;
372
373        validate_optional_enum_value(
374            "conditionalOperator",
375            &body["ConditionalOperator"],
376            &["AND", "OR"],
377        )?;
378        validate_optional_enum_value(
379            "returnConsumedCapacity",
380            &body["ReturnConsumedCapacity"],
381            &["INDEXES", "TOTAL", "NONE"],
382        )?;
383        validate_optional_enum_value(
384            "returnValues",
385            &body["ReturnValues"],
386            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
387        )?;
388        validate_optional_enum_value(
389            "returnItemCollectionMetrics",
390            &body["ReturnItemCollectionMetrics"],
391            &["SIZE", "NONE"],
392        )?;
393        validate_optional_enum_value(
394            "returnValuesOnConditionCheckFailure",
395            &body["ReturnValuesOnConditionCheckFailure"],
396            &["ALL_OLD", "NONE"],
397        )?;
398
399        let table_name = require_str(&body, "TableName")?;
400        let key = require_object(&body, "Key")?;
401
402        let mut state = self.state.write();
403        let table = get_table_mut(&mut state.tables, table_name)?;
404
405        let condition = body["ConditionExpression"].as_str();
406        let expr_attr_names = parse_expression_attribute_names(&body);
407        let expr_attr_values = parse_expression_attribute_values(&body);
408
409        let existing_idx = table.find_item_index(&key);
410
411        if let Some(cond) = condition {
412            let existing = existing_idx.map(|i| &table.items[i]);
413            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
414        }
415
416        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
417
418        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
419        let return_icm = body["ReturnItemCollectionMetrics"]
420            .as_str()
421            .unwrap_or("NONE");
422
423        let mut result = json!({});
424
425        if let Some(idx) = existing_idx {
426            if return_values == "ALL_OLD" {
427                result["Attributes"] = json!(table.items[idx]);
428            }
429            table.items.remove(idx);
430            table.recalculate_stats();
431        }
432
433        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
434            result["ConsumedCapacity"] = json!({
435                "TableName": table_name,
436                "CapacityUnits": 1.0,
437            });
438        }
439
440        if return_icm == "SIZE" {
441            result["ItemCollectionMetrics"] = json!({});
442        }
443
444        Self::ok_json(result)
445    }
446
447    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448        let body = Self::parse_body(req)?;
449        let table_name = require_str(&body, "TableName")?;
450        let key = require_object(&body, "Key")?;
451
452        let mut state = self.state.write();
453        let table = get_table_mut(&mut state.tables, table_name)?;
454
455        validate_key_attributes_in_key(table, &key)?;
456
457        let condition = body["ConditionExpression"].as_str();
458        let expr_attr_names = parse_expression_attribute_names(&body);
459        let expr_attr_values = parse_expression_attribute_values(&body);
460        let update_expression = body["UpdateExpression"].as_str();
461
462        let existing_idx = table.find_item_index(&key);
463
464        if let Some(cond) = condition {
465            let existing = existing_idx.map(|i| &table.items[i]);
466            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
467        }
468
469        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
470
471        let idx = match existing_idx {
472            Some(i) => i,
473            None => {
474                let mut new_item = HashMap::new();
475                for (k, v) in &key {
476                    new_item.insert(k.clone(), v.clone());
477                }
478                table.items.push(new_item);
479                table.items.len() - 1
480            }
481        };
482
483        let old_item = if return_values == "ALL_OLD" {
484            Some(table.items[idx].clone())
485        } else {
486            None
487        };
488
489        if let Some(expr) = update_expression {
490            apply_update_expression(
491                &mut table.items[idx],
492                expr,
493                &expr_attr_names,
494                &expr_attr_values,
495            )?;
496        }
497
498        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
499            Some(table.items[idx].clone())
500        } else {
501            None
502        };
503
504        table.recalculate_stats();
505
506        let mut result = json!({});
507        if let Some(old) = old_item {
508            result["Attributes"] = json!(old);
509        } else if let Some(new) = new_item {
510            result["Attributes"] = json!(new);
511        }
512
513        Self::ok_json(result)
514    }
515
516    // ── Query & Scan ────────────────────────────────────────────────────
517
518    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
519        let body = Self::parse_body(req)?;
520        let table_name = require_str(&body, "TableName")?;
521
522        let state = self.state.read();
523        let table = get_table(&state.tables, table_name)?;
524
525        let expr_attr_names = parse_expression_attribute_names(&body);
526        let expr_attr_values = parse_expression_attribute_values(&body);
527
528        let key_condition = body["KeyConditionExpression"].as_str();
529        let filter_expression = body["FilterExpression"].as_str();
530        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
531        let limit = body["Limit"].as_i64().map(|l| l as usize);
532        let index_name = body["IndexName"].as_str();
533
534        let (items_to_scan, hash_key_name, range_key_name): (
535            &[HashMap<String, AttributeValue>],
536            String,
537            Option<String>,
538        ) = if let Some(idx_name) = index_name {
539            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
540                let hk = gsi
541                    .key_schema
542                    .iter()
543                    .find(|k| k.key_type == "HASH")
544                    .map(|k| k.attribute_name.clone())
545                    .unwrap_or_default();
546                let rk = gsi
547                    .key_schema
548                    .iter()
549                    .find(|k| k.key_type == "RANGE")
550                    .map(|k| k.attribute_name.clone());
551                (&table.items, hk, rk)
552            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
553                let hk = lsi
554                    .key_schema
555                    .iter()
556                    .find(|k| k.key_type == "HASH")
557                    .map(|k| k.attribute_name.clone())
558                    .unwrap_or_default();
559                let rk = lsi
560                    .key_schema
561                    .iter()
562                    .find(|k| k.key_type == "RANGE")
563                    .map(|k| k.attribute_name.clone());
564                (&table.items, hk, rk)
565            } else {
566                return Err(AwsServiceError::aws_error(
567                    StatusCode::BAD_REQUEST,
568                    "ValidationException",
569                    format!("The table does not have the specified index: {idx_name}"),
570                ));
571            }
572        } else {
573            (
574                &table.items[..],
575                table.hash_key_name().to_string(),
576                table.range_key_name().map(|s| s.to_string()),
577            )
578        };
579
580        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
581            .iter()
582            .filter(|item| {
583                if let Some(kc) = key_condition {
584                    evaluate_key_condition(
585                        kc,
586                        item,
587                        &hash_key_name,
588                        range_key_name.as_deref(),
589                        &expr_attr_names,
590                        &expr_attr_values,
591                    )
592                } else {
593                    true
594                }
595            })
596            .collect();
597
598        if let Some(ref rk) = range_key_name {
599            matched.sort_by(|a, b| {
600                let av = a.get(rk.as_str());
601                let bv = b.get(rk.as_str());
602                compare_attribute_values(av, bv)
603            });
604            if !scan_forward {
605                matched.reverse();
606            }
607        }
608
609        if let Some(filter) = filter_expression {
610            matched.retain(|item| {
611                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
612            });
613        }
614
615        let scanned_count = matched.len();
616
617        if let Some(lim) = limit {
618            matched.truncate(lim);
619        }
620
621        let items: Vec<Value> = matched
622            .iter()
623            .map(|item| {
624                let projected = project_item(item, &body);
625                json!(projected)
626            })
627            .collect();
628
629        Self::ok_json(json!({
630            "Items": items,
631            "Count": items.len(),
632            "ScannedCount": scanned_count,
633        }))
634    }
635
636    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637        let body = Self::parse_body(req)?;
638        let table_name = require_str(&body, "TableName")?;
639
640        let state = self.state.read();
641        let table = get_table(&state.tables, table_name)?;
642
643        let expr_attr_names = parse_expression_attribute_names(&body);
644        let expr_attr_values = parse_expression_attribute_values(&body);
645        let filter_expression = body["FilterExpression"].as_str();
646        let limit = body["Limit"].as_i64().map(|l| l as usize);
647
648        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
649        let scanned_count = matched.len();
650
651        if let Some(filter) = filter_expression {
652            matched.retain(|item| {
653                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
654            });
655        }
656
657        if let Some(lim) = limit {
658            matched.truncate(lim);
659        }
660
661        let items: Vec<Value> = matched
662            .iter()
663            .map(|item| {
664                let projected = project_item(item, &body);
665                json!(projected)
666            })
667            .collect();
668
669        Self::ok_json(json!({
670            "Items": items,
671            "Count": items.len(),
672            "ScannedCount": scanned_count,
673        }))
674    }
675
676    // ── Batch Operations ────────────────────────────────────────────────
677
678    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679        let body = Self::parse_body(req)?;
680
681        validate_optional_enum_value(
682            "returnConsumedCapacity",
683            &body["ReturnConsumedCapacity"],
684            &["INDEXES", "TOTAL", "NONE"],
685        )?;
686
687        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
688
689        let request_items = body["RequestItems"]
690            .as_object()
691            .ok_or_else(|| {
692                AwsServiceError::aws_error(
693                    StatusCode::BAD_REQUEST,
694                    "ValidationException",
695                    "RequestItems is required",
696                )
697            })?
698            .clone();
699
700        let state = self.state.read();
701        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
702        let mut consumed_capacity: Vec<Value> = Vec::new();
703
704        for (table_name, params) in &request_items {
705            let table = get_table(&state.tables, table_name)?;
706            let keys = params["Keys"].as_array().ok_or_else(|| {
707                AwsServiceError::aws_error(
708                    StatusCode::BAD_REQUEST,
709                    "ValidationException",
710                    "Keys is required",
711                )
712            })?;
713
714            let mut items = Vec::new();
715            for key_val in keys {
716                let key: HashMap<String, AttributeValue> =
717                    serde_json::from_value(key_val.clone()).unwrap_or_default();
718                if let Some(idx) = table.find_item_index(&key) {
719                    items.push(json!(table.items[idx]));
720                }
721            }
722            responses.insert(table_name.clone(), items);
723
724            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
725                consumed_capacity.push(json!({
726                    "TableName": table_name,
727                    "CapacityUnits": 1.0,
728                }));
729            }
730        }
731
732        let mut result = json!({
733            "Responses": responses,
734            "UnprocessedKeys": {},
735        });
736
737        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
738            result["ConsumedCapacity"] = json!(consumed_capacity);
739        }
740
741        Self::ok_json(result)
742    }
743
744    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
745        let body = Self::parse_body(req)?;
746
747        validate_optional_enum_value(
748            "returnConsumedCapacity",
749            &body["ReturnConsumedCapacity"],
750            &["INDEXES", "TOTAL", "NONE"],
751        )?;
752        validate_optional_enum_value(
753            "returnItemCollectionMetrics",
754            &body["ReturnItemCollectionMetrics"],
755            &["SIZE", "NONE"],
756        )?;
757
758        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
759        let return_icm = body["ReturnItemCollectionMetrics"]
760            .as_str()
761            .unwrap_or("NONE");
762
763        let request_items = body["RequestItems"]
764            .as_object()
765            .ok_or_else(|| {
766                AwsServiceError::aws_error(
767                    StatusCode::BAD_REQUEST,
768                    "ValidationException",
769                    "RequestItems is required",
770                )
771            })?
772            .clone();
773
774        let mut state = self.state.write();
775        let mut consumed_capacity: Vec<Value> = Vec::new();
776        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
777
778        for (table_name, requests) in &request_items {
779            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
780                AwsServiceError::aws_error(
781                    StatusCode::BAD_REQUEST,
782                    "ResourceNotFoundException",
783                    format!("Requested resource not found: Table: {table_name} not found"),
784                )
785            })?;
786
787            let reqs = requests.as_array().ok_or_else(|| {
788                AwsServiceError::aws_error(
789                    StatusCode::BAD_REQUEST,
790                    "ValidationException",
791                    "Request list must be an array",
792                )
793            })?;
794
795            for request in reqs {
796                if let Some(put_req) = request.get("PutRequest") {
797                    let item: HashMap<String, AttributeValue> =
798                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
799                    let key = extract_key(table, &item);
800                    if let Some(idx) = table.find_item_index(&key) {
801                        table.items[idx] = item;
802                    } else {
803                        table.items.push(item);
804                    }
805                } else if let Some(del_req) = request.get("DeleteRequest") {
806                    let key: HashMap<String, AttributeValue> =
807                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
808                    if let Some(idx) = table.find_item_index(&key) {
809                        table.items.remove(idx);
810                    }
811                }
812            }
813
814            table.recalculate_stats();
815
816            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
817                consumed_capacity.push(json!({
818                    "TableName": table_name,
819                    "CapacityUnits": 1.0,
820                }));
821            }
822
823            if return_icm == "SIZE" {
824                item_collection_metrics.insert(table_name.clone(), vec![]);
825            }
826        }
827
828        let mut result = json!({
829            "UnprocessedItems": {},
830        });
831
832        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
833            result["ConsumedCapacity"] = json!(consumed_capacity);
834        }
835
836        if return_icm == "SIZE" {
837            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
838        }
839
840        Self::ok_json(result)
841    }
842
843    // ── Tags ────────────────────────────────────────────────────────────
844
845    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
846        let body = Self::parse_body(req)?;
847        let resource_arn = require_str(&body, "ResourceArn")?;
848        let tags_arr = body["Tags"].as_array().ok_or_else(|| {
849            AwsServiceError::aws_error(
850                StatusCode::BAD_REQUEST,
851                "ValidationException",
852                "Tags is required",
853            )
854        })?;
855
856        let mut state = self.state.write();
857        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
858
859        for tag in tags_arr {
860            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
861                table.tags.insert(k.to_string(), v.to_string());
862            }
863        }
864
865        Self::ok_json(json!({}))
866    }
867
868    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
869        let body = Self::parse_body(req)?;
870        let resource_arn = require_str(&body, "ResourceArn")?;
871        let tag_keys = body["TagKeys"].as_array().ok_or_else(|| {
872            AwsServiceError::aws_error(
873                StatusCode::BAD_REQUEST,
874                "ValidationException",
875                "TagKeys is required",
876            )
877        })?;
878
879        let mut state = self.state.write();
880        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
881
882        for key in tag_keys {
883            if let Some(k) = key.as_str() {
884                table.tags.remove(k);
885            }
886        }
887
888        Self::ok_json(json!({}))
889    }
890
891    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892        let body = Self::parse_body(req)?;
893        let resource_arn = require_str(&body, "ResourceArn")?;
894
895        let state = self.state.read();
896        let table = find_table_by_arn(&state.tables, resource_arn)?;
897
898        let tags: Vec<Value> = table
899            .tags
900            .iter()
901            .map(|(k, v)| json!({"Key": k, "Value": v}))
902            .collect();
903
904        Self::ok_json(json!({ "Tags": tags }))
905    }
906
907    // ── Transactions ────────────────────────────────────────────────────
908
909    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
910        let body = Self::parse_body(req)?;
911        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
912            AwsServiceError::aws_error(
913                StatusCode::BAD_REQUEST,
914                "ValidationException",
915                "TransactItems is required",
916            )
917        })?;
918
919        let state = self.state.read();
920        let mut responses: Vec<Value> = Vec::new();
921
922        for ti in transact_items {
923            let get = &ti["Get"];
924            let table_name = get["TableName"].as_str().ok_or_else(|| {
925                AwsServiceError::aws_error(
926                    StatusCode::BAD_REQUEST,
927                    "ValidationException",
928                    "TableName is required in Get",
929                )
930            })?;
931            let key: HashMap<String, AttributeValue> =
932                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
933
934            let table = get_table(&state.tables, table_name)?;
935            match table.find_item_index(&key) {
936                Some(idx) => {
937                    responses.push(json!({ "Item": table.items[idx] }));
938                }
939                None => {
940                    responses.push(json!({}));
941                }
942            }
943        }
944
945        Self::ok_json(json!({ "Responses": responses }))
946    }
947
948    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
949        let body = Self::parse_body(req)?;
950        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
951            AwsServiceError::aws_error(
952                StatusCode::BAD_REQUEST,
953                "ValidationException",
954                "TransactItems is required",
955            )
956        })?;
957
958        let mut state = self.state.write();
959
960        // First pass: validate all conditions
961        let mut cancellation_reasons: Vec<Value> = Vec::new();
962        let mut any_failed = false;
963
964        for ti in transact_items {
965            if let Some(put) = ti.get("Put") {
966                let table_name = put["TableName"].as_str().unwrap_or_default();
967                let item: HashMap<String, AttributeValue> =
968                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
969                let condition = put["ConditionExpression"].as_str();
970
971                if let Some(cond) = condition {
972                    let table = get_table(&state.tables, table_name)?;
973                    let expr_attr_names = parse_expression_attribute_names(put);
974                    let expr_attr_values = parse_expression_attribute_values(put);
975                    let key = extract_key(table, &item);
976                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
977                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
978                        .is_err()
979                    {
980                        cancellation_reasons.push(json!({
981                            "Code": "ConditionalCheckFailed",
982                            "Message": "The conditional request failed"
983                        }));
984                        any_failed = true;
985                        continue;
986                    }
987                }
988                cancellation_reasons.push(json!({ "Code": "None" }));
989            } else if let Some(delete) = ti.get("Delete") {
990                let table_name = delete["TableName"].as_str().unwrap_or_default();
991                let key: HashMap<String, AttributeValue> =
992                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
993                let condition = delete["ConditionExpression"].as_str();
994
995                if let Some(cond) = condition {
996                    let table = get_table(&state.tables, table_name)?;
997                    let expr_attr_names = parse_expression_attribute_names(delete);
998                    let expr_attr_values = parse_expression_attribute_values(delete);
999                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1000                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1001                        .is_err()
1002                    {
1003                        cancellation_reasons.push(json!({
1004                            "Code": "ConditionalCheckFailed",
1005                            "Message": "The conditional request failed"
1006                        }));
1007                        any_failed = true;
1008                        continue;
1009                    }
1010                }
1011                cancellation_reasons.push(json!({ "Code": "None" }));
1012            } else if let Some(update) = ti.get("Update") {
1013                let table_name = update["TableName"].as_str().unwrap_or_default();
1014                let key: HashMap<String, AttributeValue> =
1015                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1016                let condition = update["ConditionExpression"].as_str();
1017
1018                if let Some(cond) = condition {
1019                    let table = get_table(&state.tables, table_name)?;
1020                    let expr_attr_names = parse_expression_attribute_names(update);
1021                    let expr_attr_values = parse_expression_attribute_values(update);
1022                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1023                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1024                        .is_err()
1025                    {
1026                        cancellation_reasons.push(json!({
1027                            "Code": "ConditionalCheckFailed",
1028                            "Message": "The conditional request failed"
1029                        }));
1030                        any_failed = true;
1031                        continue;
1032                    }
1033                }
1034                cancellation_reasons.push(json!({ "Code": "None" }));
1035            } else if let Some(check) = ti.get("ConditionCheck") {
1036                let table_name = check["TableName"].as_str().unwrap_or_default();
1037                let key: HashMap<String, AttributeValue> =
1038                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1039                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1040
1041                let table = get_table(&state.tables, table_name)?;
1042                let expr_attr_names = parse_expression_attribute_names(check);
1043                let expr_attr_values = parse_expression_attribute_values(check);
1044                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1045                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1046                {
1047                    cancellation_reasons.push(json!({
1048                        "Code": "ConditionalCheckFailed",
1049                        "Message": "The conditional request failed"
1050                    }));
1051                    any_failed = true;
1052                    continue;
1053                }
1054                cancellation_reasons.push(json!({ "Code": "None" }));
1055            } else {
1056                cancellation_reasons.push(json!({ "Code": "None" }));
1057            }
1058        }
1059
1060        if any_failed {
1061            let error_body = json!({
1062                "__type": "TransactionCanceledException",
1063                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1064                "CancellationReasons": cancellation_reasons
1065            });
1066            return Ok(AwsResponse::json(
1067                StatusCode::BAD_REQUEST,
1068                serde_json::to_vec(&error_body).unwrap(),
1069            ));
1070        }
1071
1072        // Second pass: apply all writes
1073        for ti in transact_items {
1074            if let Some(put) = ti.get("Put") {
1075                let table_name = put["TableName"].as_str().unwrap_or_default();
1076                let item: HashMap<String, AttributeValue> =
1077                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1078                let table = get_table_mut(&mut state.tables, table_name)?;
1079                let key = extract_key(table, &item);
1080                if let Some(idx) = table.find_item_index(&key) {
1081                    table.items[idx] = item;
1082                } else {
1083                    table.items.push(item);
1084                }
1085                table.recalculate_stats();
1086            } else if let Some(delete) = ti.get("Delete") {
1087                let table_name = delete["TableName"].as_str().unwrap_or_default();
1088                let key: HashMap<String, AttributeValue> =
1089                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1090                let table = get_table_mut(&mut state.tables, table_name)?;
1091                if let Some(idx) = table.find_item_index(&key) {
1092                    table.items.remove(idx);
1093                }
1094                table.recalculate_stats();
1095            } else if let Some(update) = ti.get("Update") {
1096                let table_name = update["TableName"].as_str().unwrap_or_default();
1097                let key: HashMap<String, AttributeValue> =
1098                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1099                let update_expression = update["UpdateExpression"].as_str();
1100                let expr_attr_names = parse_expression_attribute_names(update);
1101                let expr_attr_values = parse_expression_attribute_values(update);
1102
1103                let table = get_table_mut(&mut state.tables, table_name)?;
1104                let idx = match table.find_item_index(&key) {
1105                    Some(i) => i,
1106                    None => {
1107                        let mut new_item = HashMap::new();
1108                        for (k, v) in &key {
1109                            new_item.insert(k.clone(), v.clone());
1110                        }
1111                        table.items.push(new_item);
1112                        table.items.len() - 1
1113                    }
1114                };
1115
1116                if let Some(expr) = update_expression {
1117                    apply_update_expression(
1118                        &mut table.items[idx],
1119                        expr,
1120                        &expr_attr_names,
1121                        &expr_attr_values,
1122                    )?;
1123                }
1124                table.recalculate_stats();
1125            }
1126            // ConditionCheck: no write needed
1127        }
1128
1129        Self::ok_json(json!({}))
1130    }
1131
1132    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1133        let body = Self::parse_body(req)?;
1134        let statement = require_str(&body, "Statement")?;
1135        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1136
1137        execute_partiql_statement(&self.state, statement, &parameters)
1138    }
1139
1140    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1141        let body = Self::parse_body(req)?;
1142        let statements = body["Statements"].as_array().ok_or_else(|| {
1143            AwsServiceError::aws_error(
1144                StatusCode::BAD_REQUEST,
1145                "ValidationException",
1146                "Statements is required",
1147            )
1148        })?;
1149
1150        let mut responses: Vec<Value> = Vec::new();
1151        for stmt_obj in statements {
1152            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1153            let parameters = stmt_obj["Parameters"]
1154                .as_array()
1155                .cloned()
1156                .unwrap_or_default();
1157
1158            match execute_partiql_statement(&self.state, statement, &parameters) {
1159                Ok(resp) => {
1160                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1161                    responses.push(resp_body);
1162                }
1163                Err(e) => {
1164                    responses.push(json!({
1165                        "Error": {
1166                            "Code": "ValidationException",
1167                            "Message": e.to_string()
1168                        }
1169                    }));
1170                }
1171            }
1172        }
1173
1174        Self::ok_json(json!({ "Responses": responses }))
1175    }
1176
1177    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1178        let body = Self::parse_body(req)?;
1179        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1180            AwsServiceError::aws_error(
1181                StatusCode::BAD_REQUEST,
1182                "ValidationException",
1183                "TransactStatements is required",
1184            )
1185        })?;
1186
1187        // Collect all results; if any fail, return TransactionCanceledException
1188        let mut results: Vec<Result<Value, String>> = Vec::new();
1189        for stmt_obj in transact_statements {
1190            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1191            let parameters = stmt_obj["Parameters"]
1192                .as_array()
1193                .cloned()
1194                .unwrap_or_default();
1195
1196            match execute_partiql_statement(&self.state, statement, &parameters) {
1197                Ok(resp) => {
1198                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1199                    results.push(Ok(resp_body));
1200                }
1201                Err(e) => {
1202                    results.push(Err(e.to_string()));
1203                }
1204            }
1205        }
1206
1207        let any_failed = results.iter().any(|r| r.is_err());
1208        if any_failed {
1209            let reasons: Vec<Value> = results
1210                .iter()
1211                .map(|r| match r {
1212                    Ok(_) => json!({ "Code": "None" }),
1213                    Err(msg) => json!({
1214                        "Code": "ValidationException",
1215                        "Message": msg
1216                    }),
1217                })
1218                .collect();
1219            let error_body = json!({
1220                "__type": "TransactionCanceledException",
1221                "message": "Transaction cancelled due to validation errors",
1222                "CancellationReasons": reasons
1223            });
1224            return Ok(AwsResponse::json(
1225                StatusCode::BAD_REQUEST,
1226                serde_json::to_vec(&error_body).unwrap(),
1227            ));
1228        }
1229
1230        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1231        Self::ok_json(json!({ "Responses": responses }))
1232    }
1233
1234    // ── TTL ─────────────────────────────────────────────────────────────
1235
1236    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1237        let body = Self::parse_body(req)?;
1238        let table_name = require_str(&body, "TableName")?;
1239        let spec = &body["TimeToLiveSpecification"];
1240        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1241            AwsServiceError::aws_error(
1242                StatusCode::BAD_REQUEST,
1243                "ValidationException",
1244                "TimeToLiveSpecification.AttributeName is required",
1245            )
1246        })?;
1247        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1248            AwsServiceError::aws_error(
1249                StatusCode::BAD_REQUEST,
1250                "ValidationException",
1251                "TimeToLiveSpecification.Enabled is required",
1252            )
1253        })?;
1254
1255        let mut state = self.state.write();
1256        let table = get_table_mut(&mut state.tables, table_name)?;
1257
1258        if enabled {
1259            table.ttl_attribute = Some(attr_name.to_string());
1260            table.ttl_enabled = true;
1261        } else {
1262            table.ttl_enabled = false;
1263        }
1264
1265        Self::ok_json(json!({
1266            "TimeToLiveSpecification": {
1267                "AttributeName": attr_name,
1268                "Enabled": enabled
1269            }
1270        }))
1271    }
1272
1273    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274        let body = Self::parse_body(req)?;
1275        let table_name = require_str(&body, "TableName")?;
1276
1277        let state = self.state.read();
1278        let table = get_table(&state.tables, table_name)?;
1279
1280        let status = if table.ttl_enabled {
1281            "ENABLED"
1282        } else {
1283            "DISABLED"
1284        };
1285
1286        let mut desc = json!({
1287            "TimeToLiveDescription": {
1288                "TimeToLiveStatus": status
1289            }
1290        });
1291
1292        if let Some(ref attr) = table.ttl_attribute {
1293            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1294        }
1295
1296        Self::ok_json(desc)
1297    }
1298
1299    // ── Resource Policies ───────────────────────────────────────────────
1300
1301    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1302        let body = Self::parse_body(req)?;
1303        let resource_arn = require_str(&body, "ResourceArn")?;
1304        let policy = require_str(&body, "Policy")?;
1305
1306        let mut state = self.state.write();
1307        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1308        table.resource_policy = Some(policy.to_string());
1309
1310        let revision_id = uuid::Uuid::new_v4().to_string();
1311        Self::ok_json(json!({ "RevisionId": revision_id }))
1312    }
1313
1314    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1315        let body = Self::parse_body(req)?;
1316        let resource_arn = require_str(&body, "ResourceArn")?;
1317
1318        let state = self.state.read();
1319        let table = find_table_by_arn(&state.tables, resource_arn)?;
1320
1321        match &table.resource_policy {
1322            Some(policy) => {
1323                let revision_id = uuid::Uuid::new_v4().to_string();
1324                Self::ok_json(json!({
1325                    "Policy": policy,
1326                    "RevisionId": revision_id
1327                }))
1328            }
1329            None => Self::ok_json(json!({ "Policy": null })),
1330        }
1331    }
1332
1333    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1334        let body = Self::parse_body(req)?;
1335        let resource_arn = require_str(&body, "ResourceArn")?;
1336
1337        let mut state = self.state.write();
1338        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1339        table.resource_policy = None;
1340
1341        Self::ok_json(json!({}))
1342    }
1343
1344    // ── Stubs ──────────────────────────────────────────────────────────
1345
1346    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1347        Self::ok_json(json!({
1348            "Endpoints": [{
1349                "Address": "dynamodb.us-east-1.amazonaws.com",
1350                "CachePeriodInMinutes": 1440
1351            }]
1352        }))
1353    }
1354
1355    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1356        Self::ok_json(json!({
1357            "AccountMaxReadCapacityUnits": 80000,
1358            "AccountMaxWriteCapacityUnits": 80000,
1359            "TableMaxReadCapacityUnits": 40000,
1360            "TableMaxWriteCapacityUnits": 40000
1361        }))
1362    }
1363
1364    // ── Backups ────────────────────────────────────────────────────────
1365
1366    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1367        let body = Self::parse_body(req)?;
1368        let table_name = require_str(&body, "TableName")?;
1369        let backup_name = require_str(&body, "BackupName")?;
1370
1371        let mut state = self.state.write();
1372        let table = get_table(&state.tables, table_name)?;
1373
1374        let backup_arn = format!(
1375            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1376            state.region,
1377            state.account_id,
1378            table_name,
1379            Utc::now().format("%Y%m%d%H%M%S")
1380        );
1381        let now = Utc::now();
1382
1383        let backup = BackupDescription {
1384            backup_arn: backup_arn.clone(),
1385            backup_name: backup_name.to_string(),
1386            table_name: table_name.to_string(),
1387            table_arn: table.arn.clone(),
1388            backup_status: "AVAILABLE".to_string(),
1389            backup_type: "USER".to_string(),
1390            backup_creation_date: now,
1391            key_schema: table.key_schema.clone(),
1392            attribute_definitions: table.attribute_definitions.clone(),
1393            provisioned_throughput: table.provisioned_throughput.clone(),
1394            billing_mode: table.billing_mode.clone(),
1395            item_count: table.item_count,
1396            size_bytes: table.size_bytes,
1397        };
1398
1399        state.backups.insert(backup_arn.clone(), backup);
1400
1401        Self::ok_json(json!({
1402            "BackupDetails": {
1403                "BackupArn": backup_arn,
1404                "BackupName": backup_name,
1405                "BackupStatus": "AVAILABLE",
1406                "BackupType": "USER",
1407                "BackupCreationDateTime": now.timestamp() as f64,
1408                "BackupSizeBytes": 0
1409            }
1410        }))
1411    }
1412
1413    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1414        let body = Self::parse_body(req)?;
1415        let backup_arn = require_str(&body, "BackupArn")?;
1416
1417        let mut state = self.state.write();
1418        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1419            AwsServiceError::aws_error(
1420                StatusCode::BAD_REQUEST,
1421                "BackupNotFoundException",
1422                format!("Backup not found: {backup_arn}"),
1423            )
1424        })?;
1425
1426        Self::ok_json(json!({
1427            "BackupDescription": {
1428                "BackupDetails": {
1429                    "BackupArn": backup.backup_arn,
1430                    "BackupName": backup.backup_name,
1431                    "BackupStatus": "DELETED",
1432                    "BackupType": backup.backup_type,
1433                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1434                    "BackupSizeBytes": backup.size_bytes
1435                },
1436                "SourceTableDetails": {
1437                    "TableName": backup.table_name,
1438                    "TableArn": backup.table_arn,
1439                    "TableId": uuid::Uuid::new_v4().to_string(),
1440                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1441                        "AttributeName": ks.attribute_name,
1442                        "KeyType": ks.key_type
1443                    })).collect::<Vec<_>>(),
1444                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1445                    "ProvisionedThroughput": {
1446                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1447                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1448                    },
1449                    "ItemCount": backup.item_count,
1450                    "BillingMode": backup.billing_mode,
1451                    "TableSizeBytes": backup.size_bytes
1452                },
1453                "SourceTableFeatureDetails": {}
1454            }
1455        }))
1456    }
1457
1458    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1459        let body = Self::parse_body(req)?;
1460        let backup_arn = require_str(&body, "BackupArn")?;
1461
1462        let state = self.state.read();
1463        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1464            AwsServiceError::aws_error(
1465                StatusCode::BAD_REQUEST,
1466                "BackupNotFoundException",
1467                format!("Backup not found: {backup_arn}"),
1468            )
1469        })?;
1470
1471        Self::ok_json(json!({
1472            "BackupDescription": {
1473                "BackupDetails": {
1474                    "BackupArn": backup.backup_arn,
1475                    "BackupName": backup.backup_name,
1476                    "BackupStatus": backup.backup_status,
1477                    "BackupType": backup.backup_type,
1478                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1479                    "BackupSizeBytes": backup.size_bytes
1480                },
1481                "SourceTableDetails": {
1482                    "TableName": backup.table_name,
1483                    "TableArn": backup.table_arn,
1484                    "TableId": uuid::Uuid::new_v4().to_string(),
1485                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1486                        "AttributeName": ks.attribute_name,
1487                        "KeyType": ks.key_type
1488                    })).collect::<Vec<_>>(),
1489                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1490                    "ProvisionedThroughput": {
1491                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1492                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1493                    },
1494                    "ItemCount": backup.item_count,
1495                    "BillingMode": backup.billing_mode,
1496                    "TableSizeBytes": backup.size_bytes
1497                },
1498                "SourceTableFeatureDetails": {}
1499            }
1500        }))
1501    }
1502
1503    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504        let body = Self::parse_body(req)?;
1505        let table_name = body["TableName"].as_str();
1506
1507        let state = self.state.read();
1508        let summaries: Vec<Value> = state
1509            .backups
1510            .values()
1511            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1512            .map(|b| {
1513                json!({
1514                    "TableName": b.table_name,
1515                    "TableArn": b.table_arn,
1516                    "BackupArn": b.backup_arn,
1517                    "BackupName": b.backup_name,
1518                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1519                    "BackupStatus": b.backup_status,
1520                    "BackupType": b.backup_type,
1521                    "BackupSizeBytes": b.size_bytes
1522                })
1523            })
1524            .collect();
1525
1526        Self::ok_json(json!({
1527            "BackupSummaries": summaries
1528        }))
1529    }
1530
1531    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1532        let body = Self::parse_body(req)?;
1533        let backup_arn = require_str(&body, "BackupArn")?;
1534        let target_table_name = require_str(&body, "TargetTableName")?;
1535
1536        let mut state = self.state.write();
1537        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1538            AwsServiceError::aws_error(
1539                StatusCode::BAD_REQUEST,
1540                "BackupNotFoundException",
1541                format!("Backup not found: {backup_arn}"),
1542            )
1543        })?;
1544
1545        if state.tables.contains_key(target_table_name) {
1546            return Err(AwsServiceError::aws_error(
1547                StatusCode::BAD_REQUEST,
1548                "TableAlreadyExistsException",
1549                format!("Table already exists: {target_table_name}"),
1550            ));
1551        }
1552
1553        let now = Utc::now();
1554        let arn = format!(
1555            "arn:aws:dynamodb:{}:{}:table/{}",
1556            state.region, state.account_id, target_table_name
1557        );
1558
1559        let table = DynamoTable {
1560            name: target_table_name.to_string(),
1561            arn: arn.clone(),
1562            key_schema: backup.key_schema.clone(),
1563            attribute_definitions: backup.attribute_definitions.clone(),
1564            provisioned_throughput: backup.provisioned_throughput.clone(),
1565            items: Vec::new(),
1566            gsi: Vec::new(),
1567            lsi: Vec::new(),
1568            tags: HashMap::new(),
1569            created_at: now,
1570            status: "ACTIVE".to_string(),
1571            item_count: 0,
1572            size_bytes: 0,
1573            billing_mode: backup.billing_mode.clone(),
1574            ttl_attribute: None,
1575            ttl_enabled: false,
1576            resource_policy: None,
1577            pitr_enabled: false,
1578            kinesis_destinations: Vec::new(),
1579            contributor_insights_status: "DISABLED".to_string(),
1580        };
1581
1582        let desc = build_table_description(&table);
1583        state.tables.insert(target_table_name.to_string(), table);
1584
1585        Self::ok_json(json!({
1586            "TableDescription": desc
1587        }))
1588    }
1589
1590    fn restore_table_to_point_in_time(
1591        &self,
1592        req: &AwsRequest,
1593    ) -> Result<AwsResponse, AwsServiceError> {
1594        let body = Self::parse_body(req)?;
1595        let target_table_name = require_str(&body, "TargetTableName")?;
1596        let source_table_name = body["SourceTableName"].as_str();
1597        let source_table_arn = body["SourceTableArn"].as_str();
1598
1599        let mut state = self.state.write();
1600
1601        // Resolve source table
1602        let source = if let Some(name) = source_table_name {
1603            get_table(&state.tables, name)?.clone()
1604        } else if let Some(arn) = source_table_arn {
1605            find_table_by_arn(&state.tables, arn)?.clone()
1606        } else {
1607            return Err(AwsServiceError::aws_error(
1608                StatusCode::BAD_REQUEST,
1609                "ValidationException",
1610                "SourceTableName or SourceTableArn is required",
1611            ));
1612        };
1613
1614        if state.tables.contains_key(target_table_name) {
1615            return Err(AwsServiceError::aws_error(
1616                StatusCode::BAD_REQUEST,
1617                "TableAlreadyExistsException",
1618                format!("Table already exists: {target_table_name}"),
1619            ));
1620        }
1621
1622        let now = Utc::now();
1623        let arn = format!(
1624            "arn:aws:dynamodb:{}:{}:table/{}",
1625            state.region, state.account_id, target_table_name
1626        );
1627
1628        let table = DynamoTable {
1629            name: target_table_name.to_string(),
1630            arn: arn.clone(),
1631            key_schema: source.key_schema.clone(),
1632            attribute_definitions: source.attribute_definitions.clone(),
1633            provisioned_throughput: source.provisioned_throughput.clone(),
1634            items: Vec::new(),
1635            gsi: Vec::new(),
1636            lsi: Vec::new(),
1637            tags: HashMap::new(),
1638            created_at: now,
1639            status: "ACTIVE".to_string(),
1640            item_count: 0,
1641            size_bytes: 0,
1642            billing_mode: source.billing_mode.clone(),
1643            ttl_attribute: None,
1644            ttl_enabled: false,
1645            resource_policy: None,
1646            pitr_enabled: false,
1647            kinesis_destinations: Vec::new(),
1648            contributor_insights_status: "DISABLED".to_string(),
1649        };
1650
1651        let desc = build_table_description(&table);
1652        state.tables.insert(target_table_name.to_string(), table);
1653
1654        Self::ok_json(json!({
1655            "TableDescription": desc
1656        }))
1657    }
1658
1659    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1660        let body = Self::parse_body(req)?;
1661        let table_name = require_str(&body, "TableName")?;
1662
1663        let pitr_spec = body["PointInTimeRecoverySpecification"]
1664            .as_object()
1665            .ok_or_else(|| {
1666                AwsServiceError::aws_error(
1667                    StatusCode::BAD_REQUEST,
1668                    "ValidationException",
1669                    "PointInTimeRecoverySpecification is required",
1670                )
1671            })?;
1672        let enabled = pitr_spec
1673            .get("PointInTimeRecoveryEnabled")
1674            .and_then(|v| v.as_bool())
1675            .unwrap_or(false);
1676
1677        let mut state = self.state.write();
1678        let table = get_table_mut(&mut state.tables, table_name)?;
1679        table.pitr_enabled = enabled;
1680
1681        let status = if enabled { "ENABLED" } else { "DISABLED" };
1682        Self::ok_json(json!({
1683            "ContinuousBackupsDescription": {
1684                "ContinuousBackupsStatus": status,
1685                "PointInTimeRecoveryDescription": {
1686                    "PointInTimeRecoveryStatus": status,
1687                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1688                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1689                }
1690            }
1691        }))
1692    }
1693
1694    fn describe_continuous_backups(
1695        &self,
1696        req: &AwsRequest,
1697    ) -> Result<AwsResponse, AwsServiceError> {
1698        let body = Self::parse_body(req)?;
1699        let table_name = require_str(&body, "TableName")?;
1700
1701        let state = self.state.read();
1702        let table = get_table(&state.tables, table_name)?;
1703
1704        let status = if table.pitr_enabled {
1705            "ENABLED"
1706        } else {
1707            "DISABLED"
1708        };
1709        Self::ok_json(json!({
1710            "ContinuousBackupsDescription": {
1711                "ContinuousBackupsStatus": status,
1712                "PointInTimeRecoveryDescription": {
1713                    "PointInTimeRecoveryStatus": status,
1714                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1715                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1716                }
1717            }
1718        }))
1719    }
1720
1721    // ── Global Tables ──────────────────────────────────────────────────
1722
1723    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1724        let body = Self::parse_body(req)?;
1725        let global_table_name = require_str(&body, "GlobalTableName")?;
1726
1727        let replication_group = body["ReplicationGroup"]
1728            .as_array()
1729            .ok_or_else(|| {
1730                AwsServiceError::aws_error(
1731                    StatusCode::BAD_REQUEST,
1732                    "ValidationException",
1733                    "ReplicationGroup is required",
1734                )
1735            })?
1736            .iter()
1737            .filter_map(|r| {
1738                r["RegionName"].as_str().map(|rn| ReplicaDescription {
1739                    region_name: rn.to_string(),
1740                    replica_status: "ACTIVE".to_string(),
1741                })
1742            })
1743            .collect::<Vec<_>>();
1744
1745        let mut state = self.state.write();
1746
1747        if state.global_tables.contains_key(global_table_name) {
1748            return Err(AwsServiceError::aws_error(
1749                StatusCode::BAD_REQUEST,
1750                "GlobalTableAlreadyExistsException",
1751                format!("Global table already exists: {global_table_name}"),
1752            ));
1753        }
1754
1755        let arn = format!(
1756            "arn:aws:dynamodb::{}:global-table/{}",
1757            state.account_id, global_table_name
1758        );
1759        let now = Utc::now();
1760
1761        let gt = GlobalTableDescription {
1762            global_table_name: global_table_name.to_string(),
1763            global_table_arn: arn.clone(),
1764            global_table_status: "ACTIVE".to_string(),
1765            creation_date: now,
1766            replication_group: replication_group.clone(),
1767        };
1768
1769        state
1770            .global_tables
1771            .insert(global_table_name.to_string(), gt);
1772
1773        Self::ok_json(json!({
1774            "GlobalTableDescription": {
1775                "GlobalTableName": global_table_name,
1776                "GlobalTableArn": arn,
1777                "GlobalTableStatus": "ACTIVE",
1778                "CreationDateTime": now.timestamp() as f64,
1779                "ReplicationGroup": replication_group.iter().map(|r| json!({
1780                    "RegionName": r.region_name,
1781                    "ReplicaStatus": r.replica_status
1782                })).collect::<Vec<_>>()
1783            }
1784        }))
1785    }
1786
1787    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1788        let body = Self::parse_body(req)?;
1789        let global_table_name = require_str(&body, "GlobalTableName")?;
1790
1791        let state = self.state.read();
1792        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1793            AwsServiceError::aws_error(
1794                StatusCode::BAD_REQUEST,
1795                "GlobalTableNotFoundException",
1796                format!("Global table not found: {global_table_name}"),
1797            )
1798        })?;
1799
1800        Self::ok_json(json!({
1801            "GlobalTableDescription": {
1802                "GlobalTableName": gt.global_table_name,
1803                "GlobalTableArn": gt.global_table_arn,
1804                "GlobalTableStatus": gt.global_table_status,
1805                "CreationDateTime": gt.creation_date.timestamp() as f64,
1806                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1807                    "RegionName": r.region_name,
1808                    "ReplicaStatus": r.replica_status
1809                })).collect::<Vec<_>>()
1810            }
1811        }))
1812    }
1813
1814    fn describe_global_table_settings(
1815        &self,
1816        req: &AwsRequest,
1817    ) -> Result<AwsResponse, AwsServiceError> {
1818        let body = Self::parse_body(req)?;
1819        let global_table_name = require_str(&body, "GlobalTableName")?;
1820
1821        let state = self.state.read();
1822        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1823            AwsServiceError::aws_error(
1824                StatusCode::BAD_REQUEST,
1825                "GlobalTableNotFoundException",
1826                format!("Global table not found: {global_table_name}"),
1827            )
1828        })?;
1829
1830        let replica_settings: Vec<Value> = gt
1831            .replication_group
1832            .iter()
1833            .map(|r| {
1834                json!({
1835                    "RegionName": r.region_name,
1836                    "ReplicaStatus": r.replica_status,
1837                    "ReplicaProvisionedReadCapacityUnits": 0,
1838                    "ReplicaProvisionedWriteCapacityUnits": 0
1839                })
1840            })
1841            .collect();
1842
1843        Self::ok_json(json!({
1844            "GlobalTableName": gt.global_table_name,
1845            "ReplicaSettings": replica_settings
1846        }))
1847    }
1848
1849    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850        let body = Self::parse_body(req)?;
1851        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1852
1853        let state = self.state.read();
1854        let tables: Vec<Value> = state
1855            .global_tables
1856            .values()
1857            .take(limit)
1858            .map(|gt| {
1859                json!({
1860                    "GlobalTableName": gt.global_table_name,
1861                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1862                        "RegionName": r.region_name
1863                    })).collect::<Vec<_>>()
1864                })
1865            })
1866            .collect();
1867
1868        Self::ok_json(json!({
1869            "GlobalTables": tables
1870        }))
1871    }
1872
1873    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1874        let body = Self::parse_body(req)?;
1875        let global_table_name = require_str(&body, "GlobalTableName")?;
1876
1877        let mut state = self.state.write();
1878        let gt = state
1879            .global_tables
1880            .get_mut(global_table_name)
1881            .ok_or_else(|| {
1882                AwsServiceError::aws_error(
1883                    StatusCode::BAD_REQUEST,
1884                    "GlobalTableNotFoundException",
1885                    format!("Global table not found: {global_table_name}"),
1886                )
1887            })?;
1888
1889        if let Some(updates) = body["ReplicaUpdates"].as_array() {
1890            for update in updates {
1891                if let Some(create) = update["Create"].as_object() {
1892                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
1893                        gt.replication_group.push(ReplicaDescription {
1894                            region_name: region.to_string(),
1895                            replica_status: "ACTIVE".to_string(),
1896                        });
1897                    }
1898                }
1899                if let Some(delete) = update["Delete"].as_object() {
1900                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
1901                        gt.replication_group.retain(|r| r.region_name != region);
1902                    }
1903                }
1904            }
1905        }
1906
1907        Self::ok_json(json!({
1908            "GlobalTableDescription": {
1909                "GlobalTableName": gt.global_table_name,
1910                "GlobalTableArn": gt.global_table_arn,
1911                "GlobalTableStatus": gt.global_table_status,
1912                "CreationDateTime": gt.creation_date.timestamp() as f64,
1913                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1914                    "RegionName": r.region_name,
1915                    "ReplicaStatus": r.replica_status
1916                })).collect::<Vec<_>>()
1917            }
1918        }))
1919    }
1920
1921    fn update_global_table_settings(
1922        &self,
1923        req: &AwsRequest,
1924    ) -> Result<AwsResponse, AwsServiceError> {
1925        let body = Self::parse_body(req)?;
1926        let global_table_name = require_str(&body, "GlobalTableName")?;
1927
1928        let state = self.state.read();
1929        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1930            AwsServiceError::aws_error(
1931                StatusCode::BAD_REQUEST,
1932                "GlobalTableNotFoundException",
1933                format!("Global table not found: {global_table_name}"),
1934            )
1935        })?;
1936
1937        let replica_settings: Vec<Value> = gt
1938            .replication_group
1939            .iter()
1940            .map(|r| {
1941                json!({
1942                    "RegionName": r.region_name,
1943                    "ReplicaStatus": r.replica_status,
1944                    "ReplicaProvisionedReadCapacityUnits": 0,
1945                    "ReplicaProvisionedWriteCapacityUnits": 0
1946                })
1947            })
1948            .collect();
1949
1950        Self::ok_json(json!({
1951            "GlobalTableName": gt.global_table_name,
1952            "ReplicaSettings": replica_settings
1953        }))
1954    }
1955
1956    fn describe_table_replica_auto_scaling(
1957        &self,
1958        req: &AwsRequest,
1959    ) -> Result<AwsResponse, AwsServiceError> {
1960        let body = Self::parse_body(req)?;
1961        let table_name = require_str(&body, "TableName")?;
1962
1963        let state = self.state.read();
1964        let table = get_table(&state.tables, table_name)?;
1965
1966        Self::ok_json(json!({
1967            "TableAutoScalingDescription": {
1968                "TableName": table.name,
1969                "TableStatus": table.status,
1970                "Replicas": []
1971            }
1972        }))
1973    }
1974
1975    fn update_table_replica_auto_scaling(
1976        &self,
1977        req: &AwsRequest,
1978    ) -> Result<AwsResponse, AwsServiceError> {
1979        let body = Self::parse_body(req)?;
1980        let table_name = require_str(&body, "TableName")?;
1981
1982        let state = self.state.read();
1983        let table = get_table(&state.tables, table_name)?;
1984
1985        Self::ok_json(json!({
1986            "TableAutoScalingDescription": {
1987                "TableName": table.name,
1988                "TableStatus": table.status,
1989                "Replicas": []
1990            }
1991        }))
1992    }
1993
1994    // ── Kinesis Streaming ──────────────────────────────────────────────
1995
1996    fn enable_kinesis_streaming_destination(
1997        &self,
1998        req: &AwsRequest,
1999    ) -> Result<AwsResponse, AwsServiceError> {
2000        let body = Self::parse_body(req)?;
2001        let table_name = require_str(&body, "TableName")?;
2002        let stream_arn = require_str(&body, "StreamArn")?;
2003        let precision = body["EnableKinesisStreamingConfiguration"]
2004            ["ApproximateCreationDateTimePrecision"]
2005            .as_str()
2006            .unwrap_or("MILLISECOND");
2007
2008        let mut state = self.state.write();
2009        let table = get_table_mut(&mut state.tables, table_name)?;
2010
2011        table.kinesis_destinations.push(KinesisDestination {
2012            stream_arn: stream_arn.to_string(),
2013            destination_status: "ACTIVE".to_string(),
2014            approximate_creation_date_time_precision: precision.to_string(),
2015        });
2016
2017        Self::ok_json(json!({
2018            "TableName": table_name,
2019            "StreamArn": stream_arn,
2020            "DestinationStatus": "ACTIVE",
2021            "EnableKinesisStreamingConfiguration": {
2022                "ApproximateCreationDateTimePrecision": precision
2023            }
2024        }))
2025    }
2026
2027    fn disable_kinesis_streaming_destination(
2028        &self,
2029        req: &AwsRequest,
2030    ) -> Result<AwsResponse, AwsServiceError> {
2031        let body = Self::parse_body(req)?;
2032        let table_name = require_str(&body, "TableName")?;
2033        let stream_arn = require_str(&body, "StreamArn")?;
2034
2035        let mut state = self.state.write();
2036        let table = get_table_mut(&mut state.tables, table_name)?;
2037
2038        if let Some(dest) = table
2039            .kinesis_destinations
2040            .iter_mut()
2041            .find(|d| d.stream_arn == stream_arn)
2042        {
2043            dest.destination_status = "DISABLED".to_string();
2044        }
2045
2046        Self::ok_json(json!({
2047            "TableName": table_name,
2048            "StreamArn": stream_arn,
2049            "DestinationStatus": "DISABLED"
2050        }))
2051    }
2052
2053    fn describe_kinesis_streaming_destination(
2054        &self,
2055        req: &AwsRequest,
2056    ) -> Result<AwsResponse, AwsServiceError> {
2057        let body = Self::parse_body(req)?;
2058        let table_name = require_str(&body, "TableName")?;
2059
2060        let state = self.state.read();
2061        let table = get_table(&state.tables, table_name)?;
2062
2063        let destinations: Vec<Value> = table
2064            .kinesis_destinations
2065            .iter()
2066            .map(|d| {
2067                json!({
2068                    "StreamArn": d.stream_arn,
2069                    "DestinationStatus": d.destination_status,
2070                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2071                })
2072            })
2073            .collect();
2074
2075        Self::ok_json(json!({
2076            "TableName": table_name,
2077            "KinesisDataStreamDestinations": destinations
2078        }))
2079    }
2080
2081    fn update_kinesis_streaming_destination(
2082        &self,
2083        req: &AwsRequest,
2084    ) -> Result<AwsResponse, AwsServiceError> {
2085        let body = Self::parse_body(req)?;
2086        let table_name = require_str(&body, "TableName")?;
2087        let stream_arn = require_str(&body, "StreamArn")?;
2088        let precision = body["UpdateKinesisStreamingConfiguration"]
2089            ["ApproximateCreationDateTimePrecision"]
2090            .as_str()
2091            .unwrap_or("MILLISECOND");
2092
2093        let mut state = self.state.write();
2094        let table = get_table_mut(&mut state.tables, table_name)?;
2095
2096        if let Some(dest) = table
2097            .kinesis_destinations
2098            .iter_mut()
2099            .find(|d| d.stream_arn == stream_arn)
2100        {
2101            dest.approximate_creation_date_time_precision = precision.to_string();
2102        }
2103
2104        Self::ok_json(json!({
2105            "TableName": table_name,
2106            "StreamArn": stream_arn,
2107            "DestinationStatus": "ACTIVE",
2108            "UpdateKinesisStreamingConfiguration": {
2109                "ApproximateCreationDateTimePrecision": precision
2110            }
2111        }))
2112    }
2113
2114    // ── Contributor Insights ───────────────────────────────────────────
2115
2116    fn describe_contributor_insights(
2117        &self,
2118        req: &AwsRequest,
2119    ) -> Result<AwsResponse, AwsServiceError> {
2120        let body = Self::parse_body(req)?;
2121        let table_name = require_str(&body, "TableName")?;
2122        let index_name = body["IndexName"].as_str();
2123
2124        let state = self.state.read();
2125        let table = get_table(&state.tables, table_name)?;
2126
2127        let mut result = json!({
2128            "TableName": table_name,
2129            "ContributorInsightsStatus": table.contributor_insights_status,
2130            "ContributorInsightsRuleList": []
2131        });
2132        if let Some(idx) = index_name {
2133            result["IndexName"] = json!(idx);
2134        }
2135
2136        Self::ok_json(result)
2137    }
2138
2139    fn update_contributor_insights(
2140        &self,
2141        req: &AwsRequest,
2142    ) -> Result<AwsResponse, AwsServiceError> {
2143        let body = Self::parse_body(req)?;
2144        let table_name = require_str(&body, "TableName")?;
2145        let action = require_str(&body, "ContributorInsightsAction")?;
2146        let index_name = body["IndexName"].as_str();
2147
2148        let mut state = self.state.write();
2149        let table = get_table_mut(&mut state.tables, table_name)?;
2150
2151        let status = match action {
2152            "ENABLE" => "ENABLED",
2153            "DISABLE" => "DISABLED",
2154            _ => {
2155                return Err(AwsServiceError::aws_error(
2156                    StatusCode::BAD_REQUEST,
2157                    "ValidationException",
2158                    format!("Invalid ContributorInsightsAction: {action}"),
2159                ))
2160            }
2161        };
2162        table.contributor_insights_status = status.to_string();
2163
2164        let mut result = json!({
2165            "TableName": table_name,
2166            "ContributorInsightsStatus": status
2167        });
2168        if let Some(idx) = index_name {
2169            result["IndexName"] = json!(idx);
2170        }
2171
2172        Self::ok_json(result)
2173    }
2174
2175    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2176        let body = Self::parse_body(req)?;
2177        let table_name = body["TableName"].as_str();
2178
2179        let state = self.state.read();
2180        let summaries: Vec<Value> = state
2181            .tables
2182            .values()
2183            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2184            .map(|t| {
2185                json!({
2186                    "TableName": t.name,
2187                    "ContributorInsightsStatus": t.contributor_insights_status
2188                })
2189            })
2190            .collect();
2191
2192        Self::ok_json(json!({
2193            "ContributorInsightsSummaries": summaries
2194        }))
2195    }
2196
2197    // ── Import/Export ──────────────────────────────────────────────────
2198
2199    fn export_table_to_point_in_time(
2200        &self,
2201        req: &AwsRequest,
2202    ) -> Result<AwsResponse, AwsServiceError> {
2203        let body = Self::parse_body(req)?;
2204        let table_arn = require_str(&body, "TableArn")?;
2205        let s3_bucket = require_str(&body, "S3Bucket")?;
2206        let s3_prefix = body["S3Prefix"].as_str();
2207        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2208
2209        let state = self.state.read();
2210        // Verify table exists
2211        find_table_by_arn(&state.tables, table_arn)?;
2212
2213        let now = Utc::now();
2214        let export_arn = format!(
2215            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2216            state.region,
2217            state.account_id,
2218            table_arn.rsplit('/').next().unwrap_or("unknown"),
2219            uuid::Uuid::new_v4()
2220        );
2221
2222        let export = ExportDescription {
2223            export_arn: export_arn.clone(),
2224            export_status: "COMPLETED".to_string(),
2225            table_arn: table_arn.to_string(),
2226            s3_bucket: s3_bucket.to_string(),
2227            s3_prefix: s3_prefix.map(|s| s.to_string()),
2228            export_format: export_format.to_string(),
2229            start_time: now,
2230            end_time: now,
2231            export_time: now,
2232            item_count: 0,
2233            billed_size_bytes: 0,
2234        };
2235
2236        drop(state);
2237        let mut state = self.state.write();
2238        state.exports.insert(export_arn.clone(), export);
2239
2240        Self::ok_json(json!({
2241            "ExportDescription": {
2242                "ExportArn": export_arn,
2243                "ExportStatus": "COMPLETED",
2244                "TableArn": table_arn,
2245                "S3Bucket": s3_bucket,
2246                "S3Prefix": s3_prefix,
2247                "ExportFormat": export_format,
2248                "StartTime": now.timestamp() as f64,
2249                "EndTime": now.timestamp() as f64,
2250                "ExportTime": now.timestamp() as f64,
2251                "ItemCount": 0,
2252                "BilledSizeBytes": 0
2253            }
2254        }))
2255    }
2256
2257    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2258        let body = Self::parse_body(req)?;
2259        let export_arn = require_str(&body, "ExportArn")?;
2260
2261        let state = self.state.read();
2262        let export = state.exports.get(export_arn).ok_or_else(|| {
2263            AwsServiceError::aws_error(
2264                StatusCode::BAD_REQUEST,
2265                "ExportNotFoundException",
2266                format!("Export not found: {export_arn}"),
2267            )
2268        })?;
2269
2270        Self::ok_json(json!({
2271            "ExportDescription": {
2272                "ExportArn": export.export_arn,
2273                "ExportStatus": export.export_status,
2274                "TableArn": export.table_arn,
2275                "S3Bucket": export.s3_bucket,
2276                "S3Prefix": export.s3_prefix,
2277                "ExportFormat": export.export_format,
2278                "StartTime": export.start_time.timestamp() as f64,
2279                "EndTime": export.end_time.timestamp() as f64,
2280                "ExportTime": export.export_time.timestamp() as f64,
2281                "ItemCount": export.item_count,
2282                "BilledSizeBytes": export.billed_size_bytes
2283            }
2284        }))
2285    }
2286
2287    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2288        let body = Self::parse_body(req)?;
2289        let table_arn = body["TableArn"].as_str();
2290
2291        let state = self.state.read();
2292        let summaries: Vec<Value> = state
2293            .exports
2294            .values()
2295            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2296            .map(|e| {
2297                json!({
2298                    "ExportArn": e.export_arn,
2299                    "ExportStatus": e.export_status,
2300                    "TableArn": e.table_arn
2301                })
2302            })
2303            .collect();
2304
2305        Self::ok_json(json!({
2306            "ExportSummaries": summaries
2307        }))
2308    }
2309
2310    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2311        let body = Self::parse_body(req)?;
2312        let input_format = require_str(&body, "InputFormat")?;
2313        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2314            AwsServiceError::aws_error(
2315                StatusCode::BAD_REQUEST,
2316                "ValidationException",
2317                "S3BucketSource is required",
2318            )
2319        })?;
2320        let s3_bucket = s3_source
2321            .get("S3Bucket")
2322            .and_then(|v| v.as_str())
2323            .unwrap_or("");
2324
2325        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2326            AwsServiceError::aws_error(
2327                StatusCode::BAD_REQUEST,
2328                "ValidationException",
2329                "TableCreationParameters is required",
2330            )
2331        })?;
2332        let table_name = table_params
2333            .get("TableName")
2334            .and_then(|v| v.as_str())
2335            .ok_or_else(|| {
2336                AwsServiceError::aws_error(
2337                    StatusCode::BAD_REQUEST,
2338                    "ValidationException",
2339                    "TableCreationParameters.TableName is required",
2340                )
2341            })?;
2342
2343        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
2344        let attribute_definitions = parse_attribute_definitions(
2345            table_params
2346                .get("AttributeDefinitions")
2347                .unwrap_or(&Value::Null),
2348        )?;
2349
2350        let mut state = self.state.write();
2351
2352        if state.tables.contains_key(table_name) {
2353            return Err(AwsServiceError::aws_error(
2354                StatusCode::BAD_REQUEST,
2355                "ResourceInUseException",
2356                format!("Table already exists: {table_name}"),
2357            ));
2358        }
2359
2360        let now = Utc::now();
2361        let table_arn = format!(
2362            "arn:aws:dynamodb:{}:{}:table/{}",
2363            state.region, state.account_id, table_name
2364        );
2365        let import_arn = format!(
2366            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
2367            state.region,
2368            state.account_id,
2369            table_name,
2370            uuid::Uuid::new_v4()
2371        );
2372
2373        let table = DynamoTable {
2374            name: table_name.to_string(),
2375            arn: table_arn.clone(),
2376            key_schema,
2377            attribute_definitions,
2378            provisioned_throughput: ProvisionedThroughput {
2379                read_capacity_units: 0,
2380                write_capacity_units: 0,
2381            },
2382            items: Vec::new(),
2383            gsi: Vec::new(),
2384            lsi: Vec::new(),
2385            tags: HashMap::new(),
2386            created_at: now,
2387            status: "ACTIVE".to_string(),
2388            item_count: 0,
2389            size_bytes: 0,
2390            billing_mode: "PAY_PER_REQUEST".to_string(),
2391            ttl_attribute: None,
2392            ttl_enabled: false,
2393            resource_policy: None,
2394            pitr_enabled: false,
2395            kinesis_destinations: Vec::new(),
2396            contributor_insights_status: "DISABLED".to_string(),
2397        };
2398        state.tables.insert(table_name.to_string(), table);
2399
2400        let import_desc = ImportDescription {
2401            import_arn: import_arn.clone(),
2402            import_status: "COMPLETED".to_string(),
2403            table_arn: table_arn.clone(),
2404            table_name: table_name.to_string(),
2405            s3_bucket_source: s3_bucket.to_string(),
2406            input_format: input_format.to_string(),
2407            start_time: now,
2408            end_time: now,
2409            processed_item_count: 0,
2410            processed_size_bytes: 0,
2411        };
2412        state.imports.insert(import_arn.clone(), import_desc);
2413
2414        Self::ok_json(json!({
2415            "ImportTableDescription": {
2416                "ImportArn": import_arn,
2417                "ImportStatus": "COMPLETED",
2418                "TableArn": table_arn,
2419                "TableId": uuid::Uuid::new_v4().to_string(),
2420                "S3BucketSource": {
2421                    "S3Bucket": s3_bucket
2422                },
2423                "InputFormat": input_format,
2424                "TableCreationParameters": {
2425                    "TableName": table_name
2426                },
2427                "StartTime": now.timestamp() as f64,
2428                "EndTime": now.timestamp() as f64,
2429                "ProcessedItemCount": 0,
2430                "ProcessedSizeBytes": 0
2431            }
2432        }))
2433    }
2434
2435    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2436        let body = Self::parse_body(req)?;
2437        let import_arn = require_str(&body, "ImportArn")?;
2438
2439        let state = self.state.read();
2440        let import = state.imports.get(import_arn).ok_or_else(|| {
2441            AwsServiceError::aws_error(
2442                StatusCode::BAD_REQUEST,
2443                "ImportNotFoundException",
2444                format!("Import not found: {import_arn}"),
2445            )
2446        })?;
2447
2448        Self::ok_json(json!({
2449            "ImportTableDescription": {
2450                "ImportArn": import.import_arn,
2451                "ImportStatus": import.import_status,
2452                "TableArn": import.table_arn,
2453                "S3BucketSource": {
2454                    "S3Bucket": import.s3_bucket_source
2455                },
2456                "InputFormat": import.input_format,
2457                "StartTime": import.start_time.timestamp() as f64,
2458                "EndTime": import.end_time.timestamp() as f64,
2459                "ProcessedItemCount": import.processed_item_count,
2460                "ProcessedSizeBytes": import.processed_size_bytes
2461            }
2462        }))
2463    }
2464
2465    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466        let body = Self::parse_body(req)?;
2467        let table_arn = body["TableArn"].as_str();
2468
2469        let state = self.state.read();
2470        let summaries: Vec<Value> = state
2471            .imports
2472            .values()
2473            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2474            .map(|i| {
2475                json!({
2476                    "ImportArn": i.import_arn,
2477                    "ImportStatus": i.import_status,
2478                    "TableArn": i.table_arn
2479                })
2480            })
2481            .collect();
2482
2483        Self::ok_json(json!({
2484            "ImportSummaryList": summaries
2485        }))
2486    }
2487}
2488
2489#[async_trait]
2490impl AwsService for DynamoDbService {
2491    fn service_name(&self) -> &str {
2492        "dynamodb"
2493    }
2494
2495    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2496        match req.action.as_str() {
2497            "CreateTable" => self.create_table(&req),
2498            "DeleteTable" => self.delete_table(&req),
2499            "DescribeTable" => self.describe_table(&req),
2500            "ListTables" => self.list_tables(&req),
2501            "UpdateTable" => self.update_table(&req),
2502            "PutItem" => self.put_item(&req),
2503            "GetItem" => self.get_item(&req),
2504            "DeleteItem" => self.delete_item(&req),
2505            "UpdateItem" => self.update_item(&req),
2506            "Query" => self.query(&req),
2507            "Scan" => self.scan(&req),
2508            "BatchGetItem" => self.batch_get_item(&req),
2509            "BatchWriteItem" => self.batch_write_item(&req),
2510            "TagResource" => self.tag_resource(&req),
2511            "UntagResource" => self.untag_resource(&req),
2512            "ListTagsOfResource" => self.list_tags_of_resource(&req),
2513            "TransactGetItems" => self.transact_get_items(&req),
2514            "TransactWriteItems" => self.transact_write_items(&req),
2515            "ExecuteStatement" => self.execute_statement(&req),
2516            "BatchExecuteStatement" => self.batch_execute_statement(&req),
2517            "ExecuteTransaction" => self.execute_transaction(&req),
2518            "UpdateTimeToLive" => self.update_time_to_live(&req),
2519            "DescribeTimeToLive" => self.describe_time_to_live(&req),
2520            "PutResourcePolicy" => self.put_resource_policy(&req),
2521            "GetResourcePolicy" => self.get_resource_policy(&req),
2522            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
2523            // Stubs
2524            "DescribeEndpoints" => self.describe_endpoints(&req),
2525            "DescribeLimits" => self.describe_limits(&req),
2526            // Backups
2527            "CreateBackup" => self.create_backup(&req),
2528            "DeleteBackup" => self.delete_backup(&req),
2529            "DescribeBackup" => self.describe_backup(&req),
2530            "ListBackups" => self.list_backups(&req),
2531            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
2532            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
2533            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
2534            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
2535            // Global tables
2536            "CreateGlobalTable" => self.create_global_table(&req),
2537            "DescribeGlobalTable" => self.describe_global_table(&req),
2538            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
2539            "ListGlobalTables" => self.list_global_tables(&req),
2540            "UpdateGlobalTable" => self.update_global_table(&req),
2541            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
2542            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
2543            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
2544            // Kinesis streaming
2545            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
2546            "DisableKinesisStreamingDestination" => {
2547                self.disable_kinesis_streaming_destination(&req)
2548            }
2549            "DescribeKinesisStreamingDestination" => {
2550                self.describe_kinesis_streaming_destination(&req)
2551            }
2552            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
2553            // Contributor insights
2554            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
2555            "UpdateContributorInsights" => self.update_contributor_insights(&req),
2556            "ListContributorInsights" => self.list_contributor_insights(&req),
2557            // Import/Export
2558            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
2559            "DescribeExport" => self.describe_export(&req),
2560            "ListExports" => self.list_exports(&req),
2561            "ImportTable" => self.import_table(&req),
2562            "DescribeImport" => self.describe_import(&req),
2563            "ListImports" => self.list_imports(&req),
2564            _ => Err(AwsServiceError::action_not_implemented(
2565                "dynamodb",
2566                &req.action,
2567            )),
2568        }
2569    }
2570
2571    fn supported_actions(&self) -> &[&str] {
2572        &[
2573            "CreateTable",
2574            "DeleteTable",
2575            "DescribeTable",
2576            "ListTables",
2577            "UpdateTable",
2578            "PutItem",
2579            "GetItem",
2580            "DeleteItem",
2581            "UpdateItem",
2582            "Query",
2583            "Scan",
2584            "BatchGetItem",
2585            "BatchWriteItem",
2586            "TagResource",
2587            "UntagResource",
2588            "ListTagsOfResource",
2589            "TransactGetItems",
2590            "TransactWriteItems",
2591            "ExecuteStatement",
2592            "BatchExecuteStatement",
2593            "ExecuteTransaction",
2594            "UpdateTimeToLive",
2595            "DescribeTimeToLive",
2596            "PutResourcePolicy",
2597            "GetResourcePolicy",
2598            "DeleteResourcePolicy",
2599            "DescribeEndpoints",
2600            "DescribeLimits",
2601            "CreateBackup",
2602            "DeleteBackup",
2603            "DescribeBackup",
2604            "ListBackups",
2605            "RestoreTableFromBackup",
2606            "RestoreTableToPointInTime",
2607            "UpdateContinuousBackups",
2608            "DescribeContinuousBackups",
2609            "CreateGlobalTable",
2610            "DescribeGlobalTable",
2611            "DescribeGlobalTableSettings",
2612            "ListGlobalTables",
2613            "UpdateGlobalTable",
2614            "UpdateGlobalTableSettings",
2615            "DescribeTableReplicaAutoScaling",
2616            "UpdateTableReplicaAutoScaling",
2617            "EnableKinesisStreamingDestination",
2618            "DisableKinesisStreamingDestination",
2619            "DescribeKinesisStreamingDestination",
2620            "UpdateKinesisStreamingDestination",
2621            "DescribeContributorInsights",
2622            "UpdateContributorInsights",
2623            "ListContributorInsights",
2624            "ExportTableToPointInTime",
2625            "DescribeExport",
2626            "ListExports",
2627            "ImportTable",
2628            "DescribeImport",
2629            "ListImports",
2630        ]
2631    }
2632}
2633
2634// ── Helper functions ────────────────────────────────────────────────────
2635
2636fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
2637    body[field].as_str().ok_or_else(|| {
2638        AwsServiceError::aws_error(
2639            StatusCode::BAD_REQUEST,
2640            "ValidationException",
2641            format!("{field} is required"),
2642        )
2643    })
2644}
2645
2646fn require_object(
2647    body: &Value,
2648    field: &str,
2649) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2650    let obj = body[field].as_object().ok_or_else(|| {
2651        AwsServiceError::aws_error(
2652            StatusCode::BAD_REQUEST,
2653            "ValidationException",
2654            format!("{field} is required"),
2655        )
2656    })?;
2657    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2658}
2659
2660fn get_table<'a>(
2661    tables: &'a HashMap<String, DynamoTable>,
2662    name: &str,
2663) -> Result<&'a DynamoTable, AwsServiceError> {
2664    tables.get(name).ok_or_else(|| {
2665        AwsServiceError::aws_error(
2666            StatusCode::BAD_REQUEST,
2667            "ResourceNotFoundException",
2668            format!("Requested resource not found: Table: {name} not found"),
2669        )
2670    })
2671}
2672
2673fn get_table_mut<'a>(
2674    tables: &'a mut HashMap<String, DynamoTable>,
2675    name: &str,
2676) -> Result<&'a mut DynamoTable, AwsServiceError> {
2677    tables.get_mut(name).ok_or_else(|| {
2678        AwsServiceError::aws_error(
2679            StatusCode::BAD_REQUEST,
2680            "ResourceNotFoundException",
2681            format!("Requested resource not found: Table: {name} not found"),
2682        )
2683    })
2684}
2685
2686fn find_table_by_arn<'a>(
2687    tables: &'a HashMap<String, DynamoTable>,
2688    arn: &str,
2689) -> Result<&'a DynamoTable, AwsServiceError> {
2690    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
2691        AwsServiceError::aws_error(
2692            StatusCode::BAD_REQUEST,
2693            "ResourceNotFoundException",
2694            format!("Requested resource not found: {arn}"),
2695        )
2696    })
2697}
2698
2699fn find_table_by_arn_mut<'a>(
2700    tables: &'a mut HashMap<String, DynamoTable>,
2701    arn: &str,
2702) -> Result<&'a mut DynamoTable, AwsServiceError> {
2703    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
2704        AwsServiceError::aws_error(
2705            StatusCode::BAD_REQUEST,
2706            "ResourceNotFoundException",
2707            format!("Requested resource not found: {arn}"),
2708        )
2709    })
2710}
2711
2712fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
2713    let arr = val.as_array().ok_or_else(|| {
2714        AwsServiceError::aws_error(
2715            StatusCode::BAD_REQUEST,
2716            "ValidationException",
2717            "KeySchema is required",
2718        )
2719    })?;
2720    Ok(arr
2721        .iter()
2722        .map(|elem| KeySchemaElement {
2723            attribute_name: elem["AttributeName"]
2724                .as_str()
2725                .unwrap_or_default()
2726                .to_string(),
2727            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
2728        })
2729        .collect())
2730}
2731
2732fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
2733    let arr = val.as_array().ok_or_else(|| {
2734        AwsServiceError::aws_error(
2735            StatusCode::BAD_REQUEST,
2736            "ValidationException",
2737            "AttributeDefinitions is required",
2738        )
2739    })?;
2740    Ok(arr
2741        .iter()
2742        .map(|elem| AttributeDefinition {
2743            attribute_name: elem["AttributeName"]
2744                .as_str()
2745                .unwrap_or_default()
2746                .to_string(),
2747            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
2748        })
2749        .collect())
2750}
2751
2752fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
2753    Ok(ProvisionedThroughput {
2754        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
2755        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
2756    })
2757}
2758
2759fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
2760    let Some(arr) = val.as_array() else {
2761        return Vec::new();
2762    };
2763    arr.iter()
2764        .filter_map(|g| {
2765            Some(GlobalSecondaryIndex {
2766                index_name: g["IndexName"].as_str()?.to_string(),
2767                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
2768                projection: parse_projection(&g["Projection"]),
2769                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
2770                    .ok(),
2771            })
2772        })
2773        .collect()
2774}
2775
2776fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
2777    let Some(arr) = val.as_array() else {
2778        return Vec::new();
2779    };
2780    arr.iter()
2781        .filter_map(|l| {
2782            Some(LocalSecondaryIndex {
2783                index_name: l["IndexName"].as_str()?.to_string(),
2784                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
2785                projection: parse_projection(&l["Projection"]),
2786            })
2787        })
2788        .collect()
2789}
2790
2791fn parse_projection(val: &Value) -> Projection {
2792    Projection {
2793        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
2794        non_key_attributes: val["NonKeyAttributes"]
2795            .as_array()
2796            .map(|arr| {
2797                arr.iter()
2798                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2799                    .collect()
2800            })
2801            .unwrap_or_default(),
2802    }
2803}
2804
2805fn parse_tags(val: &Value) -> HashMap<String, String> {
2806    let mut tags = HashMap::new();
2807    if let Some(arr) = val.as_array() {
2808        for tag in arr {
2809            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2810                tags.insert(k.to_string(), v.to_string());
2811            }
2812        }
2813    }
2814    tags
2815}
2816
2817fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
2818    let mut names = HashMap::new();
2819    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
2820        for (k, v) in obj {
2821            if let Some(s) = v.as_str() {
2822                names.insert(k.clone(), s.to_string());
2823            }
2824        }
2825    }
2826    names
2827}
2828
2829fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
2830    let mut values = HashMap::new();
2831    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
2832        for (k, v) in obj {
2833            values.insert(k.clone(), v.clone());
2834        }
2835    }
2836    values
2837}
2838
/// Resolves a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#` are returned unchanged, as are
/// placeholders that have no entry in the map.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    match expr_attr_names.get(name) {
        Some(actual) => actual.clone(),
        None => name.to_string(),
    }
}
2849
2850fn extract_key(
2851    table: &DynamoTable,
2852    item: &HashMap<String, AttributeValue>,
2853) -> HashMap<String, AttributeValue> {
2854    let mut key = HashMap::new();
2855    let hash_key = table.hash_key_name();
2856    if let Some(v) = item.get(hash_key) {
2857        key.insert(hash_key.to_string(), v.clone());
2858    }
2859    if let Some(range_key) = table.range_key_name() {
2860        if let Some(v) = item.get(range_key) {
2861            key.insert(range_key.to_string(), v.clone());
2862        }
2863    }
2864    key
2865}
2866
2867fn validate_key_in_item(
2868    table: &DynamoTable,
2869    item: &HashMap<String, AttributeValue>,
2870) -> Result<(), AwsServiceError> {
2871    let hash_key = table.hash_key_name();
2872    if !item.contains_key(hash_key) {
2873        return Err(AwsServiceError::aws_error(
2874            StatusCode::BAD_REQUEST,
2875            "ValidationException",
2876            format!("Missing the key {hash_key} in the item"),
2877        ));
2878    }
2879    if let Some(range_key) = table.range_key_name() {
2880        if !item.contains_key(range_key) {
2881            return Err(AwsServiceError::aws_error(
2882                StatusCode::BAD_REQUEST,
2883                "ValidationException",
2884                format!("Missing the key {range_key} in the item"),
2885            ));
2886        }
2887    }
2888    Ok(())
2889}
2890
2891fn validate_key_attributes_in_key(
2892    table: &DynamoTable,
2893    key: &HashMap<String, AttributeValue>,
2894) -> Result<(), AwsServiceError> {
2895    let hash_key = table.hash_key_name();
2896    if !key.contains_key(hash_key) {
2897        return Err(AwsServiceError::aws_error(
2898            StatusCode::BAD_REQUEST,
2899            "ValidationException",
2900            format!("Missing the key {hash_key} in the item"),
2901        ));
2902    }
2903    Ok(())
2904}
2905
2906fn project_item(
2907    item: &HashMap<String, AttributeValue>,
2908    body: &Value,
2909) -> HashMap<String, AttributeValue> {
2910    let projection = body["ProjectionExpression"].as_str();
2911    match projection {
2912        Some(proj) if !proj.is_empty() => {
2913            let expr_attr_names = parse_expression_attribute_names(body);
2914            let attrs: Vec<String> = proj
2915                .split(',')
2916                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
2917                .collect();
2918            let mut result = HashMap::new();
2919            for attr in &attrs {
2920                if let Some(v) = resolve_nested_path(item, attr) {
2921                    insert_nested_value(&mut result, attr, v);
2922                }
2923            }
2924            result
2925        }
2926        _ => item.clone(),
2927    }
2928}
2929
2930/// Resolve expression attribute names within each segment of a projection path.
2931/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
2932fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
2933    // Split on dots, resolve each part, rejoin
2934    let mut result = String::new();
2935    for (i, segment) in path.split('.').enumerate() {
2936        if i > 0 {
2937            result.push('.');
2938        }
2939        // A segment might be like "#n" or "people[0]" or "#attr[0]"
2940        if let Some(bracket_pos) = segment.find('[') {
2941            let key_part = &segment[..bracket_pos];
2942            let index_part = &segment[bracket_pos..];
2943            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
2944            result.push_str(index_part);
2945        } else {
2946            result.push_str(&resolve_attr_name(segment, expr_attr_names));
2947        }
2948    }
2949    result
2950}
2951
2952/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
2953fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
2954    let segments = parse_path_segments(path);
2955    if segments.is_empty() {
2956        return None;
2957    }
2958
2959    let first = &segments[0];
2960    let top_key = match first {
2961        PathSegment::Key(k) => k.as_str(),
2962        _ => return None,
2963    };
2964
2965    let mut current = item.get(top_key)?.clone();
2966
2967    for segment in &segments[1..] {
2968        match segment {
2969            PathSegment::Key(k) => {
2970                // Navigate into a Map: {"M": {"key": ...}}
2971                current = current.get("M")?.get(k)?.clone();
2972            }
2973            PathSegment::Index(idx) => {
2974                // Navigate into a List: {"L": [...]}
2975                current = current.get("L")?.get(*idx)?.clone();
2976            }
2977        }
2978    }
2979
2980    Some(current)
2981}
2982
/// One step of a parsed document path such as `a.b[0].c`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PathSegment {
    /// An attribute or map-key name, e.g. `b` in `a.b`.
    Key(String),
    /// A zero-based list position, e.g. `0` in `a[0]`.
    Index(usize),
}
2988
2989/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
2990fn parse_path_segments(path: &str) -> Vec<PathSegment> {
2991    let mut segments = Vec::new();
2992    let mut current = String::new();
2993
2994    let chars: Vec<char> = path.chars().collect();
2995    let mut i = 0;
2996    while i < chars.len() {
2997        match chars[i] {
2998            '.' => {
2999                if !current.is_empty() {
3000                    segments.push(PathSegment::Key(current.clone()));
3001                    current.clear();
3002                }
3003            }
3004            '[' => {
3005                if !current.is_empty() {
3006                    segments.push(PathSegment::Key(current.clone()));
3007                    current.clear();
3008                }
3009                i += 1;
3010                let mut num = String::new();
3011                while i < chars.len() && chars[i] != ']' {
3012                    num.push(chars[i]);
3013                    i += 1;
3014                }
3015                if let Ok(idx) = num.parse::<usize>() {
3016                    segments.push(PathSegment::Index(idx));
3017                }
3018                // skip ']'
3019            }
3020            c => {
3021                current.push(c);
3022            }
3023        }
3024        i += 1;
3025    }
3026    if !current.is_empty() {
3027        segments.push(PathSegment::Key(current));
3028    }
3029    segments
3030}
3031
3032/// Insert a value at a nested path in the result HashMap.
3033/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3034fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3035    // Simple case: no nesting
3036    if !path.contains('.') && !path.contains('[') {
3037        result.insert(path.to_string(), value);
3038        return;
3039    }
3040
3041    let segments = parse_path_segments(path);
3042    if segments.is_empty() {
3043        return;
3044    }
3045
3046    let top_key = match &segments[0] {
3047        PathSegment::Key(k) => k.clone(),
3048        _ => return,
3049    };
3050
3051    if segments.len() == 1 {
3052        result.insert(top_key, value);
3053        return;
3054    }
3055
3056    // For nested paths, wrap the value back into the nested structure
3057    let wrapped = wrap_value_in_path(&segments[1..], value);
3058    // Merge into existing value if present
3059    let existing = result.remove(&top_key);
3060    let merged = match existing {
3061        Some(existing) => merge_attribute_values(existing, wrapped),
3062        None => wrapped,
3063    };
3064    result.insert(top_key, merged);
3065}
3066
3067/// Wrap a value in the nested path structure.
3068fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3069    if segments.is_empty() {
3070        return value;
3071    }
3072    let inner = wrap_value_in_path(&segments[1..], value);
3073    match &segments[0] {
3074        PathSegment::Key(k) => {
3075            json!({"M": {k.clone(): inner}})
3076        }
3077        PathSegment::Index(idx) => {
3078            let mut arr = vec![Value::Null; idx + 1];
3079            arr[*idx] = inner;
3080            json!({"L": arr})
3081        }
3082    }
3083}
3084
3085/// Merge two attribute values (for overlapping projections).
3086fn merge_attribute_values(a: Value, b: Value) -> Value {
3087    if let (Some(a_map), Some(b_map)) = (
3088        a.get("M").and_then(|v| v.as_object()),
3089        b.get("M").and_then(|v| v.as_object()),
3090    ) {
3091        let mut merged = a_map.clone();
3092        for (k, v) in b_map {
3093            if let Some(existing) = merged.get(k) {
3094                merged.insert(
3095                    k.clone(),
3096                    merge_attribute_values(existing.clone(), v.clone()),
3097                );
3098            } else {
3099                merged.insert(k.clone(), v.clone());
3100            }
3101        }
3102        json!({"M": merged})
3103    } else {
3104        b
3105    }
3106}
3107
3108fn evaluate_condition(
3109    condition: &str,
3110    existing: Option<&HashMap<String, AttributeValue>>,
3111    expr_attr_names: &HashMap<String, String>,
3112    expr_attr_values: &HashMap<String, Value>,
3113) -> Result<(), AwsServiceError> {
3114    let cond = condition.trim();
3115
3116    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3117        let attr = resolve_attr_name(inner, expr_attr_names);
3118        match existing {
3119            Some(item) if item.contains_key(&attr) => {
3120                return Err(AwsServiceError::aws_error(
3121                    StatusCode::BAD_REQUEST,
3122                    "ConditionalCheckFailedException",
3123                    "The conditional request failed",
3124                ));
3125            }
3126            _ => return Ok(()),
3127        }
3128    }
3129
3130    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3131        let attr = resolve_attr_name(inner, expr_attr_names);
3132        match existing {
3133            Some(item) if item.contains_key(&attr) => return Ok(()),
3134            _ => {
3135                return Err(AwsServiceError::aws_error(
3136                    StatusCode::BAD_REQUEST,
3137                    "ConditionalCheckFailedException",
3138                    "The conditional request failed",
3139                ));
3140            }
3141        }
3142    }
3143
3144    if let Some((left, op, right)) = parse_simple_comparison(cond) {
3145        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3146        let expected = expr_attr_values.get(right.trim());
3147        let actual = existing.and_then(|item| item.get(&attr_name));
3148
3149        let result = match op {
3150            "=" => actual == expected,
3151            "<>" => actual != expected,
3152            _ => true,
3153        };
3154
3155        if !result {
3156            return Err(AwsServiceError::aws_error(
3157                StatusCode::BAD_REQUEST,
3158                "ConditionalCheckFailedException",
3159                "The conditional request failed",
3160            ));
3161        }
3162    }
3163
3164    Ok(())
3165}
3166
/// Returns the trimmed text between the parentheses of `func_name(...)`.
///
/// `expr` must start with `func_name` immediately followed by `(` and must
/// end with `)`; otherwise `None` is returned.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let inner = expr
        .strip_prefix(func_name)?
        .strip_prefix('(')?
        .strip_suffix(')')?;
    Some(inner.trim())
}
3176
/// Splits `expr` around its comparison operator into `(left, op, right)`.
///
/// Operators are tried in a fixed priority order and the first one found
/// anywhere in `expr` wins. Two-character operators are listed before the
/// single characters they contain, so `a <= :v` yields `<=` rather than
/// being split at the embedded `=`. Returns `None` when no operator occurs.
/// The returned operands are not trimmed.
fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
    // Longest operators first: "<=", ">=" and "<>" contain "=", "<" or ">".
    const OPERATORS: [&str; 6] = ["<>", "<=", ">=", "=", "<", ">"];

    OPERATORS.iter().find_map(|op| {
        expr.find(*op)
            .map(|pos| (&expr[..pos], *op, &expr[pos + op.len()..]))
    })
}
3187
3188fn evaluate_key_condition(
3189    expr: &str,
3190    item: &HashMap<String, AttributeValue>,
3191    hash_key_name: &str,
3192    _range_key_name: Option<&str>,
3193    expr_attr_names: &HashMap<String, String>,
3194    expr_attr_values: &HashMap<String, Value>,
3195) -> bool {
3196    let parts: Vec<&str> = split_on_and(expr);
3197    for part in &parts {
3198        let part = part.trim();
3199        if !evaluate_single_key_condition(
3200            part,
3201            item,
3202            hash_key_name,
3203            expr_attr_names,
3204            expr_attr_values,
3205        ) {
3206            return false;
3207        }
3208    }
3209    true
3210}
3211
/// Splits `expr` on every case-insensitive `" AND "` that is not nested
/// inside parentheses.
///
/// Always returns at least one part; the parts are not trimmed. The
/// separator is matched on raw bytes, so expressions containing multi-byte
/// UTF-8 characters are handled without slicing a `str` in the middle of a
/// character (which would panic).
///
/// NOTE(review): the `AND` inside `x BETWEEN a AND b` is treated as a
/// separator here as well — confirm callers account for that.
fn split_on_and(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            // Unbalanced ')' is tolerated: depth never goes below zero.
            b')' => depth = depth.saturating_sub(1),
            _ => {
                let at_separator = depth == 0
                    && bytes
                        .get(i..i + SEPARATOR.len())
                        .map_or(false, |window| window.eq_ignore_ascii_case(SEPARATOR));
                if at_separator {
                    // The separator starts and ends with an ASCII space, so both
                    // `i` and `i + SEPARATOR.len()` are valid char boundaries.
                    parts.push(&expr[start..i]);
                    i += SEPARATOR.len();
                    start = i;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3237
/// Splits `expr` on every case-insensitive `" OR "` that is not nested
/// inside parentheses.
///
/// Always returns at least one part; the parts are not trimmed. The
/// separator is matched on raw bytes, so expressions containing multi-byte
/// UTF-8 characters are handled without slicing a `str` in the middle of a
/// character (which would panic).
fn split_on_or(expr: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" OR ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            // Unbalanced ')' is tolerated: depth never goes below zero.
            b')' => depth = depth.saturating_sub(1),
            _ => {
                let at_separator = depth == 0
                    && bytes
                        .get(i..i + SEPARATOR.len())
                        .map_or(false, |window| window.eq_ignore_ascii_case(SEPARATOR));
                if at_separator {
                    // The separator starts and ends with an ASCII space, so both
                    // `i` and `i + SEPARATOR.len()` are valid char boundaries.
                    parts.push(&expr[start..i]);
                    i += SEPARATOR.len();
                    start = i;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3263
3264fn evaluate_single_key_condition(
3265    part: &str,
3266    item: &HashMap<String, AttributeValue>,
3267    _hash_key_name: &str,
3268    expr_attr_names: &HashMap<String, String>,
3269    expr_attr_values: &HashMap<String, Value>,
3270) -> bool {
3271    let part = part.trim();
3272
3273    // begins_with(attr, :val)
3274    if let Some(rest) = part
3275        .strip_prefix("begins_with(")
3276        .or_else(|| part.strip_prefix("begins_with ("))
3277    {
3278        if let Some(inner) = rest.strip_suffix(')') {
3279            let mut split = inner.splitn(2, ',');
3280            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3281                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3282                let val_ref = val_ref.trim();
3283                let expected = expr_attr_values.get(val_ref);
3284                let actual = item.get(&attr_name);
3285                return match (actual, expected) {
3286                    (Some(a), Some(e)) => {
3287                        let a_str = extract_string_value(a);
3288                        let e_str = extract_string_value(e);
3289                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3290                    }
3291                    _ => false,
3292                };
3293            }
3294        }
3295        return false;
3296    }
3297
3298    // BETWEEN
3299    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
3300        let attr_part = part[..between_pos].trim();
3301        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
3302        let range_part = &part[between_pos + 7..];
3303        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
3304            let lo_ref = range_part[..and_pos].trim();
3305            let hi_ref = range_part[and_pos + 5..].trim();
3306            let lo = expr_attr_values.get(lo_ref);
3307            let hi = expr_attr_values.get(hi_ref);
3308            let actual = item.get(&attr_name);
3309            return match (actual, lo, hi) {
3310                (Some(a), Some(l), Some(h)) => {
3311                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
3312                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
3313                }
3314                _ => false,
3315            };
3316        }
3317    }
3318
3319    // Simple comparison: attr <op> :val
3320    for op in &["<=", ">=", "<>", "=", "<", ">"] {
3321        if let Some(pos) = part.find(op) {
3322            let left = part[..pos].trim();
3323            let right = part[pos + op.len()..].trim();
3324            let attr_name = resolve_attr_name(left, expr_attr_names);
3325            let expected = expr_attr_values.get(right);
3326            let actual = item.get(&attr_name);
3327
3328            return match *op {
3329                "=" => actual == expected,
3330                "<>" => actual != expected,
3331                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
3332                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
3333                "<=" => {
3334                    let cmp = compare_attribute_values(actual, expected);
3335                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
3336                }
3337                ">=" => {
3338                    let cmp = compare_attribute_values(actual, expected);
3339                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
3340                }
3341                _ => true,
3342            };
3343        }
3344    }
3345
3346    true
3347}
3348
3349fn extract_string_value(val: &Value) -> Option<String> {
3350    val.get("S")
3351        .and_then(|v| v.as_str())
3352        .map(|s| s.to_string())
3353        .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3354}
3355
3356fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3357    match (a, b) {
3358        (None, None) => std::cmp::Ordering::Equal,
3359        (None, Some(_)) => std::cmp::Ordering::Less,
3360        (Some(_), None) => std::cmp::Ordering::Greater,
3361        (Some(a), Some(b)) => {
3362            let a_type = attribute_type_and_value(a);
3363            let b_type = attribute_type_and_value(b);
3364            match (a_type, b_type) {
3365                (Some(("S", a_val)), Some(("S", b_val))) => {
3366                    let a_str = a_val.as_str().unwrap_or("");
3367                    let b_str = b_val.as_str().unwrap_or("");
3368                    a_str.cmp(b_str)
3369                }
3370                (Some(("N", a_val)), Some(("N", b_val))) => {
3371                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3372                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3373                    a_num
3374                        .partial_cmp(&b_num)
3375                        .unwrap_or(std::cmp::Ordering::Equal)
3376                }
3377                (Some(("B", a_val)), Some(("B", b_val))) => {
3378                    let a_str = a_val.as_str().unwrap_or("");
3379                    let b_str = b_val.as_str().unwrap_or("");
3380                    a_str.cmp(b_str)
3381                }
3382                _ => std::cmp::Ordering::Equal,
3383            }
3384        }
3385    }
3386}
3387
3388fn evaluate_filter_expression(
3389    expr: &str,
3390    item: &HashMap<String, AttributeValue>,
3391    expr_attr_names: &HashMap<String, String>,
3392    expr_attr_values: &HashMap<String, Value>,
3393) -> bool {
3394    let trimmed = expr.trim();
3395
3396    // Split on OR first (lower precedence), respecting parentheses
3397    let or_parts = split_on_or(trimmed);
3398    if or_parts.len() > 1 {
3399        return or_parts.iter().any(|part| {
3400            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3401        });
3402    }
3403
3404    // Then split on AND (higher precedence), respecting parentheses
3405    let and_parts = split_on_and(trimmed);
3406    if and_parts.len() > 1 {
3407        return and_parts.iter().all(|part| {
3408            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3409        });
3410    }
3411
3412    // Strip outer parentheses if present
3413    let stripped = strip_outer_parens(trimmed);
3414    if stripped != trimmed {
3415        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3416    }
3417
3418    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3419}
3420
/// Removes one pair of enclosing parentheses when the first `(` and the last
/// `)` of the trimmed expression belong to each other.
///
/// If the expression is not wrapped that way (for example `(a) AND (b)`), the
/// trimmed input is returned unchanged.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let Some(inner) = candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return candidate;
    };

    // Walk the interior tracking nesting depth. `None` means a `)` appeared with
    // nothing open inside, i.e. it closes the leading `(` early.
    let balance = inner.bytes().try_fold(0usize, |depth, byte| match byte {
        b'(' => Some(depth + 1),
        b')' => depth.checked_sub(1),
        _ => Some(depth),
    });

    if balance == Some(0) {
        inner
    } else {
        candidate
    }
}
3448
3449fn evaluate_single_filter_condition(
3450    part: &str,
3451    item: &HashMap<String, AttributeValue>,
3452    expr_attr_names: &HashMap<String, String>,
3453    expr_attr_values: &HashMap<String, Value>,
3454) -> bool {
3455    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
3456        let attr = resolve_attr_name(inner, expr_attr_names);
3457        return item.contains_key(&attr);
3458    }
3459
3460    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
3461        let attr = resolve_attr_name(inner, expr_attr_names);
3462        return !item.contains_key(&attr);
3463    }
3464
3465    if let Some(rest) = part
3466        .strip_prefix("begins_with(")
3467        .or_else(|| part.strip_prefix("begins_with ("))
3468    {
3469        if let Some(inner) = rest.strip_suffix(')') {
3470            let mut split = inner.splitn(2, ',');
3471            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3472                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3473                let expected = expr_attr_values.get(val_ref.trim());
3474                let actual = item.get(&attr_name);
3475                return match (actual, expected) {
3476                    (Some(a), Some(e)) => {
3477                        let a_str = extract_string_value(a);
3478                        let e_str = extract_string_value(e);
3479                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3480                    }
3481                    _ => false,
3482                };
3483            }
3484        }
3485    }
3486
3487    if let Some(rest) = part
3488        .strip_prefix("contains(")
3489        .or_else(|| part.strip_prefix("contains ("))
3490    {
3491        if let Some(inner) = rest.strip_suffix(')') {
3492            let mut split = inner.splitn(2, ',');
3493            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3494                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3495                let expected = expr_attr_values.get(val_ref.trim());
3496                let actual = item.get(&attr_name);
3497                return match (actual, expected) {
3498                    (Some(a), Some(e)) => {
3499                        let a_str = extract_string_value(a);
3500                        let e_str = extract_string_value(e);
3501                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
3502                    }
3503                    _ => false,
3504                };
3505            }
3506        }
3507    }
3508
3509    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
3510}
3511
3512fn apply_update_expression(
3513    item: &mut HashMap<String, AttributeValue>,
3514    expr: &str,
3515    expr_attr_names: &HashMap<String, String>,
3516    expr_attr_values: &HashMap<String, Value>,
3517) -> Result<(), AwsServiceError> {
3518    let clauses = parse_update_clauses(expr);
3519    for (action, assignments) in &clauses {
3520        match action.to_ascii_uppercase().as_str() {
3521            "SET" => {
3522                for assignment in assignments {
3523                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3524                }
3525            }
3526            "REMOVE" => {
3527                for attr_ref in assignments {
3528                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3529                    item.remove(&attr);
3530                }
3531            }
3532            "ADD" => {
3533                for assignment in assignments {
3534                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3535                }
3536            }
3537            "DELETE" => {
3538                for assignment in assignments {
3539                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3540                }
3541            }
3542            _ => {}
3543        }
3544    }
3545    Ok(())
3546}
3547
/// Splits an update expression into `(KEYWORD, operands)` clauses.
///
/// The keywords `SET`, `REMOVE`, `ADD` and `DELETE` are located
/// case-insensitively and reported in upper case, in order of appearance. The
/// text between one keyword and the next is split into operands on commas.
///
/// Two parsing rules keep valid expressions intact:
/// * a keyword only counts when it is not glued to name characters
///   (`A-Z a-z 0-9 _ # : .`), so `set_count`, `#add` or `:delete` are operands,
///   not clause starts;
/// * commas nested inside parentheses do not separate operands, so
///   `if_not_exists(a, :v)` and `list_append(a, :b)` stay whole.
///
/// Each operand is trimmed. A clause without operands yields a single empty
/// string.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    const KEYWORDS: [&str; 4] = ["SET", "REMOVE", "ADD", "DELETE"];

    // Bytes that may belong to an attribute name, document path or placeholder.
    fn is_name_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'_' | b'#' | b':' | b'.')
    }

    // Splits on commas that are not nested inside parentheses.
    fn split_top_level(content: &str) -> Vec<String> {
        let mut pieces = Vec::new();
        let mut depth = 0usize;
        let mut start = 0;
        for (i, b) in content.bytes().enumerate() {
            match b {
                b'(' => depth += 1,
                b')' => depth = depth.saturating_sub(1),
                b',' if depth == 0 => {
                    pieces.push(content[start..i].trim().to_string());
                    start = i + 1;
                }
                _ => {}
            }
        }
        pieces.push(content[start..].trim().to_string());
        pieces
    }

    let bytes = expr.as_bytes();
    // ASCII upper-casing keeps byte offsets identical to `expr`.
    let upper = expr.to_ascii_uppercase();

    let mut positions: Vec<(usize, &str)> = Vec::new();
    for kw in KEYWORDS.iter().copied() {
        let mut search_from = 0;
        while let Some(rel) = upper[search_from..].find(kw) {
            let at = search_from + rel;
            let end = at + kw.len();
            let before_ok = at == 0 || !is_name_byte(bytes[at - 1]);
            let after_ok = end >= bytes.len() || !is_name_byte(bytes[end]);
            if before_ok && after_ok {
                positions.push((at, kw));
            }
            search_from = end;
        }
    }
    positions.sort_by_key(|&(at, _)| at);

    positions
        .iter()
        .enumerate()
        .map(|(i, &(at, kw))| {
            let clause_end = positions.get(i + 1).map_or(expr.len(), |&(next, _)| next);
            let operands = split_top_level(&expr[at + kw.len()..clause_end]);
            (kw.to_string(), operands)
        })
        .collect()
}
3585
3586fn apply_set_assignment(
3587    item: &mut HashMap<String, AttributeValue>,
3588    assignment: &str,
3589    expr_attr_names: &HashMap<String, String>,
3590    expr_attr_values: &HashMap<String, Value>,
3591) -> Result<(), AwsServiceError> {
3592    let Some((left, right)) = assignment.split_once('=') else {
3593        return Ok(());
3594    };
3595
3596    let attr = resolve_attr_name(left.trim(), expr_attr_names);
3597    let right = right.trim();
3598
3599    // if_not_exists(attr, :val)
3600    if let Some(rest) = right
3601        .strip_prefix("if_not_exists(")
3602        .or_else(|| right.strip_prefix("if_not_exists ("))
3603    {
3604        if let Some(inner) = rest.strip_suffix(')') {
3605            let mut split = inner.splitn(2, ',');
3606            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
3607                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
3608                if !item.contains_key(&check_name) {
3609                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
3610                        item.insert(attr, val.clone());
3611                    }
3612                }
3613                return Ok(());
3614            }
3615        }
3616    }
3617
3618    // list_append(a, b)
3619    if let Some(rest) = right
3620        .strip_prefix("list_append(")
3621        .or_else(|| right.strip_prefix("list_append ("))
3622    {
3623        if let Some(inner) = rest.strip_suffix(')') {
3624            let mut split = inner.splitn(2, ',');
3625            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
3626                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
3627                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
3628
3629                let mut merged = Vec::new();
3630                if let Some(Value::Object(obj)) = &a_val {
3631                    if let Some(Value::Array(arr)) = obj.get("L") {
3632                        merged.extend(arr.clone());
3633                    }
3634                }
3635                if let Some(Value::Object(obj)) = &b_val {
3636                    if let Some(Value::Array(arr)) = obj.get("L") {
3637                        merged.extend(arr.clone());
3638                    }
3639                }
3640
3641                item.insert(attr, json!({"L": merged}));
3642                return Ok(());
3643            }
3644        }
3645    }
3646
3647    // Arithmetic: attr + :val or attr - :val
3648    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
3649        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
3650        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
3651
3652        let left_num = extract_number(&left_val).unwrap_or(0.0);
3653        let right_num = extract_number(&right_val).unwrap_or(0.0);
3654
3655        let result = if is_add {
3656            left_num + right_num
3657        } else {
3658            left_num - right_num
3659        };
3660
3661        let num_str = if result == result.trunc() {
3662            format!("{}", result as i64)
3663        } else {
3664            format!("{result}")
3665        };
3666
3667        item.insert(attr, json!({"N": num_str}));
3668        return Ok(());
3669    }
3670
3671    // Simple assignment
3672    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
3673    if let Some(v) = val {
3674        item.insert(attr, v);
3675    }
3676
3677    Ok(())
3678}
3679
3680fn resolve_value(
3681    reference: &str,
3682    item: &HashMap<String, AttributeValue>,
3683    expr_attr_names: &HashMap<String, String>,
3684    expr_attr_values: &HashMap<String, Value>,
3685) -> Option<Value> {
3686    let reference = reference.trim();
3687    if reference.starts_with(':') {
3688        expr_attr_values.get(reference).cloned()
3689    } else {
3690        let attr_name = resolve_attr_name(reference, expr_attr_names);
3691        item.get(&attr_name).cloned()
3692    }
3693}
3694
3695fn extract_number(val: &Option<Value>) -> Option<f64> {
3696    val.as_ref()
3697        .and_then(|v| v.get("N"))
3698        .and_then(|n| n.as_str())
3699        .and_then(|s| s.parse().ok())
3700}
3701
/// Finds the first `+` or `-` that sits outside parentheses and is not the very
/// first character, and splits the expression around it.
///
/// Returns `(left, right, is_addition)` with both sides untrimmed, or `None`
/// when no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    // Signed on purpose: a stray `)` drives the counter negative, which keeps
    // later operators from being treated as top-level.
    let mut nesting: i32 = 0;

    // All characters of interest are ASCII, so scanning bytes yields the same
    // offsets as scanning chars and every split lands on a char boundary.
    for (idx, byte) in expr.bytes().enumerate() {
        if byte == b'(' {
            nesting += 1;
            continue;
        }
        if byte == b')' {
            nesting -= 1;
            continue;
        }

        let is_operator = byte == b'+' || byte == b'-';
        if is_operator && nesting == 0 && idx > 0 {
            let (lhs, rest) = expr.split_at(idx);
            return Some((lhs, &rest[1..], byte == b'+'));
        }
    }

    None
}
3719
3720fn apply_add_assignment(
3721    item: &mut HashMap<String, AttributeValue>,
3722    assignment: &str,
3723    expr_attr_names: &HashMap<String, String>,
3724    expr_attr_values: &HashMap<String, Value>,
3725) -> Result<(), AwsServiceError> {
3726    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3727    if parts.len() != 2 {
3728        return Ok(());
3729    }
3730
3731    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3732    let val_ref = parts[1].trim();
3733    let add_val = expr_attr_values.get(val_ref);
3734
3735    if let Some(add_val) = add_val {
3736        if let Some(existing) = item.get(&attr) {
3737            if let (Some(existing_num), Some(add_num)) = (
3738                extract_number(&Some(existing.clone())),
3739                extract_number(&Some(add_val.clone())),
3740            ) {
3741                let result = existing_num + add_num;
3742                let num_str = if result == result.trunc() {
3743                    format!("{}", result as i64)
3744                } else {
3745                    format!("{result}")
3746                };
3747                item.insert(attr, json!({"N": num_str}));
3748            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
3749                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
3750                    let mut merged: Vec<Value> = existing_set.clone();
3751                    for v in add_set {
3752                        if !merged.contains(v) {
3753                            merged.push(v.clone());
3754                        }
3755                    }
3756                    item.insert(attr, json!({"SS": merged}));
3757                }
3758            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
3759                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
3760                    let mut merged: Vec<Value> = existing_set.clone();
3761                    for v in add_set {
3762                        if !merged.contains(v) {
3763                            merged.push(v.clone());
3764                        }
3765                    }
3766                    item.insert(attr, json!({"NS": merged}));
3767                }
3768            }
3769        } else {
3770            item.insert(attr, add_val.clone());
3771        }
3772    }
3773
3774    Ok(())
3775}
3776
3777fn apply_delete_assignment(
3778    item: &mut HashMap<String, AttributeValue>,
3779    assignment: &str,
3780    expr_attr_names: &HashMap<String, String>,
3781    expr_attr_values: &HashMap<String, Value>,
3782) -> Result<(), AwsServiceError> {
3783    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3784    if parts.len() != 2 {
3785        return Ok(());
3786    }
3787
3788    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3789    let val_ref = parts[1].trim();
3790    let del_val = expr_attr_values.get(val_ref);
3791
3792    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
3793        if let (Some(existing_set), Some(del_set)) = (
3794            existing.get("SS").and_then(|v| v.as_array()),
3795            del_val.get("SS").and_then(|v| v.as_array()),
3796        ) {
3797            let filtered: Vec<Value> = existing_set
3798                .iter()
3799                .filter(|v| !del_set.contains(v))
3800                .cloned()
3801                .collect();
3802            if filtered.is_empty() {
3803                item.remove(&attr);
3804            } else {
3805                item.insert(attr, json!({"SS": filtered}));
3806            }
3807        } else if let (Some(existing_set), Some(del_set)) = (
3808            existing.get("NS").and_then(|v| v.as_array()),
3809            del_val.get("NS").and_then(|v| v.as_array()),
3810        ) {
3811            let filtered: Vec<Value> = existing_set
3812                .iter()
3813                .filter(|v| !del_set.contains(v))
3814                .cloned()
3815                .collect();
3816            if filtered.is_empty() {
3817                item.remove(&attr);
3818            } else {
3819                item.insert(attr, json!({"NS": filtered}));
3820            }
3821        }
3822    }
3823
3824    Ok(())
3825}
3826
3827#[allow(clippy::too_many_arguments)]
3828fn build_table_description_json(
3829    arn: &str,
3830    key_schema: &[KeySchemaElement],
3831    attribute_definitions: &[AttributeDefinition],
3832    provisioned_throughput: &ProvisionedThroughput,
3833    gsi: &[GlobalSecondaryIndex],
3834    lsi: &[LocalSecondaryIndex],
3835    billing_mode: &str,
3836    created_at: chrono::DateTime<chrono::Utc>,
3837    item_count: i64,
3838    size_bytes: i64,
3839    status: &str,
3840) -> Value {
3841    let table_name = arn.rsplit('/').next().unwrap_or("");
3842    let creation_timestamp =
3843        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
3844
3845    let ks: Vec<Value> = key_schema
3846        .iter()
3847        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3848        .collect();
3849
3850    let ad: Vec<Value> = attribute_definitions
3851        .iter()
3852        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3853        .collect();
3854
3855    let mut desc = json!({
3856        "TableName": table_name,
3857        "TableArn": arn,
3858        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
3859        "TableStatus": status,
3860        "KeySchema": ks,
3861        "AttributeDefinitions": ad,
3862        "CreationDateTime": creation_timestamp,
3863        "ItemCount": item_count,
3864        "TableSizeBytes": size_bytes,
3865        "BillingModeSummary": { "BillingMode": billing_mode },
3866    });
3867
3868    if billing_mode != "PAY_PER_REQUEST" {
3869        desc["ProvisionedThroughput"] = json!({
3870            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
3871            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
3872            "NumberOfDecreasesToday": 0,
3873        });
3874    } else {
3875        desc["ProvisionedThroughput"] = json!({
3876            "ReadCapacityUnits": 0,
3877            "WriteCapacityUnits": 0,
3878            "NumberOfDecreasesToday": 0,
3879        });
3880    }
3881
3882    if !gsi.is_empty() {
3883        let gsi_json: Vec<Value> = gsi
3884            .iter()
3885            .map(|g| {
3886                let gks: Vec<Value> = g
3887                    .key_schema
3888                    .iter()
3889                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3890                    .collect();
3891                let mut idx = json!({
3892                    "IndexName": g.index_name,
3893                    "KeySchema": gks,
3894                    "Projection": { "ProjectionType": g.projection.projection_type },
3895                    "IndexStatus": "ACTIVE",
3896                    "IndexArn": format!("{arn}/index/{}", g.index_name),
3897                    "ItemCount": 0,
3898                    "IndexSizeBytes": 0,
3899                });
3900                if !g.projection.non_key_attributes.is_empty() {
3901                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
3902                }
3903                if let Some(ref pt) = g.provisioned_throughput {
3904                    idx["ProvisionedThroughput"] = json!({
3905                        "ReadCapacityUnits": pt.read_capacity_units,
3906                        "WriteCapacityUnits": pt.write_capacity_units,
3907                        "NumberOfDecreasesToday": 0,
3908                    });
3909                }
3910                idx
3911            })
3912            .collect();
3913        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
3914    }
3915
3916    if !lsi.is_empty() {
3917        let lsi_json: Vec<Value> = lsi
3918            .iter()
3919            .map(|l| {
3920                let lks: Vec<Value> = l
3921                    .key_schema
3922                    .iter()
3923                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3924                    .collect();
3925                let mut idx = json!({
3926                    "IndexName": l.index_name,
3927                    "KeySchema": lks,
3928                    "Projection": { "ProjectionType": l.projection.projection_type },
3929                    "IndexArn": format!("{arn}/index/{}", l.index_name),
3930                    "ItemCount": 0,
3931                    "IndexSizeBytes": 0,
3932                });
3933                if !l.projection.non_key_attributes.is_empty() {
3934                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
3935                }
3936                idx
3937            })
3938            .collect();
3939        desc["LocalSecondaryIndexes"] = json!(lsi_json);
3940    }
3941
3942    desc
3943}
3944
3945fn build_table_description(table: &DynamoTable) -> Value {
3946    build_table_description_json(
3947        &table.arn,
3948        &table.key_schema,
3949        &table.attribute_definitions,
3950        &table.provisioned_throughput,
3951        &table.gsi,
3952        &table.lsi,
3953        &table.billing_mode,
3954        table.created_at,
3955        table.item_count,
3956        table.size_bytes,
3957        &table.status,
3958    )
3959}
3960
3961fn execute_partiql_statement(
3962    state: &SharedDynamoDbState,
3963    statement: &str,
3964    parameters: &[Value],
3965) -> Result<AwsResponse, AwsServiceError> {
3966    let trimmed = statement.trim();
3967    let upper = trimmed.to_ascii_uppercase();
3968
3969    if upper.starts_with("SELECT") {
3970        execute_partiql_select(state, trimmed, parameters)
3971    } else if upper.starts_with("INSERT") {
3972        execute_partiql_insert(state, trimmed, parameters)
3973    } else if upper.starts_with("UPDATE") {
3974        execute_partiql_update(state, trimmed, parameters)
3975    } else if upper.starts_with("DELETE") {
3976        execute_partiql_delete(state, trimmed, parameters)
3977    } else {
3978        Err(AwsServiceError::aws_error(
3979            StatusCode::BAD_REQUEST,
3980            "ValidationException",
3981            format!("Unsupported PartiQL statement: {trimmed}"),
3982        ))
3983    }
3984}
3985
3986/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
3987fn execute_partiql_select(
3988    state: &SharedDynamoDbState,
3989    statement: &str,
3990    parameters: &[Value],
3991) -> Result<AwsResponse, AwsServiceError> {
3992    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
3993    let upper = statement.to_ascii_uppercase();
3994    let from_pos = upper.find("FROM").ok_or_else(|| {
3995        AwsServiceError::aws_error(
3996            StatusCode::BAD_REQUEST,
3997            "ValidationException",
3998            "Invalid SELECT statement: missing FROM",
3999        )
4000    })?;
4001
4002    let after_from = statement[from_pos + 4..].trim();
4003    let (table_name, rest) = parse_partiql_table_name(after_from);
4004
4005    let state = state.read();
4006    let table = get_table(&state.tables, &table_name)?;
4007
4008    let rest_upper = rest.trim().to_ascii_uppercase();
4009    if rest_upper.starts_with("WHERE") {
4010        let where_clause = rest.trim()[5..].trim();
4011        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4012        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4013        DynamoDbService::ok_json(json!({ "Items": items }))
4014    } else {
4015        // No WHERE, return all items
4016        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4017        DynamoDbService::ok_json(json!({ "Items": items }))
4018    }
4019}
4020
4021fn execute_partiql_insert(
4022    state: &SharedDynamoDbState,
4023    statement: &str,
4024    parameters: &[Value],
4025) -> Result<AwsResponse, AwsServiceError> {
4026    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
4027    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
4028    let upper = statement.to_ascii_uppercase();
4029    let into_pos = upper.find("INTO").ok_or_else(|| {
4030        AwsServiceError::aws_error(
4031            StatusCode::BAD_REQUEST,
4032            "ValidationException",
4033            "Invalid INSERT statement: missing INTO",
4034        )
4035    })?;
4036
4037    let after_into = statement[into_pos + 4..].trim();
4038    let (table_name, rest) = parse_partiql_table_name(after_into);
4039
4040    let rest_upper = rest.trim().to_ascii_uppercase();
4041    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4042        AwsServiceError::aws_error(
4043            StatusCode::BAD_REQUEST,
4044            "ValidationException",
4045            "Invalid INSERT statement: missing VALUE",
4046        )
4047    })?;
4048
4049    let value_str = rest.trim()[value_pos + 5..].trim();
4050    let item = parse_partiql_value_object(value_str, parameters)?;
4051
4052    let mut state = state.write();
4053    let table = get_table_mut(&mut state.tables, &table_name)?;
4054    let key = extract_key(table, &item);
4055    if table.find_item_index(&key).is_some() {
4056        // DynamoDB PartiQL INSERT fails if item exists
4057        return Err(AwsServiceError::aws_error(
4058            StatusCode::BAD_REQUEST,
4059            "DuplicateItemException",
4060            "Duplicate primary key exists in table",
4061        ));
4062    } else {
4063        table.items.push(item);
4064    }
4065    table.recalculate_stats();
4066
4067    DynamoDbService::ok_json(json!({}))
4068}
4069
4070fn execute_partiql_update(
4071    state: &SharedDynamoDbState,
4072    statement: &str,
4073    parameters: &[Value],
4074) -> Result<AwsResponse, AwsServiceError> {
4075    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
4076    // or: UPDATE "tablename" SET attr=? WHERE pk=?
4077    let after_update = statement[6..].trim(); // skip "UPDATE"
4078    let (table_name, rest) = parse_partiql_table_name(after_update);
4079
4080    let rest_upper = rest.trim().to_ascii_uppercase();
4081    let set_pos = rest_upper.find("SET").ok_or_else(|| {
4082        AwsServiceError::aws_error(
4083            StatusCode::BAD_REQUEST,
4084            "ValidationException",
4085            "Invalid UPDATE statement: missing SET",
4086        )
4087    })?;
4088
4089    let after_set = rest.trim()[set_pos + 3..].trim();
4090
4091    // Split on WHERE
4092    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4093    let (set_clause, where_clause) = if let Some(wp) = where_pos {
4094        (&after_set[..wp], after_set[wp + 5..].trim())
4095    } else {
4096        (after_set, "")
4097    };
4098
4099    let mut state = state.write();
4100    let table = get_table_mut(&mut state.tables, &table_name)?;
4101
4102    let matched_indices = if !where_clause.is_empty() {
4103        find_partiql_where_indices(table, where_clause, parameters)?
4104    } else {
4105        (0..table.items.len()).collect()
4106    };
4107
4108    // Parse SET assignments: attr=value, attr2=value2
4109    let param_offset = count_params_in_str(where_clause);
4110    let assignments: Vec<&str> = set_clause.split(',').collect();
4111    for idx in &matched_indices {
4112        let mut local_offset = param_offset;
4113        for assignment in &assignments {
4114            let assignment = assignment.trim();
4115            if let Some((attr, val_str)) = assignment.split_once('=') {
4116                let attr = attr.trim().trim_matches('"');
4117                let val_str = val_str.trim();
4118                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4119                if let Some(v) = value {
4120                    table.items[*idx].insert(attr.to_string(), v);
4121                }
4122            }
4123        }
4124    }
4125    table.recalculate_stats();
4126
4127    DynamoDbService::ok_json(json!({}))
4128}
4129
4130fn execute_partiql_delete(
4131    state: &SharedDynamoDbState,
4132    statement: &str,
4133    parameters: &[Value],
4134) -> Result<AwsResponse, AwsServiceError> {
4135    // Pattern: DELETE FROM "tablename" WHERE pk='val'
4136    let upper = statement.to_ascii_uppercase();
4137    let from_pos = upper.find("FROM").ok_or_else(|| {
4138        AwsServiceError::aws_error(
4139            StatusCode::BAD_REQUEST,
4140            "ValidationException",
4141            "Invalid DELETE statement: missing FROM",
4142        )
4143    })?;
4144
4145    let after_from = statement[from_pos + 4..].trim();
4146    let (table_name, rest) = parse_partiql_table_name(after_from);
4147
4148    let rest_upper = rest.trim().to_ascii_uppercase();
4149    if !rest_upper.starts_with("WHERE") {
4150        return Err(AwsServiceError::aws_error(
4151            StatusCode::BAD_REQUEST,
4152            "ValidationException",
4153            "DELETE requires a WHERE clause",
4154        ));
4155    }
4156    let where_clause = rest.trim()[5..].trim();
4157
4158    let mut state = state.write();
4159    let table = get_table_mut(&mut state.tables, &table_name)?;
4160
4161    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4162    // Remove from highest index first to avoid invalidating lower indices
4163    indices.sort_unstable();
4164    indices.reverse();
4165    for idx in indices {
4166        table.items.remove(idx);
4167    }
4168    table.recalculate_stats();
4169
4170    DynamoDbService::ok_json(json!({}))
4171}
4172
/// Splits a leading table name off `s`, returning `(table_name, remainder)`.
///
/// The input is trimmed first. A name wrapped in double quotes runs up to the
/// closing quote, and the remainder starts right after it. An unquoted name
/// runs up to the first whitespace character. If an opening quote is never
/// closed, the name runs up to the first space and stray quotes are stripped.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let trimmed = s.trim();
    match trimmed.strip_prefix('"') {
        Some(after_quote) => match after_quote.split_once('"') {
            Some((name, remainder)) => (name.to_string(), remainder),
            None => {
                // Unterminated quote: fall back to splitting on the first space.
                let cut = trimmed.find(' ').unwrap_or(trimmed.len());
                let (head, tail) = trimmed.split_at(cut);
                (head.trim_matches('"').to_string(), tail)
            }
        },
        None => {
            let cut = trimmed
                .find(char::is_whitespace)
                .unwrap_or(trimmed.len());
            let (head, tail) = trimmed.split_at(cut);
            (head.to_string(), tail)
        }
    }
}
4192
4193/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
4194/// Returns matching items.
4195fn evaluate_partiql_where<'a>(
4196    table: &'a DynamoTable,
4197    where_clause: &str,
4198    parameters: &[Value],
4199) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4200    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4201    Ok(indices.iter().map(|i| &table.items[*i]).collect())
4202}
4203
4204fn find_partiql_where_indices(
4205    table: &DynamoTable,
4206    where_clause: &str,
4207    parameters: &[Value],
4208) -> Result<Vec<usize>, AwsServiceError> {
4209    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
4210    // Case-insensitive AND splitting
4211    let upper = where_clause.to_uppercase();
4212    let conditions = if upper.contains(" AND ") {
4213        // Find positions of " AND " case-insensitively and split
4214        let mut parts = Vec::new();
4215        let mut last = 0;
4216        for (i, _) in upper.match_indices(" AND ") {
4217            parts.push(where_clause[last..i].trim());
4218            last = i + 5;
4219        }
4220        parts.push(where_clause[last..].trim());
4221        parts
4222    } else {
4223        vec![where_clause.trim()]
4224    };
4225
4226    let mut param_idx = 0usize;
4227    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4228
4229    for cond in &conditions {
4230        let cond = cond.trim();
4231        if let Some((left, right)) = cond.split_once('=') {
4232            let attr = left.trim().trim_matches('"').to_string();
4233            let val_str = right.trim();
4234            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4235            if let Some(v) = value {
4236                parsed_conditions.push((attr, v));
4237            }
4238        }
4239    }
4240
4241    let mut indices = Vec::new();
4242    for (i, item) in table.items.iter().enumerate() {
4243        let all_match = parsed_conditions
4244            .iter()
4245            .all(|(attr, expected)| item.get(attr) == Some(expected));
4246        if all_match {
4247            indices.push(i);
4248        }
4249    }
4250
4251    Ok(indices)
4252}
4253
4254/// Parse a PartiQL literal value. Supports:
4255/// - 'string' -> {"S": "string"}
4256/// - 123 -> {"N": "123"}
4257/// - ? -> parameter from list
4258fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4259    let s = s.trim();
4260    if s == "?" {
4261        let idx = *param_idx;
4262        *param_idx += 1;
4263        parameters.get(idx).cloned()
4264    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4265        let inner = &s[1..s.len() - 1];
4266        Some(json!({"S": inner}))
4267    } else if let Ok(n) = s.parse::<f64>() {
4268        let num_str = if n == n.trunc() {
4269            format!("{}", n as i64)
4270        } else {
4271            format!("{n}")
4272        };
4273        Some(json!({"N": num_str}))
4274    } else {
4275        None
4276    }
4277}
4278
4279/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
4280fn parse_partiql_value_object(
4281    s: &str,
4282    parameters: &[Value],
4283) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4284    let s = s.trim();
4285    let inner = s
4286        .strip_prefix('{')
4287        .and_then(|s| s.strip_suffix('}'))
4288        .ok_or_else(|| {
4289            AwsServiceError::aws_error(
4290                StatusCode::BAD_REQUEST,
4291                "ValidationException",
4292                "Invalid VALUE: expected object literal",
4293            )
4294        })?;
4295
4296    let mut item = HashMap::new();
4297    let mut param_idx = 0usize;
4298
4299    // Simple comma-separated key:value parsing
4300    for pair in split_partiql_pairs(inner) {
4301        let pair = pair.trim();
4302        if pair.is_empty() {
4303            continue;
4304        }
4305        if let Some((key_part, val_part)) = pair.split_once(':') {
4306            let key = key_part
4307                .trim()
4308                .trim_matches('\'')
4309                .trim_matches('"')
4310                .to_string();
4311            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4312                item.insert(key, val);
4313            }
4314        }
4315    }
4316
4317    Ok(item)
4318}
4319
/// Splits the inside of a PartiQL object literal into its `key: value` pairs.
///
/// Splitting happens on top-level commas only. Commas inside single-quoted
/// strings, nested maps (`{...}`), lists (`[...]`) and sets (`<<...>>`) are
/// left alone. The returned slices are not trimmed, and an empty input yields
/// a single empty slice.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            '\'' => in_quote = !in_quote,
            // Everything else inside a string literal is plain text.
            _ if in_quote => {}
            '{' | '[' | '<' => depth += 1,
            // Saturate so a stray closer cannot disable splitting for the rest.
            '}' | ']' | '>' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
4343
/// Counts the `?` parameter placeholders in `s`.
///
/// A `?` inside a single-quoted string literal is ordinary text, not a
/// placeholder, and is not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    s.chars()
        .filter(|&c| {
            if c == '\'' {
                in_quote = !in_quote;
            }
            c == '?' && !in_quote
        })
        .count()
}
4348
4349#[cfg(test)]
4350mod tests {
4351    use super::*;
4352    use serde_json::json;
4353
4354    #[test]
4355    fn test_parse_update_clauses_set() {
4356        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
4357        assert_eq!(clauses.len(), 1);
4358        assert_eq!(clauses[0].0, "SET");
4359        assert_eq!(clauses[0].1.len(), 2);
4360    }
4361
4362    #[test]
4363    fn test_parse_update_clauses_set_and_remove() {
4364        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
4365        assert_eq!(clauses.len(), 2);
4366        assert_eq!(clauses[0].0, "SET");
4367        assert_eq!(clauses[1].0, "REMOVE");
4368    }
4369
4370    #[test]
4371    fn test_evaluate_key_condition_simple() {
4372        let mut item = HashMap::new();
4373        item.insert("pk".to_string(), json!({"S": "user1"}));
4374        item.insert("sk".to_string(), json!({"S": "order1"}));
4375
4376        let mut expr_values = HashMap::new();
4377        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
4378
4379        assert!(evaluate_key_condition(
4380            "pk = :pk",
4381            &item,
4382            "pk",
4383            Some("sk"),
4384            &HashMap::new(),
4385            &expr_values,
4386        ));
4387    }
4388
4389    #[test]
4390    fn test_compare_attribute_values_numbers() {
4391        let a = json!({"N": "10"});
4392        let b = json!({"N": "20"});
4393        assert_eq!(
4394            compare_attribute_values(Some(&a), Some(&b)),
4395            std::cmp::Ordering::Less
4396        );
4397    }
4398
4399    #[test]
4400    fn test_compare_attribute_values_strings() {
4401        let a = json!({"S": "apple"});
4402        let b = json!({"S": "banana"});
4403        assert_eq!(
4404            compare_attribute_values(Some(&a), Some(&b)),
4405            std::cmp::Ordering::Less
4406        );
4407    }
4408
4409    #[test]
4410    fn test_split_on_and() {
4411        let parts = split_on_and("pk = :pk AND sk > :sk");
4412        assert_eq!(parts.len(), 2);
4413        assert_eq!(parts[0].trim(), "pk = :pk");
4414        assert_eq!(parts[1].trim(), "sk > :sk");
4415    }
4416
4417    #[test]
4418    fn test_split_on_and_respects_parentheses() {
4419        // Before fix: split_on_and would split inside the parens
4420        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
4421        // Should NOT split on the AND inside parentheses
4422        assert_eq!(parts.len(), 1);
4423        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
4424    }
4425
4426    #[test]
4427    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
4428        // (a AND b) OR c — should match when c is true but a is false
4429        let mut item = HashMap::new();
4430        item.insert("x".to_string(), json!({"S": "no"}));
4431        item.insert("y".to_string(), json!({"S": "no"}));
4432        item.insert("z".to_string(), json!({"S": "yes"}));
4433
4434        let mut expr_values = HashMap::new();
4435        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
4436
4437        // x=yes AND y=yes => false, but z=yes => true => overall true
4438        let result = evaluate_filter_expression(
4439            "(x = :yes AND y = :yes) OR z = :yes",
4440            &item,
4441            &HashMap::new(),
4442            &expr_values,
4443        );
4444        assert!(result, "should match because z = :yes is true");
4445
4446        // x=yes AND y=yes => false, z=yes => false => overall false
4447        let mut item2 = HashMap::new();
4448        item2.insert("x".to_string(), json!({"S": "no"}));
4449        item2.insert("y".to_string(), json!({"S": "no"}));
4450        item2.insert("z".to_string(), json!({"S": "no"}));
4451
4452        let result2 = evaluate_filter_expression(
4453            "(x = :yes AND y = :yes) OR z = :yes",
4454            &item2,
4455            &HashMap::new(),
4456            &expr_values,
4457        );
4458        assert!(!result2, "should not match because nothing is true");
4459    }
4460
4461    #[test]
4462    fn test_project_item_nested_path() {
4463        // Item with a list attribute containing maps
4464        let mut item = HashMap::new();
4465        item.insert("pk".to_string(), json!({"S": "key1"}));
4466        item.insert(
4467            "data".to_string(),
4468            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
4469        );
4470
4471        let body = json!({
4472            "ProjectionExpression": "data[0].name"
4473        });
4474
4475        let projected = project_item(&item, &body);
4476        // Should contain data[0].name = "Alice", not the entire data[0] element
4477        let name = projected
4478            .get("data")
4479            .and_then(|v| v.get("L"))
4480            .and_then(|v| v.get(0))
4481            .and_then(|v| v.get("M"))
4482            .and_then(|v| v.get("name"))
4483            .and_then(|v| v.get("S"))
4484            .and_then(|v| v.as_str());
4485        assert_eq!(name, Some("Alice"));
4486
4487        // Should NOT contain the "age" field
4488        let age = projected
4489            .get("data")
4490            .and_then(|v| v.get("L"))
4491            .and_then(|v| v.get(0))
4492            .and_then(|v| v.get("M"))
4493            .and_then(|v| v.get("age"));
4494        assert!(age.is_none(), "age should not be present in projection");
4495    }
4496
4497    #[test]
4498    fn test_resolve_nested_path_map() {
4499        let mut item = HashMap::new();
4500        item.insert(
4501            "info".to_string(),
4502            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
4503        );
4504
4505        let result = resolve_nested_path(&item, "info.address.city");
4506        assert_eq!(result, Some(json!({"S": "NYC"})));
4507    }
4508
4509    #[test]
4510    fn test_resolve_nested_path_list_then_map() {
4511        let mut item = HashMap::new();
4512        item.insert(
4513            "items".to_string(),
4514            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
4515        );
4516
4517        let result = resolve_nested_path(&item, "items[0].sku");
4518        assert_eq!(result, Some(json!({"S": "ABC"})));
4519    }
4520
4521    // -- Integration-style tests using DynamoDbService --
4522
4523    use crate::state::SharedDynamoDbState;
4524    use parking_lot::RwLock;
4525    use std::sync::Arc;
4526
4527    fn make_service() -> DynamoDbService {
4528        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
4529            "123456789012",
4530            "us-east-1",
4531        )));
4532        DynamoDbService::new(state)
4533    }
4534
4535    fn make_request(action: &str, body: Value) -> AwsRequest {
4536        AwsRequest {
4537            service: "dynamodb".to_string(),
4538            action: action.to_string(),
4539            region: "us-east-1".to_string(),
4540            account_id: "123456789012".to_string(),
4541            request_id: "test-id".to_string(),
4542            headers: http::HeaderMap::new(),
4543            query_params: HashMap::new(),
4544            body: serde_json::to_vec(&body).unwrap().into(),
4545            path_segments: vec![],
4546            raw_path: "/".to_string(),
4547            method: http::Method::POST,
4548            is_query_protocol: false,
4549            access_key_id: None,
4550        }
4551    }
4552
4553    fn create_test_table(svc: &DynamoDbService) {
4554        let req = make_request(
4555            "CreateTable",
4556            json!({
4557                "TableName": "test-table",
4558                "KeySchema": [
4559                    { "AttributeName": "pk", "KeyType": "HASH" }
4560                ],
4561                "AttributeDefinitions": [
4562                    { "AttributeName": "pk", "AttributeType": "S" }
4563                ],
4564                "BillingMode": "PAY_PER_REQUEST"
4565            }),
4566        );
4567        svc.create_table(&req).unwrap();
4568    }
4569
4570    #[test]
4571    fn delete_item_return_values_all_old() {
4572        let svc = make_service();
4573        create_test_table(&svc);
4574
4575        // Put an item
4576        let req = make_request(
4577            "PutItem",
4578            json!({
4579                "TableName": "test-table",
4580                "Item": {
4581                    "pk": { "S": "key1" },
4582                    "name": { "S": "Alice" },
4583                    "age": { "N": "30" }
4584                }
4585            }),
4586        );
4587        svc.put_item(&req).unwrap();
4588
4589        // Delete with ReturnValues=ALL_OLD
4590        let req = make_request(
4591            "DeleteItem",
4592            json!({
4593                "TableName": "test-table",
4594                "Key": { "pk": { "S": "key1" } },
4595                "ReturnValues": "ALL_OLD"
4596            }),
4597        );
4598        let resp = svc.delete_item(&req).unwrap();
4599        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4600
4601        // Verify the old item is returned
4602        let attrs = &body["Attributes"];
4603        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
4604        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
4605        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
4606
4607        // Verify the item is actually deleted
4608        let req = make_request(
4609            "GetItem",
4610            json!({
4611                "TableName": "test-table",
4612                "Key": { "pk": { "S": "key1" } }
4613            }),
4614        );
4615        let resp = svc.get_item(&req).unwrap();
4616        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4617        assert!(body.get("Item").is_none(), "item should be deleted");
4618    }
4619
4620    #[test]
4621    fn transact_get_items_returns_existing_and_missing() {
4622        let svc = make_service();
4623        create_test_table(&svc);
4624
4625        // Put one item
4626        let req = make_request(
4627            "PutItem",
4628            json!({
4629                "TableName": "test-table",
4630                "Item": {
4631                    "pk": { "S": "exists" },
4632                    "val": { "S": "hello" }
4633                }
4634            }),
4635        );
4636        svc.put_item(&req).unwrap();
4637
4638        let req = make_request(
4639            "TransactGetItems",
4640            json!({
4641                "TransactItems": [
4642                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
4643                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
4644                ]
4645            }),
4646        );
4647        let resp = svc.transact_get_items(&req).unwrap();
4648        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4649        let responses = body["Responses"].as_array().unwrap();
4650        assert_eq!(responses.len(), 2);
4651        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
4652        assert!(responses[1].get("Item").is_none());
4653    }
4654
4655    #[test]
4656    fn transact_write_items_put_and_delete() {
4657        let svc = make_service();
4658        create_test_table(&svc);
4659
4660        // Put initial item
4661        let req = make_request(
4662            "PutItem",
4663            json!({
4664                "TableName": "test-table",
4665                "Item": {
4666                    "pk": { "S": "to-delete" },
4667                    "val": { "S": "bye" }
4668                }
4669            }),
4670        );
4671        svc.put_item(&req).unwrap();
4672
4673        // TransactWrite: put new + delete existing
4674        let req = make_request(
4675            "TransactWriteItems",
4676            json!({
4677                "TransactItems": [
4678                    {
4679                        "Put": {
4680                            "TableName": "test-table",
4681                            "Item": {
4682                                "pk": { "S": "new-item" },
4683                                "val": { "S": "hi" }
4684                            }
4685                        }
4686                    },
4687                    {
4688                        "Delete": {
4689                            "TableName": "test-table",
4690                            "Key": { "pk": { "S": "to-delete" } }
4691                        }
4692                    }
4693                ]
4694            }),
4695        );
4696        let resp = svc.transact_write_items(&req).unwrap();
4697        assert_eq!(resp.status, StatusCode::OK);
4698
4699        // Verify new item exists
4700        let req = make_request(
4701            "GetItem",
4702            json!({
4703                "TableName": "test-table",
4704                "Key": { "pk": { "S": "new-item" } }
4705            }),
4706        );
4707        let resp = svc.get_item(&req).unwrap();
4708        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4709        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
4710
4711        // Verify deleted item is gone
4712        let req = make_request(
4713            "GetItem",
4714            json!({
4715                "TableName": "test-table",
4716                "Key": { "pk": { "S": "to-delete" } }
4717            }),
4718        );
4719        let resp = svc.get_item(&req).unwrap();
4720        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4721        assert!(body.get("Item").is_none());
4722    }
4723
4724    #[test]
4725    fn transact_write_items_condition_check_failure() {
4726        let svc = make_service();
4727        create_test_table(&svc);
4728
4729        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
4730        let req = make_request(
4731            "TransactWriteItems",
4732            json!({
4733                "TransactItems": [
4734                    {
4735                        "ConditionCheck": {
4736                            "TableName": "test-table",
4737                            "Key": { "pk": { "S": "nonexistent" } },
4738                            "ConditionExpression": "attribute_exists(pk)"
4739                        }
4740                    }
4741                ]
4742            }),
4743        );
4744        let resp = svc.transact_write_items(&req).unwrap();
4745        // Should be a 400 error response
4746        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
4747        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4748        assert_eq!(
4749            body["__type"].as_str().unwrap(),
4750            "TransactionCanceledException"
4751        );
4752        assert!(body["CancellationReasons"].as_array().is_some());
4753    }
4754
4755    #[test]
4756    fn update_and_describe_time_to_live() {
4757        let svc = make_service();
4758        create_test_table(&svc);
4759
4760        // Enable TTL
4761        let req = make_request(
4762            "UpdateTimeToLive",
4763            json!({
4764                "TableName": "test-table",
4765                "TimeToLiveSpecification": {
4766                    "AttributeName": "ttl",
4767                    "Enabled": true
4768                }
4769            }),
4770        );
4771        let resp = svc.update_time_to_live(&req).unwrap();
4772        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4773        assert_eq!(
4774            body["TimeToLiveSpecification"]["AttributeName"]
4775                .as_str()
4776                .unwrap(),
4777            "ttl"
4778        );
4779        assert!(body["TimeToLiveSpecification"]["Enabled"]
4780            .as_bool()
4781            .unwrap());
4782
4783        // Describe TTL
4784        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
4785        let resp = svc.describe_time_to_live(&req).unwrap();
4786        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4787        assert_eq!(
4788            body["TimeToLiveDescription"]["TimeToLiveStatus"]
4789                .as_str()
4790                .unwrap(),
4791            "ENABLED"
4792        );
4793        assert_eq!(
4794            body["TimeToLiveDescription"]["AttributeName"]
4795                .as_str()
4796                .unwrap(),
4797            "ttl"
4798        );
4799
4800        // Disable TTL
4801        let req = make_request(
4802            "UpdateTimeToLive",
4803            json!({
4804                "TableName": "test-table",
4805                "TimeToLiveSpecification": {
4806                    "AttributeName": "ttl",
4807                    "Enabled": false
4808                }
4809            }),
4810        );
4811        svc.update_time_to_live(&req).unwrap();
4812
4813        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
4814        let resp = svc.describe_time_to_live(&req).unwrap();
4815        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4816        assert_eq!(
4817            body["TimeToLiveDescription"]["TimeToLiveStatus"]
4818                .as_str()
4819                .unwrap(),
4820            "DISABLED"
4821        );
4822    }
4823
4824    #[test]
4825    fn resource_policy_lifecycle() {
4826        let svc = make_service();
4827        create_test_table(&svc);
4828
4829        let table_arn = {
4830            let state = svc.state.read();
4831            state.tables.get("test-table").unwrap().arn.clone()
4832        };
4833
4834        // Put policy
4835        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
4836        let req = make_request(
4837            "PutResourcePolicy",
4838            json!({
4839                "ResourceArn": table_arn,
4840                "Policy": policy_doc
4841            }),
4842        );
4843        let resp = svc.put_resource_policy(&req).unwrap();
4844        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4845        assert!(body["RevisionId"].as_str().is_some());
4846
4847        // Get policy
4848        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
4849        let resp = svc.get_resource_policy(&req).unwrap();
4850        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4851        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
4852
4853        // Delete policy
4854        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
4855        svc.delete_resource_policy(&req).unwrap();
4856
4857        // Get should return null now
4858        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
4859        let resp = svc.get_resource_policy(&req).unwrap();
4860        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4861        assert!(body["Policy"].is_null());
4862    }
4863
4864    #[test]
4865    fn describe_endpoints() {
4866        let svc = make_service();
4867        let req = make_request("DescribeEndpoints", json!({}));
4868        let resp = svc.describe_endpoints(&req).unwrap();
4869        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4870        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
4871    }
4872
4873    #[test]
4874    fn describe_limits() {
4875        let svc = make_service();
4876        let req = make_request("DescribeLimits", json!({}));
4877        let resp = svc.describe_limits(&req).unwrap();
4878        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4879        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
4880    }
4881
4882    #[test]
4883    fn backup_lifecycle() {
4884        let svc = make_service();
4885        create_test_table(&svc);
4886
4887        // Create backup
4888        let req = make_request(
4889            "CreateBackup",
4890            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
4891        );
4892        let resp = svc.create_backup(&req).unwrap();
4893        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4894        let backup_arn = body["BackupDetails"]["BackupArn"]
4895            .as_str()
4896            .unwrap()
4897            .to_string();
4898        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
4899
4900        // Describe backup
4901        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
4902        let resp = svc.describe_backup(&req).unwrap();
4903        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4904        assert_eq!(
4905            body["BackupDescription"]["BackupDetails"]["BackupName"],
4906            "my-backup"
4907        );
4908
4909        // List backups
4910        let req = make_request("ListBackups", json!({}));
4911        let resp = svc.list_backups(&req).unwrap();
4912        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4913        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
4914
4915        // Restore from backup
4916        let req = make_request(
4917            "RestoreTableFromBackup",
4918            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
4919        );
4920        svc.restore_table_from_backup(&req).unwrap();
4921
4922        // Verify restored table exists
4923        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
4924        let resp = svc.describe_table(&req).unwrap();
4925        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4926        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
4927
4928        // Delete backup
4929        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
4930        svc.delete_backup(&req).unwrap();
4931
4932        // List should be empty
4933        let req = make_request("ListBackups", json!({}));
4934        let resp = svc.list_backups(&req).unwrap();
4935        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4936        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
4937    }
4938
4939    #[test]
4940    fn continuous_backups() {
4941        let svc = make_service();
4942        create_test_table(&svc);
4943
4944        // Initially disabled
4945        let req = make_request(
4946            "DescribeContinuousBackups",
4947            json!({ "TableName": "test-table" }),
4948        );
4949        let resp = svc.describe_continuous_backups(&req).unwrap();
4950        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4951        assert_eq!(
4952            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
4953                ["PointInTimeRecoveryStatus"],
4954            "DISABLED"
4955        );
4956
4957        // Enable
4958        let req = make_request(
4959            "UpdateContinuousBackups",
4960            json!({
4961                "TableName": "test-table",
4962                "PointInTimeRecoverySpecification": {
4963                    "PointInTimeRecoveryEnabled": true
4964                }
4965            }),
4966        );
4967        svc.update_continuous_backups(&req).unwrap();
4968
4969        // Verify
4970        let req = make_request(
4971            "DescribeContinuousBackups",
4972            json!({ "TableName": "test-table" }),
4973        );
4974        let resp = svc.describe_continuous_backups(&req).unwrap();
4975        let body: Value = serde_json::from_slice(&resp.body).unwrap();
4976        assert_eq!(
4977            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
4978                ["PointInTimeRecoveryStatus"],
4979            "ENABLED"
4980        );
4981    }
4982
4983    #[test]
4984    fn restore_table_to_point_in_time() {
4985        let svc = make_service();
4986        create_test_table(&svc);
4987
4988        let req = make_request(
4989            "RestoreTableToPointInTime",
4990            json!({
4991                "SourceTableName": "test-table",
4992                "TargetTableName": "pitr-restored"
4993            }),
4994        );
4995        svc.restore_table_to_point_in_time(&req).unwrap();
4996
4997        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
4998        let resp = svc.describe_table(&req).unwrap();
4999        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5000        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5001    }
5002
5003    #[test]
5004    fn global_table_lifecycle() {
5005        let svc = make_service();
5006
5007        // Create global table
5008        let req = make_request(
5009            "CreateGlobalTable",
5010            json!({
5011                "GlobalTableName": "my-global",
5012                "ReplicationGroup": [
5013                    { "RegionName": "us-east-1" },
5014                    { "RegionName": "eu-west-1" }
5015                ]
5016            }),
5017        );
5018        let resp = svc.create_global_table(&req).unwrap();
5019        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5020        assert_eq!(
5021            body["GlobalTableDescription"]["GlobalTableStatus"],
5022            "ACTIVE"
5023        );
5024
5025        // Describe
5026        let req = make_request(
5027            "DescribeGlobalTable",
5028            json!({ "GlobalTableName": "my-global" }),
5029        );
5030        let resp = svc.describe_global_table(&req).unwrap();
5031        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5032        assert_eq!(
5033            body["GlobalTableDescription"]["ReplicationGroup"]
5034                .as_array()
5035                .unwrap()
5036                .len(),
5037            2
5038        );
5039
5040        // List
5041        let req = make_request("ListGlobalTables", json!({}));
5042        let resp = svc.list_global_tables(&req).unwrap();
5043        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5044        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5045
5046        // Update - add a region
5047        let req = make_request(
5048            "UpdateGlobalTable",
5049            json!({
5050                "GlobalTableName": "my-global",
5051                "ReplicaUpdates": [
5052                    { "Create": { "RegionName": "ap-southeast-1" } }
5053                ]
5054            }),
5055        );
5056        let resp = svc.update_global_table(&req).unwrap();
5057        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5058        assert_eq!(
5059            body["GlobalTableDescription"]["ReplicationGroup"]
5060                .as_array()
5061                .unwrap()
5062                .len(),
5063            3
5064        );
5065
5066        // Describe settings
5067        let req = make_request(
5068            "DescribeGlobalTableSettings",
5069            json!({ "GlobalTableName": "my-global" }),
5070        );
5071        let resp = svc.describe_global_table_settings(&req).unwrap();
5072        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5073        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5074
5075        // Update settings (no-op, just verify no error)
5076        let req = make_request(
5077            "UpdateGlobalTableSettings",
5078            json!({ "GlobalTableName": "my-global" }),
5079        );
5080        svc.update_global_table_settings(&req).unwrap();
5081    }
5082
5083    #[test]
5084    fn table_replica_auto_scaling() {
5085        let svc = make_service();
5086        create_test_table(&svc);
5087
5088        let req = make_request(
5089            "DescribeTableReplicaAutoScaling",
5090            json!({ "TableName": "test-table" }),
5091        );
5092        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5093        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5094        assert_eq!(
5095            body["TableAutoScalingDescription"]["TableName"],
5096            "test-table"
5097        );
5098
5099        let req = make_request(
5100            "UpdateTableReplicaAutoScaling",
5101            json!({ "TableName": "test-table" }),
5102        );
5103        svc.update_table_replica_auto_scaling(&req).unwrap();
5104    }
5105
5106    #[test]
5107    fn kinesis_streaming_lifecycle() {
5108        let svc = make_service();
5109        create_test_table(&svc);
5110
5111        // Enable
5112        let req = make_request(
5113            "EnableKinesisStreamingDestination",
5114            json!({
5115                "TableName": "test-table",
5116                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5117            }),
5118        );
5119        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5120        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5121        assert_eq!(body["DestinationStatus"], "ACTIVE");
5122
5123        // Describe
5124        let req = make_request(
5125            "DescribeKinesisStreamingDestination",
5126            json!({ "TableName": "test-table" }),
5127        );
5128        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5129        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5130        assert_eq!(
5131            body["KinesisDataStreamDestinations"]
5132                .as_array()
5133                .unwrap()
5134                .len(),
5135            1
5136        );
5137
5138        // Update
5139        let req = make_request(
5140            "UpdateKinesisStreamingDestination",
5141            json!({
5142                "TableName": "test-table",
5143                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5144                "UpdateKinesisStreamingConfiguration": {
5145                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
5146                }
5147            }),
5148        );
5149        svc.update_kinesis_streaming_destination(&req).unwrap();
5150
5151        // Disable
5152        let req = make_request(
5153            "DisableKinesisStreamingDestination",
5154            json!({
5155                "TableName": "test-table",
5156                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5157            }),
5158        );
5159        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5160        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5161        assert_eq!(body["DestinationStatus"], "DISABLED");
5162    }
5163
5164    #[test]
5165    fn contributor_insights_lifecycle() {
5166        let svc = make_service();
5167        create_test_table(&svc);
5168
5169        // Initially disabled
5170        let req = make_request(
5171            "DescribeContributorInsights",
5172            json!({ "TableName": "test-table" }),
5173        );
5174        let resp = svc.describe_contributor_insights(&req).unwrap();
5175        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5176        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5177
5178        // Enable
5179        let req = make_request(
5180            "UpdateContributorInsights",
5181            json!({
5182                "TableName": "test-table",
5183                "ContributorInsightsAction": "ENABLE"
5184            }),
5185        );
5186        let resp = svc.update_contributor_insights(&req).unwrap();
5187        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5188        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5189
5190        // List
5191        let req = make_request("ListContributorInsights", json!({}));
5192        let resp = svc.list_contributor_insights(&req).unwrap();
5193        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5194        assert_eq!(
5195            body["ContributorInsightsSummaries"]
5196                .as_array()
5197                .unwrap()
5198                .len(),
5199            1
5200        );
5201    }
5202
5203    #[test]
5204    fn export_lifecycle() {
5205        let svc = make_service();
5206        create_test_table(&svc);
5207
5208        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5209
5210        // Export
5211        let req = make_request(
5212            "ExportTableToPointInTime",
5213            json!({
5214                "TableArn": table_arn,
5215                "S3Bucket": "my-bucket"
5216            }),
5217        );
5218        let resp = svc.export_table_to_point_in_time(&req).unwrap();
5219        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5220        let export_arn = body["ExportDescription"]["ExportArn"]
5221            .as_str()
5222            .unwrap()
5223            .to_string();
5224        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
5225
5226        // Describe
5227        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
5228        let resp = svc.describe_export(&req).unwrap();
5229        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5230        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
5231
5232        // List
5233        let req = make_request("ListExports", json!({}));
5234        let resp = svc.list_exports(&req).unwrap();
5235        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5236        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
5237    }
5238
5239    #[test]
5240    fn import_lifecycle() {
5241        let svc = make_service();
5242
5243        let req = make_request(
5244            "ImportTable",
5245            json!({
5246                "InputFormat": "DYNAMODB_JSON",
5247                "S3BucketSource": { "S3Bucket": "import-bucket" },
5248                "TableCreationParameters": {
5249                    "TableName": "imported-table",
5250                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
5251                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
5252                }
5253            }),
5254        );
5255        let resp = svc.import_table(&req).unwrap();
5256        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5257        let import_arn = body["ImportTableDescription"]["ImportArn"]
5258            .as_str()
5259            .unwrap()
5260            .to_string();
5261        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5262
5263        // Describe import
5264        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
5265        let resp = svc.describe_import(&req).unwrap();
5266        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5267        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5268
5269        // List imports
5270        let req = make_request("ListImports", json!({}));
5271        let resp = svc.list_imports(&req).unwrap();
5272        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5273        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
5274
5275        // Verify the table was created
5276        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
5277        let resp = svc.describe_table(&req).unwrap();
5278        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5279        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5280    }
5281}