Skip to main content

fakecloud_dynamodb/
service.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use fakecloud_s3::state::SharedS3State;
12
13use crate::state::{
14    attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
15    ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
16    KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
17    ReplicaDescription, SharedDynamoDbState,
18};
19
20pub struct DynamoDbService {
21    state: SharedDynamoDbState,
22    s3_state: Option<SharedS3State>,
23}
24
25impl DynamoDbService {
26    pub fn new(state: SharedDynamoDbState) -> Self {
27        Self {
28            state,
29            s3_state: None,
30        }
31    }
32
33    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
34        self.s3_state = Some(s3_state);
35        self
36    }
37
38    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
39        serde_json::from_slice(&req.body).map_err(|e| {
40            AwsServiceError::aws_error(
41                StatusCode::BAD_REQUEST,
42                "SerializationException",
43                format!("Invalid JSON: {e}"),
44            )
45        })
46    }
47
48    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
49        Ok(AwsResponse::ok_json(body))
50    }
51
52    // ── Table Operations ────────────────────────────────────────────────
53
54    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
55        let body = Self::parse_body(req)?;
56
57        let table_name = body["TableName"]
58            .as_str()
59            .ok_or_else(|| {
60                AwsServiceError::aws_error(
61                    StatusCode::BAD_REQUEST,
62                    "ValidationException",
63                    "TableName is required",
64                )
65            })?
66            .to_string();
67
68        let key_schema = parse_key_schema(&body["KeySchema"])?;
69        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
70
71        // Validate that key schema attributes are defined
72        for ks in &key_schema {
73            if !attribute_definitions
74                .iter()
75                .any(|ad| ad.attribute_name == ks.attribute_name)
76            {
77                return Err(AwsServiceError::aws_error(
78                    StatusCode::BAD_REQUEST,
79                    "ValidationException",
80                    format!(
81                        "One or more parameter values were invalid: \
82                         Some index key attributes are not defined in AttributeDefinitions. \
83                         Keys: [{}], AttributeDefinitions: [{}]",
84                        ks.attribute_name,
85                        attribute_definitions
86                            .iter()
87                            .map(|ad| ad.attribute_name.as_str())
88                            .collect::<Vec<_>>()
89                            .join(", ")
90                    ),
91                ));
92            }
93        }
94
95        let billing_mode = body["BillingMode"]
96            .as_str()
97            .unwrap_or("PROVISIONED")
98            .to_string();
99
100        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
101            ProvisionedThroughput {
102                read_capacity_units: 0,
103                write_capacity_units: 0,
104            }
105        } else {
106            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
107        };
108
109        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
110        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
111        let tags = parse_tags(&body["Tags"]);
112
113        let mut state = self.state.write();
114
115        if state.tables.contains_key(&table_name) {
116            return Err(AwsServiceError::aws_error(
117                StatusCode::BAD_REQUEST,
118                "ResourceInUseException",
119                format!("Table already exists: {table_name}"),
120            ));
121        }
122
123        let arn = format!(
124            "arn:aws:dynamodb:{}:{}:table/{}",
125            state.region, state.account_id, table_name
126        );
127        let now = Utc::now();
128
129        let table = DynamoTable {
130            name: table_name.clone(),
131            arn: arn.clone(),
132            key_schema: key_schema.clone(),
133            attribute_definitions: attribute_definitions.clone(),
134            provisioned_throughput: provisioned_throughput.clone(),
135            items: Vec::new(),
136            gsi: gsi.clone(),
137            lsi: lsi.clone(),
138            tags,
139            created_at: now,
140            status: "ACTIVE".to_string(),
141            item_count: 0,
142            size_bytes: 0,
143            billing_mode: billing_mode.clone(),
144            ttl_attribute: None,
145            ttl_enabled: false,
146            resource_policy: None,
147            pitr_enabled: false,
148            kinesis_destinations: Vec::new(),
149            contributor_insights_status: "DISABLED".to_string(),
150            contributor_insights_counters: HashMap::new(),
151        };
152
153        state.tables.insert(table_name, table);
154
155        let table_desc = build_table_description_json(
156            &arn,
157            &key_schema,
158            &attribute_definitions,
159            &provisioned_throughput,
160            &gsi,
161            &lsi,
162            &billing_mode,
163            now,
164            0,
165            0,
166            "ACTIVE",
167        );
168
169        Self::ok_json(json!({ "TableDescription": table_desc }))
170    }
171
172    fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
173        let body = Self::parse_body(req)?;
174        let table_name = require_str(&body, "TableName")?;
175
176        let mut state = self.state.write();
177        let table = state.tables.remove(table_name).ok_or_else(|| {
178            AwsServiceError::aws_error(
179                StatusCode::BAD_REQUEST,
180                "ResourceNotFoundException",
181                format!("Requested resource not found: Table: {table_name} not found"),
182            )
183        })?;
184
185        let table_desc = build_table_description_json(
186            &table.arn,
187            &table.key_schema,
188            &table.attribute_definitions,
189            &table.provisioned_throughput,
190            &table.gsi,
191            &table.lsi,
192            &table.billing_mode,
193            table.created_at,
194            table.item_count,
195            table.size_bytes,
196            "DELETING",
197        );
198
199        Self::ok_json(json!({ "TableDescription": table_desc }))
200    }
201
202    fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
203        let body = Self::parse_body(req)?;
204        let table_name = require_str(&body, "TableName")?;
205
206        let state = self.state.read();
207        let table = get_table(&state.tables, table_name)?;
208
209        let table_desc = build_table_description_json(
210            &table.arn,
211            &table.key_schema,
212            &table.attribute_definitions,
213            &table.provisioned_throughput,
214            &table.gsi,
215            &table.lsi,
216            &table.billing_mode,
217            table.created_at,
218            table.item_count,
219            table.size_bytes,
220            &table.status,
221        );
222
223        Self::ok_json(json!({ "Table": table_desc }))
224    }
225
226    fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
227        let body = Self::parse_body(req)?;
228
229        validate_optional_string_length(
230            "exclusiveStartTableName",
231            body["ExclusiveStartTableName"].as_str(),
232            3,
233            255,
234        )?;
235        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
236
237        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
238        let exclusive_start = body["ExclusiveStartTableName"]
239            .as_str()
240            .map(|s| s.to_string());
241
242        let state = self.state.read();
243        let mut names: Vec<&String> = state.tables.keys().collect();
244        names.sort();
245
246        let start_idx = match &exclusive_start {
247            Some(start) => names
248                .iter()
249                .position(|n| n.as_str() > start.as_str())
250                .unwrap_or(names.len()),
251            None => 0,
252        };
253
254        let page: Vec<&str> = names
255            .iter()
256            .skip(start_idx)
257            .take(limit)
258            .map(|n| n.as_str())
259            .collect();
260
261        let mut result = json!({ "TableNames": page });
262
263        if start_idx + limit < names.len() {
264            if let Some(last) = page.last() {
265                result["LastEvaluatedTableName"] = json!(last);
266            }
267        }
268
269        Self::ok_json(result)
270    }
271
272    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
273        let body = Self::parse_body(req)?;
274        let table_name = require_str(&body, "TableName")?;
275
276        let mut state = self.state.write();
277        let table = state.tables.get_mut(table_name).ok_or_else(|| {
278            AwsServiceError::aws_error(
279                StatusCode::BAD_REQUEST,
280                "ResourceNotFoundException",
281                format!("Requested resource not found: Table: {table_name} not found"),
282            )
283        })?;
284
285        if let Some(pt) = body.get("ProvisionedThroughput") {
286            if let Ok(throughput) = parse_provisioned_throughput(pt) {
287                table.provisioned_throughput = throughput;
288            }
289        }
290
291        if let Some(bm) = body["BillingMode"].as_str() {
292            table.billing_mode = bm.to_string();
293        }
294
295        let table_desc = build_table_description_json(
296            &table.arn,
297            &table.key_schema,
298            &table.attribute_definitions,
299            &table.provisioned_throughput,
300            &table.gsi,
301            &table.lsi,
302            &table.billing_mode,
303            table.created_at,
304            table.item_count,
305            table.size_bytes,
306            &table.status,
307        );
308
309        Self::ok_json(json!({ "TableDescription": table_desc }))
310    }
311
312    // ── Item Operations ─────────────────────────────────────────────────
313
314    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
315        // --- Parse request body and expression attributes WITHOUT holding any lock ---
316        let body = Self::parse_body(req)?;
317        let table_name = require_str(&body, "TableName")?;
318        let item = require_object(&body, "Item")?;
319        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
320        let expr_attr_names = parse_expression_attribute_names(&body);
321        let expr_attr_values = parse_expression_attribute_values(&body);
322        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
323
324        // --- Acquire write lock ONLY for validation + mutation ---
325        let old_item = {
326            let mut state = self.state.write();
327            let table = get_table_mut(&mut state.tables, table_name)?;
328
329            validate_key_in_item(table, &item)?;
330
331            let key = extract_key(table, &item);
332            let existing_idx = table.find_item_index(&key);
333
334            if let Some(ref cond) = condition {
335                let existing = existing_idx.map(|i| &table.items[i]);
336                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
337            }
338
339            let old_item = if return_values == "ALL_OLD" {
340                existing_idx.map(|i| table.items[i].clone())
341            } else {
342                None
343            };
344
345            if let Some(idx) = existing_idx {
346                table.items[idx] = item.clone();
347            } else {
348                table.items.push(item.clone());
349            }
350
351            table.record_item_access(&item);
352            table.recalculate_stats();
353
354            old_item
355        };
356        // --- Write lock released, build response ---
357
358        let mut result = json!({});
359        if let Some(old) = old_item {
360            result["Attributes"] = json!(old);
361        }
362
363        Self::ok_json(result)
364    }
365
366    fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
367        // --- Parse request body WITHOUT holding any lock ---
368        let body = Self::parse_body(req)?;
369        let table_name = require_str(&body, "TableName")?;
370        let key = require_object(&body, "Key")?;
371
372        // --- Use a read lock for the lookup (allows concurrent GetItem calls) ---
373        let (result, needs_insights) = {
374            let state = self.state.read();
375            let table = get_table(&state.tables, table_name)?;
376            let needs_insights = table.contributor_insights_status == "ENABLED";
377
378            let result = match table.find_item_index(&key) {
379                Some(idx) => {
380                    let item = &table.items[idx];
381                    let projected = project_item(item, &body);
382                    json!({ "Item": projected })
383                }
384                None => json!({}),
385            };
386            (result, needs_insights)
387        };
388        // --- Read lock released ---
389
390        // Only acquire write lock if contributor insights tracking is enabled
391        if needs_insights {
392            let mut state = self.state.write();
393            if let Some(table) = state.tables.get_mut(table_name) {
394                table.record_key_access(&key);
395            }
396        }
397
398        Self::ok_json(result)
399    }
400
401    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
402        let body = Self::parse_body(req)?;
403
404        validate_optional_enum_value(
405            "conditionalOperator",
406            &body["ConditionalOperator"],
407            &["AND", "OR"],
408        )?;
409        validate_optional_enum_value(
410            "returnConsumedCapacity",
411            &body["ReturnConsumedCapacity"],
412            &["INDEXES", "TOTAL", "NONE"],
413        )?;
414        validate_optional_enum_value(
415            "returnValues",
416            &body["ReturnValues"],
417            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
418        )?;
419        validate_optional_enum_value(
420            "returnItemCollectionMetrics",
421            &body["ReturnItemCollectionMetrics"],
422            &["SIZE", "NONE"],
423        )?;
424        validate_optional_enum_value(
425            "returnValuesOnConditionCheckFailure",
426            &body["ReturnValuesOnConditionCheckFailure"],
427            &["ALL_OLD", "NONE"],
428        )?;
429
430        let table_name = require_str(&body, "TableName")?;
431        let key = require_object(&body, "Key")?;
432
433        let mut state = self.state.write();
434        let table = get_table_mut(&mut state.tables, table_name)?;
435
436        let condition = body["ConditionExpression"].as_str();
437        let expr_attr_names = parse_expression_attribute_names(&body);
438        let expr_attr_values = parse_expression_attribute_values(&body);
439
440        let existing_idx = table.find_item_index(&key);
441
442        if let Some(cond) = condition {
443            let existing = existing_idx.map(|i| &table.items[i]);
444            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
445        }
446
447        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
448
449        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
450        let return_icm = body["ReturnItemCollectionMetrics"]
451            .as_str()
452            .unwrap_or("NONE");
453
454        let mut result = json!({});
455
456        if let Some(idx) = existing_idx {
457            if return_values == "ALL_OLD" {
458                result["Attributes"] = json!(table.items[idx]);
459            }
460            table.items.remove(idx);
461            table.recalculate_stats();
462        }
463
464        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
465            result["ConsumedCapacity"] = json!({
466                "TableName": table_name,
467                "CapacityUnits": 1.0,
468            });
469        }
470
471        if return_icm == "SIZE" {
472            result["ItemCollectionMetrics"] = json!({});
473        }
474
475        Self::ok_json(result)
476    }
477
478    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
479        let body = Self::parse_body(req)?;
480        let table_name = require_str(&body, "TableName")?;
481        let key = require_object(&body, "Key")?;
482
483        let mut state = self.state.write();
484        let table = get_table_mut(&mut state.tables, table_name)?;
485
486        validate_key_attributes_in_key(table, &key)?;
487
488        let condition = body["ConditionExpression"].as_str();
489        let expr_attr_names = parse_expression_attribute_names(&body);
490        let expr_attr_values = parse_expression_attribute_values(&body);
491        let update_expression = body["UpdateExpression"].as_str();
492
493        let existing_idx = table.find_item_index(&key);
494
495        if let Some(cond) = condition {
496            let existing = existing_idx.map(|i| &table.items[i]);
497            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
498        }
499
500        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
501
502        let idx = match existing_idx {
503            Some(i) => i,
504            None => {
505                let mut new_item = HashMap::new();
506                for (k, v) in &key {
507                    new_item.insert(k.clone(), v.clone());
508                }
509                table.items.push(new_item);
510                table.items.len() - 1
511            }
512        };
513
514        let old_item = if return_values == "ALL_OLD" {
515            Some(table.items[idx].clone())
516        } else {
517            None
518        };
519
520        if let Some(expr) = update_expression {
521            apply_update_expression(
522                &mut table.items[idx],
523                expr,
524                &expr_attr_names,
525                &expr_attr_values,
526            )?;
527        }
528
529        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
530            Some(table.items[idx].clone())
531        } else {
532            None
533        };
534
535        table.recalculate_stats();
536
537        let mut result = json!({});
538        if let Some(old) = old_item {
539            result["Attributes"] = json!(old);
540        } else if let Some(new) = new_item {
541            result["Attributes"] = json!(new);
542        }
543
544        Self::ok_json(result)
545    }
546
547    // ── Query & Scan ────────────────────────────────────────────────────
548
549    fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
550        let body = Self::parse_body(req)?;
551        let table_name = require_str(&body, "TableName")?;
552
553        let state = self.state.read();
554        let table = get_table(&state.tables, table_name)?;
555
556        let expr_attr_names = parse_expression_attribute_names(&body);
557        let expr_attr_values = parse_expression_attribute_values(&body);
558
559        let key_condition = body["KeyConditionExpression"].as_str();
560        let filter_expression = body["FilterExpression"].as_str();
561        let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
562        let limit = body["Limit"].as_i64().map(|l| l as usize);
563        let index_name = body["IndexName"].as_str();
564        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
565            parse_key_map(&body["ExclusiveStartKey"]);
566
567        let (items_to_scan, hash_key_name, range_key_name): (
568            &[HashMap<String, AttributeValue>],
569            String,
570            Option<String>,
571        ) = if let Some(idx_name) = index_name {
572            if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
573                let hk = gsi
574                    .key_schema
575                    .iter()
576                    .find(|k| k.key_type == "HASH")
577                    .map(|k| k.attribute_name.clone())
578                    .unwrap_or_default();
579                let rk = gsi
580                    .key_schema
581                    .iter()
582                    .find(|k| k.key_type == "RANGE")
583                    .map(|k| k.attribute_name.clone());
584                (&table.items, hk, rk)
585            } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
586                let hk = lsi
587                    .key_schema
588                    .iter()
589                    .find(|k| k.key_type == "HASH")
590                    .map(|k| k.attribute_name.clone())
591                    .unwrap_or_default();
592                let rk = lsi
593                    .key_schema
594                    .iter()
595                    .find(|k| k.key_type == "RANGE")
596                    .map(|k| k.attribute_name.clone());
597                (&table.items, hk, rk)
598            } else {
599                return Err(AwsServiceError::aws_error(
600                    StatusCode::BAD_REQUEST,
601                    "ValidationException",
602                    format!("The table does not have the specified index: {idx_name}"),
603                ));
604            }
605        } else {
606            (
607                &table.items[..],
608                table.hash_key_name().to_string(),
609                table.range_key_name().map(|s| s.to_string()),
610            )
611        };
612
613        let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
614            .iter()
615            .filter(|item| {
616                if let Some(kc) = key_condition {
617                    evaluate_key_condition(
618                        kc,
619                        item,
620                        &hash_key_name,
621                        range_key_name.as_deref(),
622                        &expr_attr_names,
623                        &expr_attr_values,
624                    )
625                } else {
626                    true
627                }
628            })
629            .collect();
630
631        if let Some(ref rk) = range_key_name {
632            matched.sort_by(|a, b| {
633                let av = a.get(rk.as_str());
634                let bv = b.get(rk.as_str());
635                compare_attribute_values(av, bv)
636            });
637            if !scan_forward {
638                matched.reverse();
639            }
640        }
641
642        // For GSI queries, we need the table's primary key attributes to uniquely
643        // identify items (GSI keys are not unique).
644        let table_pk_hash = table.hash_key_name().to_string();
645        let table_pk_range = table.range_key_name().map(|s| s.to_string());
646        let is_gsi_query = index_name.is_some()
647            && (hash_key_name != table_pk_hash
648                || range_key_name.as_deref() != table_pk_range.as_deref());
649
650        // Apply ExclusiveStartKey: skip items up to and including the start key.
651        // For GSI queries the start key contains both index keys and table PK, so
652        // we must match on ALL of them to find the exact item.
653        if let Some(ref start_key) = exclusive_start_key {
654            if let Some(pos) = matched.iter().position(|item| {
655                let index_match =
656                    item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
657                if is_gsi_query {
658                    index_match
659                        && item_matches_key(
660                            item,
661                            start_key,
662                            &table_pk_hash,
663                            table_pk_range.as_deref(),
664                        )
665                } else {
666                    index_match
667                }
668            }) {
669                matched = matched.split_off(pos + 1);
670            }
671        }
672
673        if let Some(filter) = filter_expression {
674            matched.retain(|item| {
675                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
676            });
677        }
678
679        let scanned_count = matched.len();
680
681        let has_more = if let Some(lim) = limit {
682            let more = matched.len() > lim;
683            matched.truncate(lim);
684            more
685        } else {
686            false
687        };
688
689        // Build LastEvaluatedKey from the last returned item if there are more results.
690        // For GSI queries, include both the index keys and the table's primary key
691        // so the item can be uniquely identified on resume.
692        let last_evaluated_key = if has_more {
693            matched.last().map(|item| {
694                let mut key =
695                    extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
696                if is_gsi_query {
697                    let table_key =
698                        extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
699                    key.extend(table_key);
700                }
701                key
702            })
703        } else {
704            None
705        };
706
707        // Collect partition key values for contributor insights
708        let insights_enabled = table.contributor_insights_status == "ENABLED";
709        let pk_name = table.hash_key_name().to_string();
710        let accessed_keys: Vec<String> = if insights_enabled {
711            matched
712                .iter()
713                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
714                .collect()
715        } else {
716            Vec::new()
717        };
718
719        let items: Vec<Value> = matched
720            .iter()
721            .map(|item| {
722                let projected = project_item(item, &body);
723                json!(projected)
724            })
725            .collect();
726
727        let mut result = json!({
728            "Items": items,
729            "Count": items.len(),
730            "ScannedCount": scanned_count,
731        });
732
733        if let Some(lek) = last_evaluated_key {
734            result["LastEvaluatedKey"] = json!(lek);
735        }
736
737        drop(state);
738
739        if !accessed_keys.is_empty() {
740            let mut state = self.state.write();
741            if let Some(table) = state.tables.get_mut(table_name) {
742                // Re-check insights status after acquiring write lock in case it
743                // was disabled between the read and write lock acquisitions.
744                if table.contributor_insights_status == "ENABLED" {
745                    for key_str in accessed_keys {
746                        *table
747                            .contributor_insights_counters
748                            .entry(key_str)
749                            .or_insert(0) += 1;
750                    }
751                }
752            }
753        }
754
755        Self::ok_json(result)
756    }
757
758    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
759        let body = Self::parse_body(req)?;
760        let table_name = require_str(&body, "TableName")?;
761
762        let state = self.state.read();
763        let table = get_table(&state.tables, table_name)?;
764
765        let expr_attr_names = parse_expression_attribute_names(&body);
766        let expr_attr_values = parse_expression_attribute_values(&body);
767        let filter_expression = body["FilterExpression"].as_str();
768        let limit = body["Limit"].as_i64().map(|l| l as usize);
769        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
770            parse_key_map(&body["ExclusiveStartKey"]);
771
772        let hash_key_name = table.hash_key_name().to_string();
773        let range_key_name = table.range_key_name().map(|s| s.to_string());
774
775        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
776
777        // Apply ExclusiveStartKey: skip items up to and including the start key
778        if let Some(ref start_key) = exclusive_start_key {
779            if let Some(pos) = matched.iter().position(|item| {
780                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
781            }) {
782                matched = matched.split_off(pos + 1);
783            }
784        }
785
786        let scanned_count = matched.len();
787
788        if let Some(filter) = filter_expression {
789            matched.retain(|item| {
790                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
791            });
792        }
793
794        let has_more = if let Some(lim) = limit {
795            let more = matched.len() > lim;
796            matched.truncate(lim);
797            more
798        } else {
799            false
800        };
801
802        // Build LastEvaluatedKey from the last returned item if there are more results
803        let last_evaluated_key = if has_more {
804            matched
805                .last()
806                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
807        } else {
808            None
809        };
810
811        // Collect partition key values for contributor insights
812        let insights_enabled = table.contributor_insights_status == "ENABLED";
813        let pk_name = table.hash_key_name().to_string();
814        let accessed_keys: Vec<String> = if insights_enabled {
815            matched
816                .iter()
817                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
818                .collect()
819        } else {
820            Vec::new()
821        };
822
823        let items: Vec<Value> = matched
824            .iter()
825            .map(|item| {
826                let projected = project_item(item, &body);
827                json!(projected)
828            })
829            .collect();
830
831        let mut result = json!({
832            "Items": items,
833            "Count": items.len(),
834            "ScannedCount": scanned_count,
835        });
836
837        if let Some(lek) = last_evaluated_key {
838            result["LastEvaluatedKey"] = json!(lek);
839        }
840
841        drop(state);
842
843        if !accessed_keys.is_empty() {
844            let mut state = self.state.write();
845            if let Some(table) = state.tables.get_mut(table_name) {
846                // Re-check insights status after acquiring write lock in case it
847                // was disabled between the read and write lock acquisitions.
848                if table.contributor_insights_status == "ENABLED" {
849                    for key_str in accessed_keys {
850                        *table
851                            .contributor_insights_counters
852                            .entry(key_str)
853                            .or_insert(0) += 1;
854                    }
855                }
856            }
857        }
858
859        Self::ok_json(result)
860    }
861
862    // ── Batch Operations ────────────────────────────────────────────────
863
864    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
865        let body = Self::parse_body(req)?;
866
867        validate_optional_enum_value(
868            "returnConsumedCapacity",
869            &body["ReturnConsumedCapacity"],
870            &["INDEXES", "TOTAL", "NONE"],
871        )?;
872
873        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
874
875        let request_items = body["RequestItems"]
876            .as_object()
877            .ok_or_else(|| {
878                AwsServiceError::aws_error(
879                    StatusCode::BAD_REQUEST,
880                    "ValidationException",
881                    "RequestItems is required",
882                )
883            })?
884            .clone();
885
886        let state = self.state.read();
887        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
888        let mut consumed_capacity: Vec<Value> = Vec::new();
889
890        for (table_name, params) in &request_items {
891            let table = get_table(&state.tables, table_name)?;
892            let keys = params["Keys"].as_array().ok_or_else(|| {
893                AwsServiceError::aws_error(
894                    StatusCode::BAD_REQUEST,
895                    "ValidationException",
896                    "Keys is required",
897                )
898            })?;
899
900            let mut items = Vec::new();
901            for key_val in keys {
902                let key: HashMap<String, AttributeValue> =
903                    serde_json::from_value(key_val.clone()).unwrap_or_default();
904                if let Some(idx) = table.find_item_index(&key) {
905                    items.push(json!(table.items[idx]));
906                }
907            }
908            responses.insert(table_name.clone(), items);
909
910            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
911                consumed_capacity.push(json!({
912                    "TableName": table_name,
913                    "CapacityUnits": 1.0,
914                }));
915            }
916        }
917
918        let mut result = json!({
919            "Responses": responses,
920            "UnprocessedKeys": {},
921        });
922
923        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
924            result["ConsumedCapacity"] = json!(consumed_capacity);
925        }
926
927        Self::ok_json(result)
928    }
929
930    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
931        let body = Self::parse_body(req)?;
932
933        validate_optional_enum_value(
934            "returnConsumedCapacity",
935            &body["ReturnConsumedCapacity"],
936            &["INDEXES", "TOTAL", "NONE"],
937        )?;
938        validate_optional_enum_value(
939            "returnItemCollectionMetrics",
940            &body["ReturnItemCollectionMetrics"],
941            &["SIZE", "NONE"],
942        )?;
943
944        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
945        let return_icm = body["ReturnItemCollectionMetrics"]
946            .as_str()
947            .unwrap_or("NONE");
948
949        let request_items = body["RequestItems"]
950            .as_object()
951            .ok_or_else(|| {
952                AwsServiceError::aws_error(
953                    StatusCode::BAD_REQUEST,
954                    "ValidationException",
955                    "RequestItems is required",
956                )
957            })?
958            .clone();
959
960        let mut state = self.state.write();
961        let mut consumed_capacity: Vec<Value> = Vec::new();
962        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
963
964        for (table_name, requests) in &request_items {
965            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
966                AwsServiceError::aws_error(
967                    StatusCode::BAD_REQUEST,
968                    "ResourceNotFoundException",
969                    format!("Requested resource not found: Table: {table_name} not found"),
970                )
971            })?;
972
973            let reqs = requests.as_array().ok_or_else(|| {
974                AwsServiceError::aws_error(
975                    StatusCode::BAD_REQUEST,
976                    "ValidationException",
977                    "Request list must be an array",
978                )
979            })?;
980
981            for request in reqs {
982                if let Some(put_req) = request.get("PutRequest") {
983                    let item: HashMap<String, AttributeValue> =
984                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
985                    let key = extract_key(table, &item);
986                    if let Some(idx) = table.find_item_index(&key) {
987                        table.items[idx] = item;
988                    } else {
989                        table.items.push(item);
990                    }
991                } else if let Some(del_req) = request.get("DeleteRequest") {
992                    let key: HashMap<String, AttributeValue> =
993                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
994                    if let Some(idx) = table.find_item_index(&key) {
995                        table.items.remove(idx);
996                    }
997                }
998            }
999
1000            table.recalculate_stats();
1001
1002            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1003                consumed_capacity.push(json!({
1004                    "TableName": table_name,
1005                    "CapacityUnits": 1.0,
1006                }));
1007            }
1008
1009            if return_icm == "SIZE" {
1010                item_collection_metrics.insert(table_name.clone(), vec![]);
1011            }
1012        }
1013
1014        let mut result = json!({
1015            "UnprocessedItems": {},
1016        });
1017
1018        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1019            result["ConsumedCapacity"] = json!(consumed_capacity);
1020        }
1021
1022        if return_icm == "SIZE" {
1023            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1024        }
1025
1026        Self::ok_json(result)
1027    }
1028
1029    // ── Tags ────────────────────────────────────────────────────────────
1030
1031    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1032        let body = Self::parse_body(req)?;
1033        let resource_arn = require_str(&body, "ResourceArn")?;
1034        validate_required("Tags", &body["Tags"])?;
1035
1036        let mut state = self.state.write();
1037        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1038
1039        fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1040            |f| {
1041                AwsServiceError::aws_error(
1042                    StatusCode::BAD_REQUEST,
1043                    "ValidationException",
1044                    format!("{f} must be a list"),
1045                )
1046            },
1047        )?;
1048
1049        Self::ok_json(json!({}))
1050    }
1051
1052    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1053        let body = Self::parse_body(req)?;
1054        let resource_arn = require_str(&body, "ResourceArn")?;
1055        validate_required("TagKeys", &body["TagKeys"])?;
1056
1057        let mut state = self.state.write();
1058        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1059
1060        fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1061            AwsServiceError::aws_error(
1062                StatusCode::BAD_REQUEST,
1063                "ValidationException",
1064                format!("{f} must be a list"),
1065            )
1066        })?;
1067
1068        Self::ok_json(json!({}))
1069    }
1070
1071    fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1072        let body = Self::parse_body(req)?;
1073        let resource_arn = require_str(&body, "ResourceArn")?;
1074
1075        let state = self.state.read();
1076        let table = find_table_by_arn(&state.tables, resource_arn)?;
1077
1078        let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1079
1080        Self::ok_json(json!({ "Tags": tags }))
1081    }
1082
1083    // ── Transactions ────────────────────────────────────────────────────
1084
1085    fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1086        let body = Self::parse_body(req)?;
1087        validate_optional_enum_value(
1088            "returnConsumedCapacity",
1089            &body["ReturnConsumedCapacity"],
1090            &["INDEXES", "TOTAL", "NONE"],
1091        )?;
1092        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1093            AwsServiceError::aws_error(
1094                StatusCode::BAD_REQUEST,
1095                "ValidationException",
1096                "TransactItems is required",
1097            )
1098        })?;
1099
1100        let state = self.state.read();
1101        let mut responses: Vec<Value> = Vec::new();
1102
1103        for ti in transact_items {
1104            let get = &ti["Get"];
1105            let table_name = get["TableName"].as_str().ok_or_else(|| {
1106                AwsServiceError::aws_error(
1107                    StatusCode::BAD_REQUEST,
1108                    "ValidationException",
1109                    "TableName is required in Get",
1110                )
1111            })?;
1112            let key: HashMap<String, AttributeValue> =
1113                serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1114
1115            let table = get_table(&state.tables, table_name)?;
1116            match table.find_item_index(&key) {
1117                Some(idx) => {
1118                    responses.push(json!({ "Item": table.items[idx] }));
1119                }
1120                None => {
1121                    responses.push(json!({}));
1122                }
1123            }
1124        }
1125
1126        Self::ok_json(json!({ "Responses": responses }))
1127    }
1128
1129    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1130        let body = Self::parse_body(req)?;
1131        validate_optional_string_length(
1132            "clientRequestToken",
1133            body["ClientRequestToken"].as_str(),
1134            1,
1135            36,
1136        )?;
1137        validate_optional_enum_value(
1138            "returnConsumedCapacity",
1139            &body["ReturnConsumedCapacity"],
1140            &["INDEXES", "TOTAL", "NONE"],
1141        )?;
1142        validate_optional_enum_value(
1143            "returnItemCollectionMetrics",
1144            &body["ReturnItemCollectionMetrics"],
1145            &["SIZE", "NONE"],
1146        )?;
1147        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1148            AwsServiceError::aws_error(
1149                StatusCode::BAD_REQUEST,
1150                "ValidationException",
1151                "TransactItems is required",
1152            )
1153        })?;
1154
1155        let mut state = self.state.write();
1156
1157        // First pass: validate all conditions
1158        let mut cancellation_reasons: Vec<Value> = Vec::new();
1159        let mut any_failed = false;
1160
1161        for ti in transact_items {
1162            if let Some(put) = ti.get("Put") {
1163                let table_name = put["TableName"].as_str().unwrap_or_default();
1164                let item: HashMap<String, AttributeValue> =
1165                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1166                let condition = put["ConditionExpression"].as_str();
1167
1168                if let Some(cond) = condition {
1169                    let table = get_table(&state.tables, table_name)?;
1170                    let expr_attr_names = parse_expression_attribute_names(put);
1171                    let expr_attr_values = parse_expression_attribute_values(put);
1172                    let key = extract_key(table, &item);
1173                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1174                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1175                        .is_err()
1176                    {
1177                        cancellation_reasons.push(json!({
1178                            "Code": "ConditionalCheckFailed",
1179                            "Message": "The conditional request failed"
1180                        }));
1181                        any_failed = true;
1182                        continue;
1183                    }
1184                }
1185                cancellation_reasons.push(json!({ "Code": "None" }));
1186            } else if let Some(delete) = ti.get("Delete") {
1187                let table_name = delete["TableName"].as_str().unwrap_or_default();
1188                let key: HashMap<String, AttributeValue> =
1189                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1190                let condition = delete["ConditionExpression"].as_str();
1191
1192                if let Some(cond) = condition {
1193                    let table = get_table(&state.tables, table_name)?;
1194                    let expr_attr_names = parse_expression_attribute_names(delete);
1195                    let expr_attr_values = parse_expression_attribute_values(delete);
1196                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1197                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1198                        .is_err()
1199                    {
1200                        cancellation_reasons.push(json!({
1201                            "Code": "ConditionalCheckFailed",
1202                            "Message": "The conditional request failed"
1203                        }));
1204                        any_failed = true;
1205                        continue;
1206                    }
1207                }
1208                cancellation_reasons.push(json!({ "Code": "None" }));
1209            } else if let Some(update) = ti.get("Update") {
1210                let table_name = update["TableName"].as_str().unwrap_or_default();
1211                let key: HashMap<String, AttributeValue> =
1212                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1213                let condition = update["ConditionExpression"].as_str();
1214
1215                if let Some(cond) = condition {
1216                    let table = get_table(&state.tables, table_name)?;
1217                    let expr_attr_names = parse_expression_attribute_names(update);
1218                    let expr_attr_values = parse_expression_attribute_values(update);
1219                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1220                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1221                        .is_err()
1222                    {
1223                        cancellation_reasons.push(json!({
1224                            "Code": "ConditionalCheckFailed",
1225                            "Message": "The conditional request failed"
1226                        }));
1227                        any_failed = true;
1228                        continue;
1229                    }
1230                }
1231                cancellation_reasons.push(json!({ "Code": "None" }));
1232            } else if let Some(check) = ti.get("ConditionCheck") {
1233                let table_name = check["TableName"].as_str().unwrap_or_default();
1234                let key: HashMap<String, AttributeValue> =
1235                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1236                let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1237
1238                let table = get_table(&state.tables, table_name)?;
1239                let expr_attr_names = parse_expression_attribute_names(check);
1240                let expr_attr_values = parse_expression_attribute_values(check);
1241                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1242                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1243                {
1244                    cancellation_reasons.push(json!({
1245                        "Code": "ConditionalCheckFailed",
1246                        "Message": "The conditional request failed"
1247                    }));
1248                    any_failed = true;
1249                    continue;
1250                }
1251                cancellation_reasons.push(json!({ "Code": "None" }));
1252            } else {
1253                cancellation_reasons.push(json!({ "Code": "None" }));
1254            }
1255        }
1256
1257        if any_failed {
1258            let error_body = json!({
1259                "__type": "TransactionCanceledException",
1260                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1261                "CancellationReasons": cancellation_reasons
1262            });
1263            return Ok(AwsResponse::json(
1264                StatusCode::BAD_REQUEST,
1265                serde_json::to_vec(&error_body).unwrap(),
1266            ));
1267        }
1268
1269        // Second pass: apply all writes
1270        for ti in transact_items {
1271            if let Some(put) = ti.get("Put") {
1272                let table_name = put["TableName"].as_str().unwrap_or_default();
1273                let item: HashMap<String, AttributeValue> =
1274                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1275                let table = get_table_mut(&mut state.tables, table_name)?;
1276                let key = extract_key(table, &item);
1277                if let Some(idx) = table.find_item_index(&key) {
1278                    table.items[idx] = item;
1279                } else {
1280                    table.items.push(item);
1281                }
1282                table.recalculate_stats();
1283            } else if let Some(delete) = ti.get("Delete") {
1284                let table_name = delete["TableName"].as_str().unwrap_or_default();
1285                let key: HashMap<String, AttributeValue> =
1286                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1287                let table = get_table_mut(&mut state.tables, table_name)?;
1288                if let Some(idx) = table.find_item_index(&key) {
1289                    table.items.remove(idx);
1290                }
1291                table.recalculate_stats();
1292            } else if let Some(update) = ti.get("Update") {
1293                let table_name = update["TableName"].as_str().unwrap_or_default();
1294                let key: HashMap<String, AttributeValue> =
1295                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1296                let update_expression = update["UpdateExpression"].as_str();
1297                let expr_attr_names = parse_expression_attribute_names(update);
1298                let expr_attr_values = parse_expression_attribute_values(update);
1299
1300                let table = get_table_mut(&mut state.tables, table_name)?;
1301                let idx = match table.find_item_index(&key) {
1302                    Some(i) => i,
1303                    None => {
1304                        let mut new_item = HashMap::new();
1305                        for (k, v) in &key {
1306                            new_item.insert(k.clone(), v.clone());
1307                        }
1308                        table.items.push(new_item);
1309                        table.items.len() - 1
1310                    }
1311                };
1312
1313                if let Some(expr) = update_expression {
1314                    apply_update_expression(
1315                        &mut table.items[idx],
1316                        expr,
1317                        &expr_attr_names,
1318                        &expr_attr_values,
1319                    )?;
1320                }
1321                table.recalculate_stats();
1322            }
1323            // ConditionCheck: no write needed
1324        }
1325
1326        Self::ok_json(json!({}))
1327    }
1328
1329    fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1330        let body = Self::parse_body(req)?;
1331        let statement = require_str(&body, "Statement")?;
1332        let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1333
1334        execute_partiql_statement(&self.state, statement, &parameters)
1335    }
1336
1337    fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1338        let body = Self::parse_body(req)?;
1339        validate_optional_enum_value(
1340            "returnConsumedCapacity",
1341            &body["ReturnConsumedCapacity"],
1342            &["INDEXES", "TOTAL", "NONE"],
1343        )?;
1344        let statements = body["Statements"].as_array().ok_or_else(|| {
1345            AwsServiceError::aws_error(
1346                StatusCode::BAD_REQUEST,
1347                "ValidationException",
1348                "Statements is required",
1349            )
1350        })?;
1351
1352        let mut responses: Vec<Value> = Vec::new();
1353        for stmt_obj in statements {
1354            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1355            let parameters = stmt_obj["Parameters"]
1356                .as_array()
1357                .cloned()
1358                .unwrap_or_default();
1359
1360            match execute_partiql_statement(&self.state, statement, &parameters) {
1361                Ok(resp) => {
1362                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1363                    responses.push(resp_body);
1364                }
1365                Err(e) => {
1366                    responses.push(json!({
1367                        "Error": {
1368                            "Code": "ValidationException",
1369                            "Message": e.to_string()
1370                        }
1371                    }));
1372                }
1373            }
1374        }
1375
1376        Self::ok_json(json!({ "Responses": responses }))
1377    }
1378
1379    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1380        let body = Self::parse_body(req)?;
1381        validate_optional_string_length(
1382            "clientRequestToken",
1383            body["ClientRequestToken"].as_str(),
1384            1,
1385            36,
1386        )?;
1387        validate_optional_enum_value(
1388            "returnConsumedCapacity",
1389            &body["ReturnConsumedCapacity"],
1390            &["INDEXES", "TOTAL", "NONE"],
1391        )?;
1392        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1393            AwsServiceError::aws_error(
1394                StatusCode::BAD_REQUEST,
1395                "ValidationException",
1396                "TransactStatements is required",
1397            )
1398        })?;
1399
1400        // Collect all results; if any fail, return TransactionCanceledException
1401        let mut results: Vec<Result<Value, String>> = Vec::new();
1402        for stmt_obj in transact_statements {
1403            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1404            let parameters = stmt_obj["Parameters"]
1405                .as_array()
1406                .cloned()
1407                .unwrap_or_default();
1408
1409            match execute_partiql_statement(&self.state, statement, &parameters) {
1410                Ok(resp) => {
1411                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1412                    results.push(Ok(resp_body));
1413                }
1414                Err(e) => {
1415                    results.push(Err(e.to_string()));
1416                }
1417            }
1418        }
1419
1420        let any_failed = results.iter().any(|r| r.is_err());
1421        if any_failed {
1422            let reasons: Vec<Value> = results
1423                .iter()
1424                .map(|r| match r {
1425                    Ok(_) => json!({ "Code": "None" }),
1426                    Err(msg) => json!({
1427                        "Code": "ValidationException",
1428                        "Message": msg
1429                    }),
1430                })
1431                .collect();
1432            let error_body = json!({
1433                "__type": "TransactionCanceledException",
1434                "message": "Transaction cancelled due to validation errors",
1435                "CancellationReasons": reasons
1436            });
1437            return Ok(AwsResponse::json(
1438                StatusCode::BAD_REQUEST,
1439                serde_json::to_vec(&error_body).unwrap(),
1440            ));
1441        }
1442
1443        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1444        Self::ok_json(json!({ "Responses": responses }))
1445    }
1446
1447    // ── TTL ─────────────────────────────────────────────────────────────
1448
1449    fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1450        let body = Self::parse_body(req)?;
1451        let table_name = require_str(&body, "TableName")?;
1452        let spec = &body["TimeToLiveSpecification"];
1453        let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1454            AwsServiceError::aws_error(
1455                StatusCode::BAD_REQUEST,
1456                "ValidationException",
1457                "TimeToLiveSpecification.AttributeName is required",
1458            )
1459        })?;
1460        let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1461            AwsServiceError::aws_error(
1462                StatusCode::BAD_REQUEST,
1463                "ValidationException",
1464                "TimeToLiveSpecification.Enabled is required",
1465            )
1466        })?;
1467
1468        let mut state = self.state.write();
1469        let table = get_table_mut(&mut state.tables, table_name)?;
1470
1471        if enabled {
1472            table.ttl_attribute = Some(attr_name.to_string());
1473            table.ttl_enabled = true;
1474        } else {
1475            table.ttl_enabled = false;
1476        }
1477
1478        Self::ok_json(json!({
1479            "TimeToLiveSpecification": {
1480                "AttributeName": attr_name,
1481                "Enabled": enabled
1482            }
1483        }))
1484    }
1485
1486    fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1487        let body = Self::parse_body(req)?;
1488        let table_name = require_str(&body, "TableName")?;
1489
1490        let state = self.state.read();
1491        let table = get_table(&state.tables, table_name)?;
1492
1493        let status = if table.ttl_enabled {
1494            "ENABLED"
1495        } else {
1496            "DISABLED"
1497        };
1498
1499        let mut desc = json!({
1500            "TimeToLiveDescription": {
1501                "TimeToLiveStatus": status
1502            }
1503        });
1504
1505        if let Some(ref attr) = table.ttl_attribute {
1506            desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1507        }
1508
1509        Self::ok_json(desc)
1510    }
1511
1512    // ── Resource Policies ───────────────────────────────────────────────
1513
1514    fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1515        let body = Self::parse_body(req)?;
1516        let resource_arn = require_str(&body, "ResourceArn")?;
1517        let policy = require_str(&body, "Policy")?;
1518
1519        let mut state = self.state.write();
1520        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1521        table.resource_policy = Some(policy.to_string());
1522
1523        let revision_id = uuid::Uuid::new_v4().to_string();
1524        Self::ok_json(json!({ "RevisionId": revision_id }))
1525    }
1526
1527    fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1528        let body = Self::parse_body(req)?;
1529        let resource_arn = require_str(&body, "ResourceArn")?;
1530
1531        let state = self.state.read();
1532        let table = find_table_by_arn(&state.tables, resource_arn)?;
1533
1534        match &table.resource_policy {
1535            Some(policy) => {
1536                let revision_id = uuid::Uuid::new_v4().to_string();
1537                Self::ok_json(json!({
1538                    "Policy": policy,
1539                    "RevisionId": revision_id
1540                }))
1541            }
1542            None => Self::ok_json(json!({ "Policy": null })),
1543        }
1544    }
1545
1546    fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1547        let body = Self::parse_body(req)?;
1548        let resource_arn = require_str(&body, "ResourceArn")?;
1549
1550        let mut state = self.state.write();
1551        let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1552        table.resource_policy = None;
1553
1554        Self::ok_json(json!({}))
1555    }
1556
1557    // ── Stubs ──────────────────────────────────────────────────────────
1558
1559    fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1560        Self::ok_json(json!({
1561            "Endpoints": [{
1562                "Address": "dynamodb.us-east-1.amazonaws.com",
1563                "CachePeriodInMinutes": 1440
1564            }]
1565        }))
1566    }
1567
1568    fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569        Self::ok_json(json!({
1570            "AccountMaxReadCapacityUnits": 80000,
1571            "AccountMaxWriteCapacityUnits": 80000,
1572            "TableMaxReadCapacityUnits": 40000,
1573            "TableMaxWriteCapacityUnits": 40000
1574        }))
1575    }
1576
1577    // ── Backups ────────────────────────────────────────────────────────
1578
1579    fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1580        let body = Self::parse_body(req)?;
1581        let table_name = require_str(&body, "TableName")?;
1582        let backup_name = require_str(&body, "BackupName")?;
1583
1584        let mut state = self.state.write();
1585        let table = get_table(&state.tables, table_name)?;
1586
1587        let backup_arn = format!(
1588            "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1589            state.region,
1590            state.account_id,
1591            table_name,
1592            Utc::now().format("%Y%m%d%H%M%S")
1593        );
1594        let now = Utc::now();
1595
1596        let backup = BackupDescription {
1597            backup_arn: backup_arn.clone(),
1598            backup_name: backup_name.to_string(),
1599            table_name: table_name.to_string(),
1600            table_arn: table.arn.clone(),
1601            backup_status: "AVAILABLE".to_string(),
1602            backup_type: "USER".to_string(),
1603            backup_creation_date: now,
1604            key_schema: table.key_schema.clone(),
1605            attribute_definitions: table.attribute_definitions.clone(),
1606            provisioned_throughput: table.provisioned_throughput.clone(),
1607            billing_mode: table.billing_mode.clone(),
1608            item_count: table.item_count,
1609            size_bytes: table.size_bytes,
1610            items: table.items.clone(),
1611        };
1612
1613        state.backups.insert(backup_arn.clone(), backup);
1614
1615        Self::ok_json(json!({
1616            "BackupDetails": {
1617                "BackupArn": backup_arn,
1618                "BackupName": backup_name,
1619                "BackupStatus": "AVAILABLE",
1620                "BackupType": "USER",
1621                "BackupCreationDateTime": now.timestamp() as f64,
1622                "BackupSizeBytes": 0
1623            }
1624        }))
1625    }
1626
1627    fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1628        let body = Self::parse_body(req)?;
1629        let backup_arn = require_str(&body, "BackupArn")?;
1630
1631        let mut state = self.state.write();
1632        let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1633            AwsServiceError::aws_error(
1634                StatusCode::BAD_REQUEST,
1635                "BackupNotFoundException",
1636                format!("Backup not found: {backup_arn}"),
1637            )
1638        })?;
1639
1640        Self::ok_json(json!({
1641            "BackupDescription": {
1642                "BackupDetails": {
1643                    "BackupArn": backup.backup_arn,
1644                    "BackupName": backup.backup_name,
1645                    "BackupStatus": "DELETED",
1646                    "BackupType": backup.backup_type,
1647                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1648                    "BackupSizeBytes": backup.size_bytes
1649                },
1650                "SourceTableDetails": {
1651                    "TableName": backup.table_name,
1652                    "TableArn": backup.table_arn,
1653                    "TableId": uuid::Uuid::new_v4().to_string(),
1654                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1655                        "AttributeName": ks.attribute_name,
1656                        "KeyType": ks.key_type
1657                    })).collect::<Vec<_>>(),
1658                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1659                    "ProvisionedThroughput": {
1660                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1661                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1662                    },
1663                    "ItemCount": backup.item_count,
1664                    "BillingMode": backup.billing_mode,
1665                    "TableSizeBytes": backup.size_bytes
1666                },
1667                "SourceTableFeatureDetails": {}
1668            }
1669        }))
1670    }
1671
1672    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1673        let body = Self::parse_body(req)?;
1674        let backup_arn = require_str(&body, "BackupArn")?;
1675
1676        let state = self.state.read();
1677        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1678            AwsServiceError::aws_error(
1679                StatusCode::BAD_REQUEST,
1680                "BackupNotFoundException",
1681                format!("Backup not found: {backup_arn}"),
1682            )
1683        })?;
1684
1685        Self::ok_json(json!({
1686            "BackupDescription": {
1687                "BackupDetails": {
1688                    "BackupArn": backup.backup_arn,
1689                    "BackupName": backup.backup_name,
1690                    "BackupStatus": backup.backup_status,
1691                    "BackupType": backup.backup_type,
1692                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1693                    "BackupSizeBytes": backup.size_bytes
1694                },
1695                "SourceTableDetails": {
1696                    "TableName": backup.table_name,
1697                    "TableArn": backup.table_arn,
1698                    "TableId": uuid::Uuid::new_v4().to_string(),
1699                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
1700                        "AttributeName": ks.attribute_name,
1701                        "KeyType": ks.key_type
1702                    })).collect::<Vec<_>>(),
1703                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1704                    "ProvisionedThroughput": {
1705                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1706                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1707                    },
1708                    "ItemCount": backup.item_count,
1709                    "BillingMode": backup.billing_mode,
1710                    "TableSizeBytes": backup.size_bytes
1711                },
1712                "SourceTableFeatureDetails": {}
1713            }
1714        }))
1715    }
1716
1717    fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1718        let body = Self::parse_body(req)?;
1719        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
1720        validate_optional_string_length(
1721            "exclusiveStartBackupArn",
1722            body["ExclusiveStartBackupArn"].as_str(),
1723            37,
1724            1024,
1725        )?;
1726        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1727        validate_optional_enum_value(
1728            "backupType",
1729            &body["BackupType"],
1730            &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
1731        )?;
1732        let table_name = body["TableName"].as_str();
1733
1734        let state = self.state.read();
1735        let summaries: Vec<Value> = state
1736            .backups
1737            .values()
1738            .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1739            .map(|b| {
1740                json!({
1741                    "TableName": b.table_name,
1742                    "TableArn": b.table_arn,
1743                    "BackupArn": b.backup_arn,
1744                    "BackupName": b.backup_name,
1745                    "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1746                    "BackupStatus": b.backup_status,
1747                    "BackupType": b.backup_type,
1748                    "BackupSizeBytes": b.size_bytes
1749                })
1750            })
1751            .collect();
1752
1753        Self::ok_json(json!({
1754            "BackupSummaries": summaries
1755        }))
1756    }
1757
1758    fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1759        let body = Self::parse_body(req)?;
1760        let backup_arn = require_str(&body, "BackupArn")?;
1761        let target_table_name = require_str(&body, "TargetTableName")?;
1762
1763        let mut state = self.state.write();
1764        let backup = state.backups.get(backup_arn).ok_or_else(|| {
1765            AwsServiceError::aws_error(
1766                StatusCode::BAD_REQUEST,
1767                "BackupNotFoundException",
1768                format!("Backup not found: {backup_arn}"),
1769            )
1770        })?;
1771
1772        if state.tables.contains_key(target_table_name) {
1773            return Err(AwsServiceError::aws_error(
1774                StatusCode::BAD_REQUEST,
1775                "TableAlreadyExistsException",
1776                format!("Table already exists: {target_table_name}"),
1777            ));
1778        }
1779
1780        let now = Utc::now();
1781        let arn = format!(
1782            "arn:aws:dynamodb:{}:{}:table/{}",
1783            state.region, state.account_id, target_table_name
1784        );
1785
1786        let restored_items = backup.items.clone();
1787        let mut table = DynamoTable {
1788            name: target_table_name.to_string(),
1789            arn: arn.clone(),
1790            key_schema: backup.key_schema.clone(),
1791            attribute_definitions: backup.attribute_definitions.clone(),
1792            provisioned_throughput: backup.provisioned_throughput.clone(),
1793            items: restored_items,
1794            gsi: Vec::new(),
1795            lsi: Vec::new(),
1796            tags: HashMap::new(),
1797            created_at: now,
1798            status: "ACTIVE".to_string(),
1799            item_count: 0,
1800            size_bytes: 0,
1801            billing_mode: backup.billing_mode.clone(),
1802            ttl_attribute: None,
1803            ttl_enabled: false,
1804            resource_policy: None,
1805            pitr_enabled: false,
1806            kinesis_destinations: Vec::new(),
1807            contributor_insights_status: "DISABLED".to_string(),
1808            contributor_insights_counters: HashMap::new(),
1809        };
1810        table.recalculate_stats();
1811
1812        let desc = build_table_description(&table);
1813        state.tables.insert(target_table_name.to_string(), table);
1814
1815        Self::ok_json(json!({
1816            "TableDescription": desc
1817        }))
1818    }
1819
1820    fn restore_table_to_point_in_time(
1821        &self,
1822        req: &AwsRequest,
1823    ) -> Result<AwsResponse, AwsServiceError> {
1824        let body = Self::parse_body(req)?;
1825        let target_table_name = require_str(&body, "TargetTableName")?;
1826        let source_table_name = body["SourceTableName"].as_str();
1827        let source_table_arn = body["SourceTableArn"].as_str();
1828
1829        let mut state = self.state.write();
1830
1831        // Resolve source table
1832        let source = if let Some(name) = source_table_name {
1833            get_table(&state.tables, name)?.clone()
1834        } else if let Some(arn) = source_table_arn {
1835            find_table_by_arn(&state.tables, arn)?.clone()
1836        } else {
1837            return Err(AwsServiceError::aws_error(
1838                StatusCode::BAD_REQUEST,
1839                "ValidationException",
1840                "SourceTableName or SourceTableArn is required",
1841            ));
1842        };
1843
1844        if state.tables.contains_key(target_table_name) {
1845            return Err(AwsServiceError::aws_error(
1846                StatusCode::BAD_REQUEST,
1847                "TableAlreadyExistsException",
1848                format!("Table already exists: {target_table_name}"),
1849            ));
1850        }
1851
1852        let now = Utc::now();
1853        let arn = format!(
1854            "arn:aws:dynamodb:{}:{}:table/{}",
1855            state.region, state.account_id, target_table_name
1856        );
1857
1858        let mut table = DynamoTable {
1859            name: target_table_name.to_string(),
1860            arn: arn.clone(),
1861            key_schema: source.key_schema.clone(),
1862            attribute_definitions: source.attribute_definitions.clone(),
1863            provisioned_throughput: source.provisioned_throughput.clone(),
1864            items: source.items.clone(),
1865            gsi: Vec::new(),
1866            lsi: Vec::new(),
1867            tags: HashMap::new(),
1868            created_at: now,
1869            status: "ACTIVE".to_string(),
1870            item_count: 0,
1871            size_bytes: 0,
1872            billing_mode: source.billing_mode.clone(),
1873            ttl_attribute: None,
1874            ttl_enabled: false,
1875            resource_policy: None,
1876            pitr_enabled: false,
1877            kinesis_destinations: Vec::new(),
1878            contributor_insights_status: "DISABLED".to_string(),
1879            contributor_insights_counters: HashMap::new(),
1880        };
1881        table.recalculate_stats();
1882
1883        let desc = build_table_description(&table);
1884        state.tables.insert(target_table_name.to_string(), table);
1885
1886        Self::ok_json(json!({
1887            "TableDescription": desc
1888        }))
1889    }
1890
1891    fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1892        let body = Self::parse_body(req)?;
1893        let table_name = require_str(&body, "TableName")?;
1894
1895        let pitr_spec = body["PointInTimeRecoverySpecification"]
1896            .as_object()
1897            .ok_or_else(|| {
1898                AwsServiceError::aws_error(
1899                    StatusCode::BAD_REQUEST,
1900                    "ValidationException",
1901                    "PointInTimeRecoverySpecification is required",
1902                )
1903            })?;
1904        let enabled = pitr_spec
1905            .get("PointInTimeRecoveryEnabled")
1906            .and_then(|v| v.as_bool())
1907            .unwrap_or(false);
1908
1909        let mut state = self.state.write();
1910        let table = get_table_mut(&mut state.tables, table_name)?;
1911        table.pitr_enabled = enabled;
1912
1913        let status = if enabled { "ENABLED" } else { "DISABLED" };
1914        Self::ok_json(json!({
1915            "ContinuousBackupsDescription": {
1916                "ContinuousBackupsStatus": status,
1917                "PointInTimeRecoveryDescription": {
1918                    "PointInTimeRecoveryStatus": status,
1919                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1920                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1921                }
1922            }
1923        }))
1924    }
1925
1926    fn describe_continuous_backups(
1927        &self,
1928        req: &AwsRequest,
1929    ) -> Result<AwsResponse, AwsServiceError> {
1930        let body = Self::parse_body(req)?;
1931        let table_name = require_str(&body, "TableName")?;
1932
1933        let state = self.state.read();
1934        let table = get_table(&state.tables, table_name)?;
1935
1936        let status = if table.pitr_enabled {
1937            "ENABLED"
1938        } else {
1939            "DISABLED"
1940        };
1941        Self::ok_json(json!({
1942            "ContinuousBackupsDescription": {
1943                "ContinuousBackupsStatus": status,
1944                "PointInTimeRecoveryDescription": {
1945                    "PointInTimeRecoveryStatus": status,
1946                    "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1947                    "LatestRestorableDateTime": Utc::now().timestamp() as f64
1948                }
1949            }
1950        }))
1951    }
1952
1953    // ── Global Tables ──────────────────────────────────────────────────
1954
1955    fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1956        let body = Self::parse_body(req)?;
1957        let global_table_name = require_str(&body, "GlobalTableName")?;
1958        validate_string_length("globalTableName", global_table_name, 3, 255)?;
1959
1960        let replication_group = body["ReplicationGroup"]
1961            .as_array()
1962            .ok_or_else(|| {
1963                AwsServiceError::aws_error(
1964                    StatusCode::BAD_REQUEST,
1965                    "ValidationException",
1966                    "ReplicationGroup is required",
1967                )
1968            })?
1969            .iter()
1970            .filter_map(|r| {
1971                r["RegionName"].as_str().map(|rn| ReplicaDescription {
1972                    region_name: rn.to_string(),
1973                    replica_status: "ACTIVE".to_string(),
1974                })
1975            })
1976            .collect::<Vec<_>>();
1977
1978        let mut state = self.state.write();
1979
1980        if state.global_tables.contains_key(global_table_name) {
1981            return Err(AwsServiceError::aws_error(
1982                StatusCode::BAD_REQUEST,
1983                "GlobalTableAlreadyExistsException",
1984                format!("Global table already exists: {global_table_name}"),
1985            ));
1986        }
1987
1988        let arn = format!(
1989            "arn:aws:dynamodb::{}:global-table/{}",
1990            state.account_id, global_table_name
1991        );
1992        let now = Utc::now();
1993
1994        let gt = GlobalTableDescription {
1995            global_table_name: global_table_name.to_string(),
1996            global_table_arn: arn.clone(),
1997            global_table_status: "ACTIVE".to_string(),
1998            creation_date: now,
1999            replication_group: replication_group.clone(),
2000        };
2001
2002        state
2003            .global_tables
2004            .insert(global_table_name.to_string(), gt);
2005
2006        Self::ok_json(json!({
2007            "GlobalTableDescription": {
2008                "GlobalTableName": global_table_name,
2009                "GlobalTableArn": arn,
2010                "GlobalTableStatus": "ACTIVE",
2011                "CreationDateTime": now.timestamp() as f64,
2012                "ReplicationGroup": replication_group.iter().map(|r| json!({
2013                    "RegionName": r.region_name,
2014                    "ReplicaStatus": r.replica_status
2015                })).collect::<Vec<_>>()
2016            }
2017        }))
2018    }
2019
2020    fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2021        let body = Self::parse_body(req)?;
2022        let global_table_name = require_str(&body, "GlobalTableName")?;
2023        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2024
2025        let state = self.state.read();
2026        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2027            AwsServiceError::aws_error(
2028                StatusCode::BAD_REQUEST,
2029                "GlobalTableNotFoundException",
2030                format!("Global table not found: {global_table_name}"),
2031            )
2032        })?;
2033
2034        Self::ok_json(json!({
2035            "GlobalTableDescription": {
2036                "GlobalTableName": gt.global_table_name,
2037                "GlobalTableArn": gt.global_table_arn,
2038                "GlobalTableStatus": gt.global_table_status,
2039                "CreationDateTime": gt.creation_date.timestamp() as f64,
2040                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2041                    "RegionName": r.region_name,
2042                    "ReplicaStatus": r.replica_status
2043                })).collect::<Vec<_>>()
2044            }
2045        }))
2046    }
2047
2048    fn describe_global_table_settings(
2049        &self,
2050        req: &AwsRequest,
2051    ) -> Result<AwsResponse, AwsServiceError> {
2052        let body = Self::parse_body(req)?;
2053        let global_table_name = require_str(&body, "GlobalTableName")?;
2054        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2055
2056        let state = self.state.read();
2057        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2058            AwsServiceError::aws_error(
2059                StatusCode::BAD_REQUEST,
2060                "GlobalTableNotFoundException",
2061                format!("Global table not found: {global_table_name}"),
2062            )
2063        })?;
2064
2065        let replica_settings: Vec<Value> = gt
2066            .replication_group
2067            .iter()
2068            .map(|r| {
2069                json!({
2070                    "RegionName": r.region_name,
2071                    "ReplicaStatus": r.replica_status,
2072                    "ReplicaProvisionedReadCapacityUnits": 0,
2073                    "ReplicaProvisionedWriteCapacityUnits": 0
2074                })
2075            })
2076            .collect();
2077
2078        Self::ok_json(json!({
2079            "GlobalTableName": gt.global_table_name,
2080            "ReplicaSettings": replica_settings
2081        }))
2082    }
2083
2084    fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2085        let body = Self::parse_body(req)?;
2086        validate_optional_string_length(
2087            "exclusiveStartGlobalTableName",
2088            body["ExclusiveStartGlobalTableName"].as_str(),
2089            3,
2090            255,
2091        )?;
2092        validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2093        let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2094
2095        let state = self.state.read();
2096        let tables: Vec<Value> = state
2097            .global_tables
2098            .values()
2099            .take(limit)
2100            .map(|gt| {
2101                json!({
2102                    "GlobalTableName": gt.global_table_name,
2103                    "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2104                        "RegionName": r.region_name
2105                    })).collect::<Vec<_>>()
2106                })
2107            })
2108            .collect();
2109
2110        Self::ok_json(json!({
2111            "GlobalTables": tables
2112        }))
2113    }
2114
2115    fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116        let body = Self::parse_body(req)?;
2117        let global_table_name = require_str(&body, "GlobalTableName")?;
2118        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2119        validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2120
2121        let mut state = self.state.write();
2122        let gt = state
2123            .global_tables
2124            .get_mut(global_table_name)
2125            .ok_or_else(|| {
2126                AwsServiceError::aws_error(
2127                    StatusCode::BAD_REQUEST,
2128                    "GlobalTableNotFoundException",
2129                    format!("Global table not found: {global_table_name}"),
2130                )
2131            })?;
2132
2133        if let Some(updates) = body["ReplicaUpdates"].as_array() {
2134            for update in updates {
2135                if let Some(create) = update["Create"].as_object() {
2136                    if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2137                        gt.replication_group.push(ReplicaDescription {
2138                            region_name: region.to_string(),
2139                            replica_status: "ACTIVE".to_string(),
2140                        });
2141                    }
2142                }
2143                if let Some(delete) = update["Delete"].as_object() {
2144                    if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2145                        gt.replication_group.retain(|r| r.region_name != region);
2146                    }
2147                }
2148            }
2149        }
2150
2151        Self::ok_json(json!({
2152            "GlobalTableDescription": {
2153                "GlobalTableName": gt.global_table_name,
2154                "GlobalTableArn": gt.global_table_arn,
2155                "GlobalTableStatus": gt.global_table_status,
2156                "CreationDateTime": gt.creation_date.timestamp() as f64,
2157                "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2158                    "RegionName": r.region_name,
2159                    "ReplicaStatus": r.replica_status
2160                })).collect::<Vec<_>>()
2161            }
2162        }))
2163    }
2164
2165    fn update_global_table_settings(
2166        &self,
2167        req: &AwsRequest,
2168    ) -> Result<AwsResponse, AwsServiceError> {
2169        let body = Self::parse_body(req)?;
2170        let global_table_name = require_str(&body, "GlobalTableName")?;
2171        validate_string_length("globalTableName", global_table_name, 3, 255)?;
2172        validate_optional_enum_value(
2173            "globalTableBillingMode",
2174            &body["GlobalTableBillingMode"],
2175            &["PROVISIONED", "PAY_PER_REQUEST"],
2176        )?;
2177        validate_optional_range_i64(
2178            "globalTableProvisionedWriteCapacityUnits",
2179            body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2180            1,
2181            i64::MAX,
2182        )?;
2183
2184        let state = self.state.read();
2185        let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2186            AwsServiceError::aws_error(
2187                StatusCode::BAD_REQUEST,
2188                "GlobalTableNotFoundException",
2189                format!("Global table not found: {global_table_name}"),
2190            )
2191        })?;
2192
2193        let replica_settings: Vec<Value> = gt
2194            .replication_group
2195            .iter()
2196            .map(|r| {
2197                json!({
2198                    "RegionName": r.region_name,
2199                    "ReplicaStatus": r.replica_status,
2200                    "ReplicaProvisionedReadCapacityUnits": 0,
2201                    "ReplicaProvisionedWriteCapacityUnits": 0
2202                })
2203            })
2204            .collect();
2205
2206        Self::ok_json(json!({
2207            "GlobalTableName": gt.global_table_name,
2208            "ReplicaSettings": replica_settings
2209        }))
2210    }
2211
2212    fn describe_table_replica_auto_scaling(
2213        &self,
2214        req: &AwsRequest,
2215    ) -> Result<AwsResponse, AwsServiceError> {
2216        let body = Self::parse_body(req)?;
2217        let table_name = require_str(&body, "TableName")?;
2218
2219        let state = self.state.read();
2220        let table = get_table(&state.tables, table_name)?;
2221
2222        Self::ok_json(json!({
2223            "TableAutoScalingDescription": {
2224                "TableName": table.name,
2225                "TableStatus": table.status,
2226                "Replicas": []
2227            }
2228        }))
2229    }
2230
2231    fn update_table_replica_auto_scaling(
2232        &self,
2233        req: &AwsRequest,
2234    ) -> Result<AwsResponse, AwsServiceError> {
2235        let body = Self::parse_body(req)?;
2236        let table_name = require_str(&body, "TableName")?;
2237
2238        let state = self.state.read();
2239        let table = get_table(&state.tables, table_name)?;
2240
2241        Self::ok_json(json!({
2242            "TableAutoScalingDescription": {
2243                "TableName": table.name,
2244                "TableStatus": table.status,
2245                "Replicas": []
2246            }
2247        }))
2248    }
2249
2250    // ── Kinesis Streaming ──────────────────────────────────────────────
2251
2252    fn enable_kinesis_streaming_destination(
2253        &self,
2254        req: &AwsRequest,
2255    ) -> Result<AwsResponse, AwsServiceError> {
2256        let body = Self::parse_body(req)?;
2257        let table_name = require_str(&body, "TableName")?;
2258        let stream_arn = require_str(&body, "StreamArn")?;
2259        let precision = body["EnableKinesisStreamingConfiguration"]
2260            ["ApproximateCreationDateTimePrecision"]
2261            .as_str()
2262            .unwrap_or("MILLISECOND");
2263
2264        let mut state = self.state.write();
2265        let table = get_table_mut(&mut state.tables, table_name)?;
2266
2267        table.kinesis_destinations.push(KinesisDestination {
2268            stream_arn: stream_arn.to_string(),
2269            destination_status: "ACTIVE".to_string(),
2270            approximate_creation_date_time_precision: precision.to_string(),
2271        });
2272
2273        Self::ok_json(json!({
2274            "TableName": table_name,
2275            "StreamArn": stream_arn,
2276            "DestinationStatus": "ACTIVE",
2277            "EnableKinesisStreamingConfiguration": {
2278                "ApproximateCreationDateTimePrecision": precision
2279            }
2280        }))
2281    }
2282
2283    fn disable_kinesis_streaming_destination(
2284        &self,
2285        req: &AwsRequest,
2286    ) -> Result<AwsResponse, AwsServiceError> {
2287        let body = Self::parse_body(req)?;
2288        let table_name = require_str(&body, "TableName")?;
2289        let stream_arn = require_str(&body, "StreamArn")?;
2290
2291        let mut state = self.state.write();
2292        let table = get_table_mut(&mut state.tables, table_name)?;
2293
2294        if let Some(dest) = table
2295            .kinesis_destinations
2296            .iter_mut()
2297            .find(|d| d.stream_arn == stream_arn)
2298        {
2299            dest.destination_status = "DISABLED".to_string();
2300        }
2301
2302        Self::ok_json(json!({
2303            "TableName": table_name,
2304            "StreamArn": stream_arn,
2305            "DestinationStatus": "DISABLED"
2306        }))
2307    }
2308
2309    fn describe_kinesis_streaming_destination(
2310        &self,
2311        req: &AwsRequest,
2312    ) -> Result<AwsResponse, AwsServiceError> {
2313        let body = Self::parse_body(req)?;
2314        let table_name = require_str(&body, "TableName")?;
2315
2316        let state = self.state.read();
2317        let table = get_table(&state.tables, table_name)?;
2318
2319        let destinations: Vec<Value> = table
2320            .kinesis_destinations
2321            .iter()
2322            .map(|d| {
2323                json!({
2324                    "StreamArn": d.stream_arn,
2325                    "DestinationStatus": d.destination_status,
2326                    "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2327                })
2328            })
2329            .collect();
2330
2331        Self::ok_json(json!({
2332            "TableName": table_name,
2333            "KinesisDataStreamDestinations": destinations
2334        }))
2335    }
2336
2337    fn update_kinesis_streaming_destination(
2338        &self,
2339        req: &AwsRequest,
2340    ) -> Result<AwsResponse, AwsServiceError> {
2341        let body = Self::parse_body(req)?;
2342        let table_name = require_str(&body, "TableName")?;
2343        let stream_arn = require_str(&body, "StreamArn")?;
2344        let precision = body["UpdateKinesisStreamingConfiguration"]
2345            ["ApproximateCreationDateTimePrecision"]
2346            .as_str()
2347            .unwrap_or("MILLISECOND");
2348
2349        let mut state = self.state.write();
2350        let table = get_table_mut(&mut state.tables, table_name)?;
2351
2352        if let Some(dest) = table
2353            .kinesis_destinations
2354            .iter_mut()
2355            .find(|d| d.stream_arn == stream_arn)
2356        {
2357            dest.approximate_creation_date_time_precision = precision.to_string();
2358        }
2359
2360        Self::ok_json(json!({
2361            "TableName": table_name,
2362            "StreamArn": stream_arn,
2363            "DestinationStatus": "ACTIVE",
2364            "UpdateKinesisStreamingConfiguration": {
2365                "ApproximateCreationDateTimePrecision": precision
2366            }
2367        }))
2368    }
2369
2370    // ── Contributor Insights ───────────────────────────────────────────
2371
2372    fn describe_contributor_insights(
2373        &self,
2374        req: &AwsRequest,
2375    ) -> Result<AwsResponse, AwsServiceError> {
2376        let body = Self::parse_body(req)?;
2377        let table_name = require_str(&body, "TableName")?;
2378        let index_name = body["IndexName"].as_str();
2379
2380        let state = self.state.read();
2381        let table = get_table(&state.tables, table_name)?;
2382
2383        let top = table.top_contributors(10);
2384        let contributors: Vec<Value> = top
2385            .iter()
2386            .map(|(key, count)| {
2387                json!({
2388                    "Key": key,
2389                    "Count": count
2390                })
2391            })
2392            .collect();
2393
2394        let mut result = json!({
2395            "TableName": table_name,
2396            "ContributorInsightsStatus": table.contributor_insights_status,
2397            "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2398            "TopContributors": contributors
2399        });
2400        if let Some(idx) = index_name {
2401            result["IndexName"] = json!(idx);
2402        }
2403
2404        Self::ok_json(result)
2405    }
2406
2407    fn update_contributor_insights(
2408        &self,
2409        req: &AwsRequest,
2410    ) -> Result<AwsResponse, AwsServiceError> {
2411        let body = Self::parse_body(req)?;
2412        let table_name = require_str(&body, "TableName")?;
2413        let action = require_str(&body, "ContributorInsightsAction")?;
2414        let index_name = body["IndexName"].as_str();
2415
2416        let mut state = self.state.write();
2417        let table = get_table_mut(&mut state.tables, table_name)?;
2418
2419        let status = match action {
2420            "ENABLE" => "ENABLED",
2421            "DISABLE" => "DISABLED",
2422            _ => {
2423                return Err(AwsServiceError::aws_error(
2424                    StatusCode::BAD_REQUEST,
2425                    "ValidationException",
2426                    format!("Invalid ContributorInsightsAction: {action}"),
2427                ))
2428            }
2429        };
2430        table.contributor_insights_status = status.to_string();
2431        if status == "DISABLED" {
2432            table.contributor_insights_counters.clear();
2433        }
2434
2435        let mut result = json!({
2436            "TableName": table_name,
2437            "ContributorInsightsStatus": status
2438        });
2439        if let Some(idx) = index_name {
2440            result["IndexName"] = json!(idx);
2441        }
2442
2443        Self::ok_json(result)
2444    }
2445
2446    fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2447        let body = Self::parse_body(req)?;
2448        validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2449        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2450        let table_name = body["TableName"].as_str();
2451
2452        let state = self.state.read();
2453        let summaries: Vec<Value> = state
2454            .tables
2455            .values()
2456            .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2457            .map(|t| {
2458                json!({
2459                    "TableName": t.name,
2460                    "ContributorInsightsStatus": t.contributor_insights_status
2461                })
2462            })
2463            .collect();
2464
2465        Self::ok_json(json!({
2466            "ContributorInsightsSummaries": summaries
2467        }))
2468    }
2469
2470    // ── Import/Export ──────────────────────────────────────────────────
2471
2472    fn export_table_to_point_in_time(
2473        &self,
2474        req: &AwsRequest,
2475    ) -> Result<AwsResponse, AwsServiceError> {
2476        let body = Self::parse_body(req)?;
2477        let table_arn = require_str(&body, "TableArn")?;
2478        let s3_bucket = require_str(&body, "S3Bucket")?;
2479        let s3_prefix = body["S3Prefix"].as_str();
2480        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2481
2482        let state = self.state.read();
2483        // Verify table exists and get items
2484        let table = find_table_by_arn(&state.tables, table_arn)?;
2485        let items = table.items.clone();
2486        let item_count = items.len() as i64;
2487
2488        let now = Utc::now();
2489        let export_arn = format!(
2490            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2491            state.region,
2492            state.account_id,
2493            table_arn.rsplit('/').next().unwrap_or("unknown"),
2494            uuid::Uuid::new_v4()
2495        );
2496
2497        drop(state);
2498
2499        // Serialize items as JSON Lines and write to S3
2500        let mut json_lines = String::new();
2501        for item in &items {
2502            let item_json = if export_format == "DYNAMODB_JSON" {
2503                json!({ "Item": item })
2504            } else {
2505                json!(item)
2506            };
2507            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2508            json_lines.push('\n');
2509        }
2510        let data_size = json_lines.len() as i64;
2511
2512        // Build S3 key for the export data
2513        let s3_key = if let Some(prefix) = s3_prefix {
2514            format!("{prefix}/data/manifest-files.json")
2515        } else {
2516            "data/manifest-files.json".to_string()
2517        };
2518
2519        // Write to S3 if we have access to S3 state
2520        let mut export_failed = false;
2521        let mut failure_reason = String::new();
2522        if let Some(ref s3_state) = self.s3_state {
2523            let mut s3 = s3_state.write();
2524            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2525                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2526                let obj = fakecloud_s3::state::S3Object {
2527                    key: s3_key.clone(),
2528                    data: bytes::Bytes::from(json_lines),
2529                    content_type: "application/json".to_string(),
2530                    etag,
2531                    size: data_size as u64,
2532                    last_modified: now,
2533                    metadata: HashMap::new(),
2534                    storage_class: "STANDARD".to_string(),
2535                    tags: HashMap::new(),
2536                    acl_grants: Vec::new(),
2537                    acl_owner_id: None,
2538                    parts_count: None,
2539                    part_sizes: None,
2540                    sse_algorithm: None,
2541                    sse_kms_key_id: None,
2542                    bucket_key_enabled: None,
2543                    version_id: None,
2544                    is_delete_marker: false,
2545                    content_encoding: None,
2546                    website_redirect_location: None,
2547                    restore_ongoing: None,
2548                    restore_expiry: None,
2549                    checksum_algorithm: None,
2550                    checksum_value: None,
2551                    lock_mode: None,
2552                    lock_retain_until: None,
2553                    lock_legal_hold: None,
2554                };
2555                bucket.objects.insert(s3_key, obj);
2556            } else {
2557                export_failed = true;
2558                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2559            }
2560        }
2561
2562        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2563
2564        let export = ExportDescription {
2565            export_arn: export_arn.clone(),
2566            export_status: export_status.to_string(),
2567            table_arn: table_arn.to_string(),
2568            s3_bucket: s3_bucket.to_string(),
2569            s3_prefix: s3_prefix.map(|s| s.to_string()),
2570            export_format: export_format.to_string(),
2571            start_time: now,
2572            end_time: now,
2573            export_time: now,
2574            item_count,
2575            billed_size_bytes: data_size,
2576        };
2577
2578        let mut state = self.state.write();
2579        state.exports.insert(export_arn.clone(), export);
2580
2581        let mut response = json!({
2582            "ExportDescription": {
2583                "ExportArn": export_arn,
2584                "ExportStatus": export_status,
2585                "TableArn": table_arn,
2586                "S3Bucket": s3_bucket,
2587                "S3Prefix": s3_prefix,
2588                "ExportFormat": export_format,
2589                "StartTime": now.timestamp() as f64,
2590                "EndTime": now.timestamp() as f64,
2591                "ExportTime": now.timestamp() as f64,
2592                "ItemCount": item_count,
2593                "BilledSizeBytes": data_size
2594            }
2595        });
2596        if export_failed {
2597            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2598            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2599        }
2600        Self::ok_json(response)
2601    }
2602
2603    fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2604        let body = Self::parse_body(req)?;
2605        let export_arn = require_str(&body, "ExportArn")?;
2606
2607        let state = self.state.read();
2608        let export = state.exports.get(export_arn).ok_or_else(|| {
2609            AwsServiceError::aws_error(
2610                StatusCode::BAD_REQUEST,
2611                "ExportNotFoundException",
2612                format!("Export not found: {export_arn}"),
2613            )
2614        })?;
2615
2616        Self::ok_json(json!({
2617            "ExportDescription": {
2618                "ExportArn": export.export_arn,
2619                "ExportStatus": export.export_status,
2620                "TableArn": export.table_arn,
2621                "S3Bucket": export.s3_bucket,
2622                "S3Prefix": export.s3_prefix,
2623                "ExportFormat": export.export_format,
2624                "StartTime": export.start_time.timestamp() as f64,
2625                "EndTime": export.end_time.timestamp() as f64,
2626                "ExportTime": export.export_time.timestamp() as f64,
2627                "ItemCount": export.item_count,
2628                "BilledSizeBytes": export.billed_size_bytes
2629            }
2630        }))
2631    }
2632
2633    fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2634        let body = Self::parse_body(req)?;
2635        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2636        validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2637        let table_arn = body["TableArn"].as_str();
2638
2639        let state = self.state.read();
2640        let summaries: Vec<Value> = state
2641            .exports
2642            .values()
2643            .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2644            .map(|e| {
2645                json!({
2646                    "ExportArn": e.export_arn,
2647                    "ExportStatus": e.export_status,
2648                    "TableArn": e.table_arn
2649                })
2650            })
2651            .collect();
2652
2653        Self::ok_json(json!({
2654            "ExportSummaries": summaries
2655        }))
2656    }
2657
2658    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2659        let body = Self::parse_body(req)?;
2660        let input_format = require_str(&body, "InputFormat")?;
2661        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2662            AwsServiceError::aws_error(
2663                StatusCode::BAD_REQUEST,
2664                "ValidationException",
2665                "S3BucketSource is required",
2666            )
2667        })?;
2668        let s3_bucket = s3_source
2669            .get("S3Bucket")
2670            .and_then(|v| v.as_str())
2671            .unwrap_or("");
2672        let s3_key_prefix = s3_source
2673            .get("S3KeyPrefix")
2674            .and_then(|v| v.as_str())
2675            .unwrap_or("");
2676
2677        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2678            AwsServiceError::aws_error(
2679                StatusCode::BAD_REQUEST,
2680                "ValidationException",
2681                "TableCreationParameters is required",
2682            )
2683        })?;
2684        let table_name = table_params
2685            .get("TableName")
2686            .and_then(|v| v.as_str())
2687            .ok_or_else(|| {
2688                AwsServiceError::aws_error(
2689                    StatusCode::BAD_REQUEST,
2690                    "ValidationException",
2691                    "TableCreationParameters.TableName is required",
2692                )
2693            })?;
2694
2695        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
2696        let attribute_definitions = parse_attribute_definitions(
2697            table_params
2698                .get("AttributeDefinitions")
2699                .unwrap_or(&Value::Null),
2700        )?;
2701
2702        // Read items from S3 if we have access
2703        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
2704        let mut processed_size_bytes: i64 = 0;
2705        if let Some(ref s3_state) = self.s3_state {
2706            let s3 = s3_state.read();
2707            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
2708                AwsServiceError::aws_error(
2709                    StatusCode::BAD_REQUEST,
2710                    "ImportConflictException",
2711                    format!("S3 bucket does not exist: {s3_bucket}"),
2712                )
2713            })?;
2714            // Find all objects under the prefix and try to parse JSON Lines from each
2715            let prefix = if s3_key_prefix.is_empty() {
2716                String::new()
2717            } else {
2718                s3_key_prefix.to_string()
2719            };
2720            for (key, obj) in &bucket.objects {
2721                if !prefix.is_empty() && !key.starts_with(&prefix) {
2722                    continue;
2723                }
2724                let data = std::str::from_utf8(&obj.data).unwrap_or("");
2725                processed_size_bytes += obj.size as i64;
2726                for line in data.lines() {
2727                    let line = line.trim();
2728                    if line.is_empty() {
2729                        continue;
2730                    }
2731                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
2732                        // DYNAMODB_JSON format wraps items in {"Item": {...}}
2733                        let item = if input_format == "DYNAMODB_JSON" {
2734                            if let Some(item_obj) = parsed.get("Item") {
2735                                item_obj.as_object().cloned().unwrap_or_default()
2736                            } else {
2737                                parsed.as_object().cloned().unwrap_or_default()
2738                            }
2739                        } else {
2740                            parsed.as_object().cloned().unwrap_or_default()
2741                        };
2742                        if !item.is_empty() {
2743                            imported_items
2744                                .push(item.into_iter().collect::<HashMap<String, Value>>());
2745                        }
2746                    }
2747                }
2748            }
2749        }
2750
2751        let mut state = self.state.write();
2752
2753        if state.tables.contains_key(table_name) {
2754            return Err(AwsServiceError::aws_error(
2755                StatusCode::BAD_REQUEST,
2756                "ResourceInUseException",
2757                format!("Table already exists: {table_name}"),
2758            ));
2759        }
2760
2761        let now = Utc::now();
2762        let table_arn = format!(
2763            "arn:aws:dynamodb:{}:{}:table/{}",
2764            state.region, state.account_id, table_name
2765        );
2766        let import_arn = format!(
2767            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
2768            state.region,
2769            state.account_id,
2770            table_name,
2771            uuid::Uuid::new_v4()
2772        );
2773
2774        let processed_item_count = imported_items.len() as i64;
2775
2776        let mut table = DynamoTable {
2777            name: table_name.to_string(),
2778            arn: table_arn.clone(),
2779            key_schema,
2780            attribute_definitions,
2781            provisioned_throughput: ProvisionedThroughput {
2782                read_capacity_units: 0,
2783                write_capacity_units: 0,
2784            },
2785            items: imported_items,
2786            gsi: Vec::new(),
2787            lsi: Vec::new(),
2788            tags: HashMap::new(),
2789            created_at: now,
2790            status: "ACTIVE".to_string(),
2791            item_count: 0,
2792            size_bytes: 0,
2793            billing_mode: "PAY_PER_REQUEST".to_string(),
2794            ttl_attribute: None,
2795            ttl_enabled: false,
2796            resource_policy: None,
2797            pitr_enabled: false,
2798            kinesis_destinations: Vec::new(),
2799            contributor_insights_status: "DISABLED".to_string(),
2800            contributor_insights_counters: HashMap::new(),
2801        };
2802        table.recalculate_stats();
2803        state.tables.insert(table_name.to_string(), table);
2804
2805        let import_desc = ImportDescription {
2806            import_arn: import_arn.clone(),
2807            import_status: "COMPLETED".to_string(),
2808            table_arn: table_arn.clone(),
2809            table_name: table_name.to_string(),
2810            s3_bucket_source: s3_bucket.to_string(),
2811            input_format: input_format.to_string(),
2812            start_time: now,
2813            end_time: now,
2814            processed_item_count,
2815            processed_size_bytes,
2816        };
2817        state.imports.insert(import_arn.clone(), import_desc);
2818
2819        let table_ref = state.tables.get(table_name).unwrap();
2820        let ks: Vec<Value> = table_ref
2821            .key_schema
2822            .iter()
2823            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2824            .collect();
2825        let ad: Vec<Value> = table_ref
2826            .attribute_definitions
2827            .iter()
2828            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2829            .collect();
2830
2831        Self::ok_json(json!({
2832            "ImportTableDescription": {
2833                "ImportArn": import_arn,
2834                "ImportStatus": "COMPLETED",
2835                "TableArn": table_arn,
2836                "TableId": uuid::Uuid::new_v4().to_string(),
2837                "S3BucketSource": {
2838                    "S3Bucket": s3_bucket
2839                },
2840                "InputFormat": input_format,
2841                "TableCreationParameters": {
2842                    "TableName": table_name,
2843                    "KeySchema": ks,
2844                    "AttributeDefinitions": ad
2845                },
2846                "StartTime": now.timestamp() as f64,
2847                "EndTime": now.timestamp() as f64,
2848                "ProcessedItemCount": processed_item_count,
2849                "ProcessedSizeBytes": processed_size_bytes
2850            }
2851        }))
2852    }
2853
2854    fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2855        let body = Self::parse_body(req)?;
2856        let import_arn = require_str(&body, "ImportArn")?;
2857
2858        let state = self.state.read();
2859        let import = state.imports.get(import_arn).ok_or_else(|| {
2860            AwsServiceError::aws_error(
2861                StatusCode::BAD_REQUEST,
2862                "ImportNotFoundException",
2863                format!("Import not found: {import_arn}"),
2864            )
2865        })?;
2866
2867        Self::ok_json(json!({
2868            "ImportTableDescription": {
2869                "ImportArn": import.import_arn,
2870                "ImportStatus": import.import_status,
2871                "TableArn": import.table_arn,
2872                "S3BucketSource": {
2873                    "S3Bucket": import.s3_bucket_source
2874                },
2875                "InputFormat": import.input_format,
2876                "StartTime": import.start_time.timestamp() as f64,
2877                "EndTime": import.end_time.timestamp() as f64,
2878                "ProcessedItemCount": import.processed_item_count,
2879                "ProcessedSizeBytes": import.processed_size_bytes
2880            }
2881        }))
2882    }
2883
2884    fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2885        let body = Self::parse_body(req)?;
2886        validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2887        validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
2888        validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
2889        let table_arn = body["TableArn"].as_str();
2890
2891        let state = self.state.read();
2892        let summaries: Vec<Value> = state
2893            .imports
2894            .values()
2895            .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2896            .map(|i| {
2897                json!({
2898                    "ImportArn": i.import_arn,
2899                    "ImportStatus": i.import_status,
2900                    "TableArn": i.table_arn
2901                })
2902            })
2903            .collect();
2904
2905        Self::ok_json(json!({
2906            "ImportSummaryList": summaries
2907        }))
2908    }
2909}
2910
2911#[async_trait]
2912impl AwsService for DynamoDbService {
2913    fn service_name(&self) -> &str {
2914        "dynamodb"
2915    }
2916
2917    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2918        match req.action.as_str() {
2919            "CreateTable" => self.create_table(&req),
2920            "DeleteTable" => self.delete_table(&req),
2921            "DescribeTable" => self.describe_table(&req),
2922            "ListTables" => self.list_tables(&req),
2923            "UpdateTable" => self.update_table(&req),
2924            "PutItem" => self.put_item(&req),
2925            "GetItem" => self.get_item(&req),
2926            "DeleteItem" => self.delete_item(&req),
2927            "UpdateItem" => self.update_item(&req),
2928            "Query" => self.query(&req),
2929            "Scan" => self.scan(&req),
2930            "BatchGetItem" => self.batch_get_item(&req),
2931            "BatchWriteItem" => self.batch_write_item(&req),
2932            "TagResource" => self.tag_resource(&req),
2933            "UntagResource" => self.untag_resource(&req),
2934            "ListTagsOfResource" => self.list_tags_of_resource(&req),
2935            "TransactGetItems" => self.transact_get_items(&req),
2936            "TransactWriteItems" => self.transact_write_items(&req),
2937            "ExecuteStatement" => self.execute_statement(&req),
2938            "BatchExecuteStatement" => self.batch_execute_statement(&req),
2939            "ExecuteTransaction" => self.execute_transaction(&req),
2940            "UpdateTimeToLive" => self.update_time_to_live(&req),
2941            "DescribeTimeToLive" => self.describe_time_to_live(&req),
2942            "PutResourcePolicy" => self.put_resource_policy(&req),
2943            "GetResourcePolicy" => self.get_resource_policy(&req),
2944            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
2945            // Stubs
2946            "DescribeEndpoints" => self.describe_endpoints(&req),
2947            "DescribeLimits" => self.describe_limits(&req),
2948            // Backups
2949            "CreateBackup" => self.create_backup(&req),
2950            "DeleteBackup" => self.delete_backup(&req),
2951            "DescribeBackup" => self.describe_backup(&req),
2952            "ListBackups" => self.list_backups(&req),
2953            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
2954            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
2955            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
2956            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
2957            // Global tables
2958            "CreateGlobalTable" => self.create_global_table(&req),
2959            "DescribeGlobalTable" => self.describe_global_table(&req),
2960            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
2961            "ListGlobalTables" => self.list_global_tables(&req),
2962            "UpdateGlobalTable" => self.update_global_table(&req),
2963            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
2964            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
2965            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
2966            // Kinesis streaming
2967            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
2968            "DisableKinesisStreamingDestination" => {
2969                self.disable_kinesis_streaming_destination(&req)
2970            }
2971            "DescribeKinesisStreamingDestination" => {
2972                self.describe_kinesis_streaming_destination(&req)
2973            }
2974            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
2975            // Contributor insights
2976            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
2977            "UpdateContributorInsights" => self.update_contributor_insights(&req),
2978            "ListContributorInsights" => self.list_contributor_insights(&req),
2979            // Import/Export
2980            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
2981            "DescribeExport" => self.describe_export(&req),
2982            "ListExports" => self.list_exports(&req),
2983            "ImportTable" => self.import_table(&req),
2984            "DescribeImport" => self.describe_import(&req),
2985            "ListImports" => self.list_imports(&req),
2986            _ => Err(AwsServiceError::action_not_implemented(
2987                "dynamodb",
2988                &req.action,
2989            )),
2990        }
2991    }
2992
2993    fn supported_actions(&self) -> &[&str] {
2994        &[
2995            "CreateTable",
2996            "DeleteTable",
2997            "DescribeTable",
2998            "ListTables",
2999            "UpdateTable",
3000            "PutItem",
3001            "GetItem",
3002            "DeleteItem",
3003            "UpdateItem",
3004            "Query",
3005            "Scan",
3006            "BatchGetItem",
3007            "BatchWriteItem",
3008            "TagResource",
3009            "UntagResource",
3010            "ListTagsOfResource",
3011            "TransactGetItems",
3012            "TransactWriteItems",
3013            "ExecuteStatement",
3014            "BatchExecuteStatement",
3015            "ExecuteTransaction",
3016            "UpdateTimeToLive",
3017            "DescribeTimeToLive",
3018            "PutResourcePolicy",
3019            "GetResourcePolicy",
3020            "DeleteResourcePolicy",
3021            "DescribeEndpoints",
3022            "DescribeLimits",
3023            "CreateBackup",
3024            "DeleteBackup",
3025            "DescribeBackup",
3026            "ListBackups",
3027            "RestoreTableFromBackup",
3028            "RestoreTableToPointInTime",
3029            "UpdateContinuousBackups",
3030            "DescribeContinuousBackups",
3031            "CreateGlobalTable",
3032            "DescribeGlobalTable",
3033            "DescribeGlobalTableSettings",
3034            "ListGlobalTables",
3035            "UpdateGlobalTable",
3036            "UpdateGlobalTableSettings",
3037            "DescribeTableReplicaAutoScaling",
3038            "UpdateTableReplicaAutoScaling",
3039            "EnableKinesisStreamingDestination",
3040            "DisableKinesisStreamingDestination",
3041            "DescribeKinesisStreamingDestination",
3042            "UpdateKinesisStreamingDestination",
3043            "DescribeContributorInsights",
3044            "UpdateContributorInsights",
3045            "ListContributorInsights",
3046            "ExportTableToPointInTime",
3047            "DescribeExport",
3048            "ListExports",
3049            "ImportTable",
3050            "DescribeImport",
3051            "ListImports",
3052        ]
3053    }
3054}
3055
3056// ── Helper functions ────────────────────────────────────────────────────
3057
3058fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3059    body[field].as_str().ok_or_else(|| {
3060        AwsServiceError::aws_error(
3061            StatusCode::BAD_REQUEST,
3062            "ValidationException",
3063            format!("{field} is required"),
3064        )
3065    })
3066}
3067
3068fn require_object(
3069    body: &Value,
3070    field: &str,
3071) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3072    let obj = body[field].as_object().ok_or_else(|| {
3073        AwsServiceError::aws_error(
3074            StatusCode::BAD_REQUEST,
3075            "ValidationException",
3076            format!("{field} is required"),
3077        )
3078    })?;
3079    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3080}
3081
3082fn get_table<'a>(
3083    tables: &'a HashMap<String, DynamoTable>,
3084    name: &str,
3085) -> Result<&'a DynamoTable, AwsServiceError> {
3086    tables.get(name).ok_or_else(|| {
3087        AwsServiceError::aws_error(
3088            StatusCode::BAD_REQUEST,
3089            "ResourceNotFoundException",
3090            format!("Requested resource not found: Table: {name} not found"),
3091        )
3092    })
3093}
3094
3095fn get_table_mut<'a>(
3096    tables: &'a mut HashMap<String, DynamoTable>,
3097    name: &str,
3098) -> Result<&'a mut DynamoTable, AwsServiceError> {
3099    tables.get_mut(name).ok_or_else(|| {
3100        AwsServiceError::aws_error(
3101            StatusCode::BAD_REQUEST,
3102            "ResourceNotFoundException",
3103            format!("Requested resource not found: Table: {name} not found"),
3104        )
3105    })
3106}
3107
3108fn find_table_by_arn<'a>(
3109    tables: &'a HashMap<String, DynamoTable>,
3110    arn: &str,
3111) -> Result<&'a DynamoTable, AwsServiceError> {
3112    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3113        AwsServiceError::aws_error(
3114            StatusCode::BAD_REQUEST,
3115            "ResourceNotFoundException",
3116            format!("Requested resource not found: {arn}"),
3117        )
3118    })
3119}
3120
3121fn find_table_by_arn_mut<'a>(
3122    tables: &'a mut HashMap<String, DynamoTable>,
3123    arn: &str,
3124) -> Result<&'a mut DynamoTable, AwsServiceError> {
3125    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3126        AwsServiceError::aws_error(
3127            StatusCode::BAD_REQUEST,
3128            "ResourceNotFoundException",
3129            format!("Requested resource not found: {arn}"),
3130        )
3131    })
3132}
3133
3134fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3135    let arr = val.as_array().ok_or_else(|| {
3136        AwsServiceError::aws_error(
3137            StatusCode::BAD_REQUEST,
3138            "ValidationException",
3139            "KeySchema is required",
3140        )
3141    })?;
3142    Ok(arr
3143        .iter()
3144        .map(|elem| KeySchemaElement {
3145            attribute_name: elem["AttributeName"]
3146                .as_str()
3147                .unwrap_or_default()
3148                .to_string(),
3149            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3150        })
3151        .collect())
3152}
3153
3154fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3155    let arr = val.as_array().ok_or_else(|| {
3156        AwsServiceError::aws_error(
3157            StatusCode::BAD_REQUEST,
3158            "ValidationException",
3159            "AttributeDefinitions is required",
3160        )
3161    })?;
3162    Ok(arr
3163        .iter()
3164        .map(|elem| AttributeDefinition {
3165            attribute_name: elem["AttributeName"]
3166                .as_str()
3167                .unwrap_or_default()
3168                .to_string(),
3169            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3170        })
3171        .collect())
3172}
3173
3174fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3175    Ok(ProvisionedThroughput {
3176        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3177        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3178    })
3179}
3180
3181fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3182    let Some(arr) = val.as_array() else {
3183        return Vec::new();
3184    };
3185    arr.iter()
3186        .filter_map(|g| {
3187            Some(GlobalSecondaryIndex {
3188                index_name: g["IndexName"].as_str()?.to_string(),
3189                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3190                projection: parse_projection(&g["Projection"]),
3191                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3192                    .ok(),
3193            })
3194        })
3195        .collect()
3196}
3197
3198fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3199    let Some(arr) = val.as_array() else {
3200        return Vec::new();
3201    };
3202    arr.iter()
3203        .filter_map(|l| {
3204            Some(LocalSecondaryIndex {
3205                index_name: l["IndexName"].as_str()?.to_string(),
3206                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3207                projection: parse_projection(&l["Projection"]),
3208            })
3209        })
3210        .collect()
3211}
3212
3213fn parse_projection(val: &Value) -> Projection {
3214    Projection {
3215        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3216        non_key_attributes: val["NonKeyAttributes"]
3217            .as_array()
3218            .map(|arr| {
3219                arr.iter()
3220                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
3221                    .collect()
3222            })
3223            .unwrap_or_default(),
3224    }
3225}
3226
3227fn parse_tags(val: &Value) -> HashMap<String, String> {
3228    let mut tags = HashMap::new();
3229    if let Some(arr) = val.as_array() {
3230        for tag in arr {
3231            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3232                tags.insert(k.to_string(), v.to_string());
3233            }
3234        }
3235    }
3236    tags
3237}
3238
3239fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3240    let mut names = HashMap::new();
3241    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3242        for (k, v) in obj {
3243            if let Some(s) = v.as_str() {
3244                names.insert(k.clone(), s.to_string());
3245            }
3246        }
3247    }
3248    names
3249}
3250
3251fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3252    let mut values = HashMap::new();
3253    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3254        for (k, v) in obj {
3255            values.insert(k.clone(), v.clone());
3256        }
3257    }
3258    values
3259}
3260
3261fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
3262    if name.starts_with('#') {
3263        expr_attr_names
3264            .get(name)
3265            .cloned()
3266            .unwrap_or_else(|| name.to_string())
3267    } else {
3268        name.to_string()
3269    }
3270}
3271
3272fn extract_key(
3273    table: &DynamoTable,
3274    item: &HashMap<String, AttributeValue>,
3275) -> HashMap<String, AttributeValue> {
3276    let mut key = HashMap::new();
3277    let hash_key = table.hash_key_name();
3278    if let Some(v) = item.get(hash_key) {
3279        key.insert(hash_key.to_string(), v.clone());
3280    }
3281    if let Some(range_key) = table.range_key_name() {
3282        if let Some(v) = item.get(range_key) {
3283            key.insert(range_key.to_string(), v.clone());
3284        }
3285    }
3286    key
3287}
3288
3289/// Parse a JSON object into a key map (used for ExclusiveStartKey).
3290fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3291    let obj = value.as_object()?;
3292    if obj.is_empty() {
3293        return None;
3294    }
3295    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3296}
3297
3298/// Check whether an item's key attributes match the given key map.
3299fn item_matches_key(
3300    item: &HashMap<String, AttributeValue>,
3301    key: &HashMap<String, AttributeValue>,
3302    hash_key_name: &str,
3303    range_key_name: Option<&str>,
3304) -> bool {
3305    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3306        (Some(a), Some(b)) => a == b,
3307        _ => false,
3308    };
3309    if !hash_match {
3310        return false;
3311    }
3312    match range_key_name {
3313        Some(rk) => match (item.get(rk), key.get(rk)) {
3314            (Some(a), Some(b)) => a == b,
3315            (None, None) => true,
3316            _ => false,
3317        },
3318        None => true,
3319    }
3320}
3321
3322/// Extract the primary key from an item given explicit key attribute names.
3323fn extract_key_for_schema(
3324    item: &HashMap<String, AttributeValue>,
3325    hash_key_name: &str,
3326    range_key_name: Option<&str>,
3327) -> HashMap<String, AttributeValue> {
3328    let mut key = HashMap::new();
3329    if let Some(v) = item.get(hash_key_name) {
3330        key.insert(hash_key_name.to_string(), v.clone());
3331    }
3332    if let Some(rk) = range_key_name {
3333        if let Some(v) = item.get(rk) {
3334            key.insert(rk.to_string(), v.clone());
3335        }
3336    }
3337    key
3338}
3339
3340fn validate_key_in_item(
3341    table: &DynamoTable,
3342    item: &HashMap<String, AttributeValue>,
3343) -> Result<(), AwsServiceError> {
3344    let hash_key = table.hash_key_name();
3345    if !item.contains_key(hash_key) {
3346        return Err(AwsServiceError::aws_error(
3347            StatusCode::BAD_REQUEST,
3348            "ValidationException",
3349            format!("Missing the key {hash_key} in the item"),
3350        ));
3351    }
3352    if let Some(range_key) = table.range_key_name() {
3353        if !item.contains_key(range_key) {
3354            return Err(AwsServiceError::aws_error(
3355                StatusCode::BAD_REQUEST,
3356                "ValidationException",
3357                format!("Missing the key {range_key} in the item"),
3358            ));
3359        }
3360    }
3361    Ok(())
3362}
3363
3364fn validate_key_attributes_in_key(
3365    table: &DynamoTable,
3366    key: &HashMap<String, AttributeValue>,
3367) -> Result<(), AwsServiceError> {
3368    let hash_key = table.hash_key_name();
3369    if !key.contains_key(hash_key) {
3370        return Err(AwsServiceError::aws_error(
3371            StatusCode::BAD_REQUEST,
3372            "ValidationException",
3373            format!("Missing the key {hash_key} in the item"),
3374        ));
3375    }
3376    Ok(())
3377}
3378
3379fn project_item(
3380    item: &HashMap<String, AttributeValue>,
3381    body: &Value,
3382) -> HashMap<String, AttributeValue> {
3383    let projection = body["ProjectionExpression"].as_str();
3384    match projection {
3385        Some(proj) if !proj.is_empty() => {
3386            let expr_attr_names = parse_expression_attribute_names(body);
3387            let attrs: Vec<String> = proj
3388                .split(',')
3389                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3390                .collect();
3391            let mut result = HashMap::new();
3392            for attr in &attrs {
3393                if let Some(v) = resolve_nested_path(item, attr) {
3394                    insert_nested_value(&mut result, attr, v);
3395                }
3396            }
3397            result
3398        }
3399        _ => item.clone(),
3400    }
3401}
3402
3403/// Resolve expression attribute names within each segment of a projection path.
3404/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
3405fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3406    // Split on dots, resolve each part, rejoin
3407    let mut result = String::new();
3408    for (i, segment) in path.split('.').enumerate() {
3409        if i > 0 {
3410            result.push('.');
3411        }
3412        // A segment might be like "#n" or "people[0]" or "#attr[0]"
3413        if let Some(bracket_pos) = segment.find('[') {
3414            let key_part = &segment[..bracket_pos];
3415            let index_part = &segment[bracket_pos..];
3416            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3417            result.push_str(index_part);
3418        } else {
3419            result.push_str(&resolve_attr_name(segment, expr_attr_names));
3420        }
3421    }
3422    result
3423}
3424
3425/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
3426fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3427    let segments = parse_path_segments(path);
3428    if segments.is_empty() {
3429        return None;
3430    }
3431
3432    let first = &segments[0];
3433    let top_key = match first {
3434        PathSegment::Key(k) => k.as_str(),
3435        _ => return None,
3436    };
3437
3438    let mut current = item.get(top_key)?.clone();
3439
3440    for segment in &segments[1..] {
3441        match segment {
3442            PathSegment::Key(k) => {
3443                // Navigate into a Map: {"M": {"key": ...}}
3444                current = current.get("M")?.get(k)?.clone();
3445            }
3446            PathSegment::Index(idx) => {
3447                // Navigate into a List: {"L": [...]}
3448                current = current.get("L")?.get(*idx)?.clone();
3449            }
3450        }
3451    }
3452
3453    Some(current)
3454}
3455
3456#[derive(Debug)]
3457enum PathSegment {
3458    Key(String),
3459    Index(usize),
3460}
3461
3462/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
3463fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3464    let mut segments = Vec::new();
3465    let mut current = String::new();
3466
3467    let chars: Vec<char> = path.chars().collect();
3468    let mut i = 0;
3469    while i < chars.len() {
3470        match chars[i] {
3471            '.' => {
3472                if !current.is_empty() {
3473                    segments.push(PathSegment::Key(current.clone()));
3474                    current.clear();
3475                }
3476            }
3477            '[' => {
3478                if !current.is_empty() {
3479                    segments.push(PathSegment::Key(current.clone()));
3480                    current.clear();
3481                }
3482                i += 1;
3483                let mut num = String::new();
3484                while i < chars.len() && chars[i] != ']' {
3485                    num.push(chars[i]);
3486                    i += 1;
3487                }
3488                if let Ok(idx) = num.parse::<usize>() {
3489                    segments.push(PathSegment::Index(idx));
3490                }
3491                // skip ']'
3492            }
3493            c => {
3494                current.push(c);
3495            }
3496        }
3497        i += 1;
3498    }
3499    if !current.is_empty() {
3500        segments.push(PathSegment::Key(current));
3501    }
3502    segments
3503}
3504
3505/// Insert a value at a nested path in the result HashMap.
3506/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
3507fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3508    // Simple case: no nesting
3509    if !path.contains('.') && !path.contains('[') {
3510        result.insert(path.to_string(), value);
3511        return;
3512    }
3513
3514    let segments = parse_path_segments(path);
3515    if segments.is_empty() {
3516        return;
3517    }
3518
3519    let top_key = match &segments[0] {
3520        PathSegment::Key(k) => k.clone(),
3521        _ => return,
3522    };
3523
3524    if segments.len() == 1 {
3525        result.insert(top_key, value);
3526        return;
3527    }
3528
3529    // For nested paths, wrap the value back into the nested structure
3530    let wrapped = wrap_value_in_path(&segments[1..], value);
3531    // Merge into existing value if present
3532    let existing = result.remove(&top_key);
3533    let merged = match existing {
3534        Some(existing) => merge_attribute_values(existing, wrapped),
3535        None => wrapped,
3536    };
3537    result.insert(top_key, merged);
3538}
3539
3540/// Wrap a value in the nested path structure.
3541fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3542    if segments.is_empty() {
3543        return value;
3544    }
3545    let inner = wrap_value_in_path(&segments[1..], value);
3546    match &segments[0] {
3547        PathSegment::Key(k) => {
3548            json!({"M": {k.clone(): inner}})
3549        }
3550        PathSegment::Index(idx) => {
3551            let mut arr = vec![Value::Null; idx + 1];
3552            arr[*idx] = inner;
3553            json!({"L": arr})
3554        }
3555    }
3556}
3557
3558/// Merge two attribute values (for overlapping projections).
3559fn merge_attribute_values(a: Value, b: Value) -> Value {
3560    if let (Some(a_map), Some(b_map)) = (
3561        a.get("M").and_then(|v| v.as_object()),
3562        b.get("M").and_then(|v| v.as_object()),
3563    ) {
3564        let mut merged = a_map.clone();
3565        for (k, v) in b_map {
3566            if let Some(existing) = merged.get(k) {
3567                merged.insert(
3568                    k.clone(),
3569                    merge_attribute_values(existing.clone(), v.clone()),
3570                );
3571            } else {
3572                merged.insert(k.clone(), v.clone());
3573            }
3574        }
3575        json!({"M": merged})
3576    } else {
3577        b
3578    }
3579}
3580
3581fn evaluate_condition(
3582    condition: &str,
3583    existing: Option<&HashMap<String, AttributeValue>>,
3584    expr_attr_names: &HashMap<String, String>,
3585    expr_attr_values: &HashMap<String, Value>,
3586) -> Result<(), AwsServiceError> {
3587    let cond = condition.trim();
3588
3589    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3590        let attr = resolve_attr_name(inner, expr_attr_names);
3591        match existing {
3592            Some(item) if item.contains_key(&attr) => {
3593                return Err(AwsServiceError::aws_error(
3594                    StatusCode::BAD_REQUEST,
3595                    "ConditionalCheckFailedException",
3596                    "The conditional request failed",
3597                ));
3598            }
3599            _ => return Ok(()),
3600        }
3601    }
3602
3603    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3604        let attr = resolve_attr_name(inner, expr_attr_names);
3605        match existing {
3606            Some(item) if item.contains_key(&attr) => return Ok(()),
3607            _ => {
3608                return Err(AwsServiceError::aws_error(
3609                    StatusCode::BAD_REQUEST,
3610                    "ConditionalCheckFailedException",
3611                    "The conditional request failed",
3612                ));
3613            }
3614        }
3615    }
3616
3617    if let Some((left, op, right)) = parse_simple_comparison(cond) {
3618        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3619        let expected = expr_attr_values.get(right.trim());
3620        let actual = existing.and_then(|item| item.get(&attr_name));
3621
3622        let result = match op {
3623            "=" => actual == expected,
3624            "<>" => actual != expected,
3625            _ => true,
3626        };
3627
3628        if !result {
3629            return Err(AwsServiceError::aws_error(
3630                StatusCode::BAD_REQUEST,
3631                "ConditionalCheckFailedException",
3632                "The conditional request failed",
3633            ));
3634        }
3635    }
3636
3637    Ok(())
3638}
3639
3640fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
3641    let prefix = format!("{func_name}(");
3642    if let Some(rest) = expr.strip_prefix(&prefix) {
3643        if let Some(inner) = rest.strip_suffix(')') {
3644            return Some(inner.trim());
3645        }
3646    }
3647    None
3648}
3649
3650fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
3651    for op in &["<>", "=", "<", ">", "<=", ">="] {
3652        if let Some(pos) = expr.find(op) {
3653            let left = &expr[..pos];
3654            let right = &expr[pos + op.len()..];
3655            return Some((left, op, right));
3656        }
3657    }
3658    None
3659}
3660
3661fn evaluate_key_condition(
3662    expr: &str,
3663    item: &HashMap<String, AttributeValue>,
3664    hash_key_name: &str,
3665    _range_key_name: Option<&str>,
3666    expr_attr_names: &HashMap<String, String>,
3667    expr_attr_values: &HashMap<String, Value>,
3668) -> bool {
3669    let parts: Vec<&str> = split_on_and(expr);
3670    for part in &parts {
3671        let part = part.trim();
3672        if !evaluate_single_key_condition(
3673            part,
3674            item,
3675            hash_key_name,
3676            expr_attr_names,
3677            expr_attr_values,
3678        ) {
3679            return false;
3680        }
3681    }
3682    true
3683}
3684
3685fn split_on_and(expr: &str) -> Vec<&str> {
3686    let mut parts = Vec::new();
3687    let mut start = 0;
3688    let len = expr.len();
3689    let mut i = 0;
3690    let mut depth = 0;
3691    while i < len {
3692        let ch = expr.as_bytes()[i];
3693        if ch == b'(' {
3694            depth += 1;
3695        } else if ch == b')' {
3696            if depth > 0 {
3697                depth -= 1;
3698            }
3699        } else if depth == 0 && i + 5 <= len && expr[i..i + 5].eq_ignore_ascii_case(" AND ") {
3700            parts.push(&expr[start..i]);
3701            start = i + 5;
3702            i = start;
3703            continue;
3704        }
3705        i += 1;
3706    }
3707    parts.push(&expr[start..]);
3708    parts
3709}
3710
3711fn split_on_or(expr: &str) -> Vec<&str> {
3712    let mut parts = Vec::new();
3713    let mut start = 0;
3714    let len = expr.len();
3715    let mut i = 0;
3716    let mut depth = 0;
3717    while i < len {
3718        let ch = expr.as_bytes()[i];
3719        if ch == b'(' {
3720            depth += 1;
3721        } else if ch == b')' {
3722            if depth > 0 {
3723                depth -= 1;
3724            }
3725        } else if depth == 0 && i + 4 <= len && expr[i..i + 4].eq_ignore_ascii_case(" OR ") {
3726            parts.push(&expr[start..i]);
3727            start = i + 4;
3728            i = start;
3729            continue;
3730        }
3731        i += 1;
3732    }
3733    parts.push(&expr[start..]);
3734    parts
3735}
3736
3737fn evaluate_single_key_condition(
3738    part: &str,
3739    item: &HashMap<String, AttributeValue>,
3740    _hash_key_name: &str,
3741    expr_attr_names: &HashMap<String, String>,
3742    expr_attr_values: &HashMap<String, Value>,
3743) -> bool {
3744    let part = part.trim();
3745
3746    // begins_with(attr, :val)
3747    if let Some(rest) = part
3748        .strip_prefix("begins_with(")
3749        .or_else(|| part.strip_prefix("begins_with ("))
3750    {
3751        if let Some(inner) = rest.strip_suffix(')') {
3752            let mut split = inner.splitn(2, ',');
3753            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3754                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3755                let val_ref = val_ref.trim();
3756                let expected = expr_attr_values.get(val_ref);
3757                let actual = item.get(&attr_name);
3758                return match (actual, expected) {
3759                    (Some(a), Some(e)) => {
3760                        let a_str = extract_string_value(a);
3761                        let e_str = extract_string_value(e);
3762                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3763                    }
3764                    _ => false,
3765                };
3766            }
3767        }
3768        return false;
3769    }
3770
3771    // BETWEEN
3772    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
3773        let attr_part = part[..between_pos].trim();
3774        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
3775        let range_part = &part[between_pos + 7..];
3776        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
3777            let lo_ref = range_part[..and_pos].trim();
3778            let hi_ref = range_part[and_pos + 5..].trim();
3779            let lo = expr_attr_values.get(lo_ref);
3780            let hi = expr_attr_values.get(hi_ref);
3781            let actual = item.get(&attr_name);
3782            return match (actual, lo, hi) {
3783                (Some(a), Some(l), Some(h)) => {
3784                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
3785                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
3786                }
3787                _ => false,
3788            };
3789        }
3790    }
3791
3792    // Simple comparison: attr <op> :val
3793    for op in &["<=", ">=", "<>", "=", "<", ">"] {
3794        if let Some(pos) = part.find(op) {
3795            let left = part[..pos].trim();
3796            let right = part[pos + op.len()..].trim();
3797            let attr_name = resolve_attr_name(left, expr_attr_names);
3798            let expected = expr_attr_values.get(right);
3799            let actual = item.get(&attr_name);
3800
3801            return match *op {
3802                "=" => actual == expected,
3803                "<>" => actual != expected,
3804                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
3805                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
3806                "<=" => {
3807                    let cmp = compare_attribute_values(actual, expected);
3808                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
3809                }
3810                ">=" => {
3811                    let cmp = compare_attribute_values(actual, expected);
3812                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
3813                }
3814                _ => true,
3815            };
3816        }
3817    }
3818
3819    true
3820}
3821
3822fn extract_string_value(val: &Value) -> Option<String> {
3823    val.get("S")
3824        .and_then(|v| v.as_str())
3825        .map(|s| s.to_string())
3826        .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3827}
3828
3829fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3830    match (a, b) {
3831        (None, None) => std::cmp::Ordering::Equal,
3832        (None, Some(_)) => std::cmp::Ordering::Less,
3833        (Some(_), None) => std::cmp::Ordering::Greater,
3834        (Some(a), Some(b)) => {
3835            let a_type = attribute_type_and_value(a);
3836            let b_type = attribute_type_and_value(b);
3837            match (a_type, b_type) {
3838                (Some(("S", a_val)), Some(("S", b_val))) => {
3839                    let a_str = a_val.as_str().unwrap_or("");
3840                    let b_str = b_val.as_str().unwrap_or("");
3841                    a_str.cmp(b_str)
3842                }
3843                (Some(("N", a_val)), Some(("N", b_val))) => {
3844                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3845                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3846                    a_num
3847                        .partial_cmp(&b_num)
3848                        .unwrap_or(std::cmp::Ordering::Equal)
3849                }
3850                (Some(("B", a_val)), Some(("B", b_val))) => {
3851                    let a_str = a_val.as_str().unwrap_or("");
3852                    let b_str = b_val.as_str().unwrap_or("");
3853                    a_str.cmp(b_str)
3854                }
3855                _ => std::cmp::Ordering::Equal,
3856            }
3857        }
3858    }
3859}
3860
3861fn evaluate_filter_expression(
3862    expr: &str,
3863    item: &HashMap<String, AttributeValue>,
3864    expr_attr_names: &HashMap<String, String>,
3865    expr_attr_values: &HashMap<String, Value>,
3866) -> bool {
3867    let trimmed = expr.trim();
3868
3869    // Split on OR first (lower precedence), respecting parentheses
3870    let or_parts = split_on_or(trimmed);
3871    if or_parts.len() > 1 {
3872        return or_parts.iter().any(|part| {
3873            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3874        });
3875    }
3876
3877    // Then split on AND (higher precedence), respecting parentheses
3878    let and_parts = split_on_and(trimmed);
3879    if and_parts.len() > 1 {
3880        return and_parts.iter().all(|part| {
3881            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3882        });
3883    }
3884
3885    // Strip outer parentheses if present
3886    let stripped = strip_outer_parens(trimmed);
3887    if stripped != trimmed {
3888        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3889    }
3890
3891    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3892}
3893
3894/// Strip matching outer parentheses from an expression.
3895fn strip_outer_parens(expr: &str) -> &str {
3896    let trimmed = expr.trim();
3897    if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
3898        return trimmed;
3899    }
3900    // Verify the outer parens actually match each other
3901    let inner = &trimmed[1..trimmed.len() - 1];
3902    let mut depth = 0;
3903    for ch in inner.bytes() {
3904        match ch {
3905            b'(' => depth += 1,
3906            b')' => {
3907                if depth == 0 {
3908                    return trimmed; // closing paren matches something inside, not the outer one
3909                }
3910                depth -= 1;
3911            }
3912            _ => {}
3913        }
3914    }
3915    if depth == 0 {
3916        inner
3917    } else {
3918        trimmed
3919    }
3920}
3921
3922fn evaluate_single_filter_condition(
3923    part: &str,
3924    item: &HashMap<String, AttributeValue>,
3925    expr_attr_names: &HashMap<String, String>,
3926    expr_attr_values: &HashMap<String, Value>,
3927) -> bool {
3928    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
3929        let attr = resolve_attr_name(inner, expr_attr_names);
3930        return item.contains_key(&attr);
3931    }
3932
3933    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
3934        let attr = resolve_attr_name(inner, expr_attr_names);
3935        return !item.contains_key(&attr);
3936    }
3937
3938    if let Some(rest) = part
3939        .strip_prefix("begins_with(")
3940        .or_else(|| part.strip_prefix("begins_with ("))
3941    {
3942        if let Some(inner) = rest.strip_suffix(')') {
3943            let mut split = inner.splitn(2, ',');
3944            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3945                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3946                let expected = expr_attr_values.get(val_ref.trim());
3947                let actual = item.get(&attr_name);
3948                return match (actual, expected) {
3949                    (Some(a), Some(e)) => {
3950                        let a_str = extract_string_value(a);
3951                        let e_str = extract_string_value(e);
3952                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3953                    }
3954                    _ => false,
3955                };
3956            }
3957        }
3958    }
3959
3960    if let Some(rest) = part
3961        .strip_prefix("contains(")
3962        .or_else(|| part.strip_prefix("contains ("))
3963    {
3964        if let Some(inner) = rest.strip_suffix(')') {
3965            let mut split = inner.splitn(2, ',');
3966            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3967                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3968                let expected = expr_attr_values.get(val_ref.trim());
3969                let actual = item.get(&attr_name);
3970                return match (actual, expected) {
3971                    (Some(a), Some(e)) => {
3972                        let a_str = extract_string_value(a);
3973                        let e_str = extract_string_value(e);
3974                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
3975                    }
3976                    _ => false,
3977                };
3978            }
3979        }
3980    }
3981
3982    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
3983}
3984
3985fn apply_update_expression(
3986    item: &mut HashMap<String, AttributeValue>,
3987    expr: &str,
3988    expr_attr_names: &HashMap<String, String>,
3989    expr_attr_values: &HashMap<String, Value>,
3990) -> Result<(), AwsServiceError> {
3991    let clauses = parse_update_clauses(expr);
3992    for (action, assignments) in &clauses {
3993        match action.to_ascii_uppercase().as_str() {
3994            "SET" => {
3995                for assignment in assignments {
3996                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3997                }
3998            }
3999            "REMOVE" => {
4000                for attr_ref in assignments {
4001                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4002                    item.remove(&attr);
4003                }
4004            }
4005            "ADD" => {
4006                for assignment in assignments {
4007                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4008                }
4009            }
4010            "DELETE" => {
4011                for assignment in assignments {
4012                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4013                }
4014            }
4015            _ => {}
4016        }
4017    }
4018    Ok(())
4019}
4020
4021fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
4022    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
4023    let upper = expr.to_ascii_uppercase();
4024    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
4025    let mut positions: Vec<(usize, &str)> = Vec::new();
4026
4027    for kw in &keywords {
4028        let mut search_from = 0;
4029        while let Some(pos) = upper[search_from..].find(kw) {
4030            let abs_pos = search_from + pos;
4031            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
4032            let after_pos = abs_pos + kw.len();
4033            let after_ok =
4034                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
4035            if before_ok && after_ok {
4036                positions.push((abs_pos, kw));
4037            }
4038            search_from = abs_pos + kw.len();
4039        }
4040    }
4041
4042    positions.sort_by_key(|(pos, _)| *pos);
4043
4044    for (i, &(pos, kw)) in positions.iter().enumerate() {
4045        let start = pos + kw.len();
4046        let end = if i + 1 < positions.len() {
4047            positions[i + 1].0
4048        } else {
4049            expr.len()
4050        };
4051        let content = expr[start..end].trim();
4052        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
4053        clauses.push((kw.to_string(), assignments));
4054    }
4055
4056    clauses
4057}
4058
4059fn apply_set_assignment(
4060    item: &mut HashMap<String, AttributeValue>,
4061    assignment: &str,
4062    expr_attr_names: &HashMap<String, String>,
4063    expr_attr_values: &HashMap<String, Value>,
4064) -> Result<(), AwsServiceError> {
4065    let Some((left, right)) = assignment.split_once('=') else {
4066        return Ok(());
4067    };
4068
4069    let attr = resolve_attr_name(left.trim(), expr_attr_names);
4070    let right = right.trim();
4071
4072    // if_not_exists(attr, :val)
4073    if let Some(rest) = right
4074        .strip_prefix("if_not_exists(")
4075        .or_else(|| right.strip_prefix("if_not_exists ("))
4076    {
4077        if let Some(inner) = rest.strip_suffix(')') {
4078            let mut split = inner.splitn(2, ',');
4079            if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4080                let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4081                if !item.contains_key(&check_name) {
4082                    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4083                        item.insert(attr, val.clone());
4084                    }
4085                }
4086                return Ok(());
4087            }
4088        }
4089    }
4090
4091    // list_append(a, b)
4092    if let Some(rest) = right
4093        .strip_prefix("list_append(")
4094        .or_else(|| right.strip_prefix("list_append ("))
4095    {
4096        if let Some(inner) = rest.strip_suffix(')') {
4097            let mut split = inner.splitn(2, ',');
4098            if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4099                let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4100                let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4101
4102                let mut merged = Vec::new();
4103                if let Some(Value::Object(obj)) = &a_val {
4104                    if let Some(Value::Array(arr)) = obj.get("L") {
4105                        merged.extend(arr.clone());
4106                    }
4107                }
4108                if let Some(Value::Object(obj)) = &b_val {
4109                    if let Some(Value::Array(arr)) = obj.get("L") {
4110                        merged.extend(arr.clone());
4111                    }
4112                }
4113
4114                item.insert(attr, json!({"L": merged}));
4115                return Ok(());
4116            }
4117        }
4118    }
4119
4120    // Arithmetic: attr + :val or attr - :val
4121    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4122        let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4123        let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4124
4125        let left_num = extract_number(&left_val).unwrap_or(0.0);
4126        let right_num = extract_number(&right_val).unwrap_or(0.0);
4127
4128        let result = if is_add {
4129            left_num + right_num
4130        } else {
4131            left_num - right_num
4132        };
4133
4134        let num_str = if result == result.trunc() {
4135            format!("{}", result as i64)
4136        } else {
4137            format!("{result}")
4138        };
4139
4140        item.insert(attr, json!({"N": num_str}));
4141        return Ok(());
4142    }
4143
4144    // Simple assignment
4145    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4146    if let Some(v) = val {
4147        item.insert(attr, v);
4148    }
4149
4150    Ok(())
4151}
4152
4153fn resolve_value(
4154    reference: &str,
4155    item: &HashMap<String, AttributeValue>,
4156    expr_attr_names: &HashMap<String, String>,
4157    expr_attr_values: &HashMap<String, Value>,
4158) -> Option<Value> {
4159    let reference = reference.trim();
4160    if reference.starts_with(':') {
4161        expr_attr_values.get(reference).cloned()
4162    } else {
4163        let attr_name = resolve_attr_name(reference, expr_attr_names);
4164        item.get(&attr_name).cloned()
4165    }
4166}
4167
4168fn extract_number(val: &Option<Value>) -> Option<f64> {
4169    val.as_ref()
4170        .and_then(|v| v.get("N"))
4171        .and_then(|n| n.as_str())
4172        .and_then(|s| s.parse().ok())
4173}
4174
4175fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
4176    let mut depth = 0;
4177    for (i, c) in expr.char_indices() {
4178        match c {
4179            '(' => depth += 1,
4180            ')' => depth -= 1,
4181            '+' if depth == 0 && i > 0 => {
4182                return Some((&expr[..i], &expr[i + 1..], true));
4183            }
4184            '-' if depth == 0 && i > 0 => {
4185                return Some((&expr[..i], &expr[i + 1..], false));
4186            }
4187            _ => {}
4188        }
4189    }
4190    None
4191}
4192
4193fn apply_add_assignment(
4194    item: &mut HashMap<String, AttributeValue>,
4195    assignment: &str,
4196    expr_attr_names: &HashMap<String, String>,
4197    expr_attr_values: &HashMap<String, Value>,
4198) -> Result<(), AwsServiceError> {
4199    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4200    if parts.len() != 2 {
4201        return Ok(());
4202    }
4203
4204    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4205    let val_ref = parts[1].trim();
4206    let add_val = expr_attr_values.get(val_ref);
4207
4208    if let Some(add_val) = add_val {
4209        if let Some(existing) = item.get(&attr) {
4210            if let (Some(existing_num), Some(add_num)) = (
4211                extract_number(&Some(existing.clone())),
4212                extract_number(&Some(add_val.clone())),
4213            ) {
4214                let result = existing_num + add_num;
4215                let num_str = if result == result.trunc() {
4216                    format!("{}", result as i64)
4217                } else {
4218                    format!("{result}")
4219                };
4220                item.insert(attr, json!({"N": num_str}));
4221            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4222                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4223                    let mut merged: Vec<Value> = existing_set.clone();
4224                    for v in add_set {
4225                        if !merged.contains(v) {
4226                            merged.push(v.clone());
4227                        }
4228                    }
4229                    item.insert(attr, json!({"SS": merged}));
4230                }
4231            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4232                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4233                    let mut merged: Vec<Value> = existing_set.clone();
4234                    for v in add_set {
4235                        if !merged.contains(v) {
4236                            merged.push(v.clone());
4237                        }
4238                    }
4239                    item.insert(attr, json!({"NS": merged}));
4240                }
4241            }
4242        } else {
4243            item.insert(attr, add_val.clone());
4244        }
4245    }
4246
4247    Ok(())
4248}
4249
4250fn apply_delete_assignment(
4251    item: &mut HashMap<String, AttributeValue>,
4252    assignment: &str,
4253    expr_attr_names: &HashMap<String, String>,
4254    expr_attr_values: &HashMap<String, Value>,
4255) -> Result<(), AwsServiceError> {
4256    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4257    if parts.len() != 2 {
4258        return Ok(());
4259    }
4260
4261    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4262    let val_ref = parts[1].trim();
4263    let del_val = expr_attr_values.get(val_ref);
4264
4265    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4266        if let (Some(existing_set), Some(del_set)) = (
4267            existing.get("SS").and_then(|v| v.as_array()),
4268            del_val.get("SS").and_then(|v| v.as_array()),
4269        ) {
4270            let filtered: Vec<Value> = existing_set
4271                .iter()
4272                .filter(|v| !del_set.contains(v))
4273                .cloned()
4274                .collect();
4275            if filtered.is_empty() {
4276                item.remove(&attr);
4277            } else {
4278                item.insert(attr, json!({"SS": filtered}));
4279            }
4280        } else if let (Some(existing_set), Some(del_set)) = (
4281            existing.get("NS").and_then(|v| v.as_array()),
4282            del_val.get("NS").and_then(|v| v.as_array()),
4283        ) {
4284            let filtered: Vec<Value> = existing_set
4285                .iter()
4286                .filter(|v| !del_set.contains(v))
4287                .cloned()
4288                .collect();
4289            if filtered.is_empty() {
4290                item.remove(&attr);
4291            } else {
4292                item.insert(attr, json!({"NS": filtered}));
4293            }
4294        }
4295    }
4296
4297    Ok(())
4298}
4299
4300#[allow(clippy::too_many_arguments)]
4301fn build_table_description_json(
4302    arn: &str,
4303    key_schema: &[KeySchemaElement],
4304    attribute_definitions: &[AttributeDefinition],
4305    provisioned_throughput: &ProvisionedThroughput,
4306    gsi: &[GlobalSecondaryIndex],
4307    lsi: &[LocalSecondaryIndex],
4308    billing_mode: &str,
4309    created_at: chrono::DateTime<chrono::Utc>,
4310    item_count: i64,
4311    size_bytes: i64,
4312    status: &str,
4313) -> Value {
4314    let table_name = arn.rsplit('/').next().unwrap_or("");
4315    let creation_timestamp =
4316        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4317
4318    let ks: Vec<Value> = key_schema
4319        .iter()
4320        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4321        .collect();
4322
4323    let ad: Vec<Value> = attribute_definitions
4324        .iter()
4325        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4326        .collect();
4327
4328    let mut desc = json!({
4329        "TableName": table_name,
4330        "TableArn": arn,
4331        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4332        "TableStatus": status,
4333        "KeySchema": ks,
4334        "AttributeDefinitions": ad,
4335        "CreationDateTime": creation_timestamp,
4336        "ItemCount": item_count,
4337        "TableSizeBytes": size_bytes,
4338        "BillingModeSummary": { "BillingMode": billing_mode },
4339    });
4340
4341    if billing_mode != "PAY_PER_REQUEST" {
4342        desc["ProvisionedThroughput"] = json!({
4343            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4344            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4345            "NumberOfDecreasesToday": 0,
4346        });
4347    } else {
4348        desc["ProvisionedThroughput"] = json!({
4349            "ReadCapacityUnits": 0,
4350            "WriteCapacityUnits": 0,
4351            "NumberOfDecreasesToday": 0,
4352        });
4353    }
4354
4355    if !gsi.is_empty() {
4356        let gsi_json: Vec<Value> = gsi
4357            .iter()
4358            .map(|g| {
4359                let gks: Vec<Value> = g
4360                    .key_schema
4361                    .iter()
4362                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4363                    .collect();
4364                let mut idx = json!({
4365                    "IndexName": g.index_name,
4366                    "KeySchema": gks,
4367                    "Projection": { "ProjectionType": g.projection.projection_type },
4368                    "IndexStatus": "ACTIVE",
4369                    "IndexArn": format!("{arn}/index/{}", g.index_name),
4370                    "ItemCount": 0,
4371                    "IndexSizeBytes": 0,
4372                });
4373                if !g.projection.non_key_attributes.is_empty() {
4374                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4375                }
4376                if let Some(ref pt) = g.provisioned_throughput {
4377                    idx["ProvisionedThroughput"] = json!({
4378                        "ReadCapacityUnits": pt.read_capacity_units,
4379                        "WriteCapacityUnits": pt.write_capacity_units,
4380                        "NumberOfDecreasesToday": 0,
4381                    });
4382                }
4383                idx
4384            })
4385            .collect();
4386        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4387    }
4388
4389    if !lsi.is_empty() {
4390        let lsi_json: Vec<Value> = lsi
4391            .iter()
4392            .map(|l| {
4393                let lks: Vec<Value> = l
4394                    .key_schema
4395                    .iter()
4396                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4397                    .collect();
4398                let mut idx = json!({
4399                    "IndexName": l.index_name,
4400                    "KeySchema": lks,
4401                    "Projection": { "ProjectionType": l.projection.projection_type },
4402                    "IndexArn": format!("{arn}/index/{}", l.index_name),
4403                    "ItemCount": 0,
4404                    "IndexSizeBytes": 0,
4405                });
4406                if !l.projection.non_key_attributes.is_empty() {
4407                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4408                }
4409                idx
4410            })
4411            .collect();
4412        desc["LocalSecondaryIndexes"] = json!(lsi_json);
4413    }
4414
4415    desc
4416}
4417
4418fn build_table_description(table: &DynamoTable) -> Value {
4419    build_table_description_json(
4420        &table.arn,
4421        &table.key_schema,
4422        &table.attribute_definitions,
4423        &table.provisioned_throughput,
4424        &table.gsi,
4425        &table.lsi,
4426        &table.billing_mode,
4427        table.created_at,
4428        table.item_count,
4429        table.size_bytes,
4430        &table.status,
4431    )
4432}
4433
4434fn execute_partiql_statement(
4435    state: &SharedDynamoDbState,
4436    statement: &str,
4437    parameters: &[Value],
4438) -> Result<AwsResponse, AwsServiceError> {
4439    let trimmed = statement.trim();
4440    let upper = trimmed.to_ascii_uppercase();
4441
4442    if upper.starts_with("SELECT") {
4443        execute_partiql_select(state, trimmed, parameters)
4444    } else if upper.starts_with("INSERT") {
4445        execute_partiql_insert(state, trimmed, parameters)
4446    } else if upper.starts_with("UPDATE") {
4447        execute_partiql_update(state, trimmed, parameters)
4448    } else if upper.starts_with("DELETE") {
4449        execute_partiql_delete(state, trimmed, parameters)
4450    } else {
4451        Err(AwsServiceError::aws_error(
4452            StatusCode::BAD_REQUEST,
4453            "ValidationException",
4454            format!("Unsupported PartiQL statement: {trimmed}"),
4455        ))
4456    }
4457}
4458
4459/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
4460fn execute_partiql_select(
4461    state: &SharedDynamoDbState,
4462    statement: &str,
4463    parameters: &[Value],
4464) -> Result<AwsResponse, AwsServiceError> {
4465    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
4466    let upper = statement.to_ascii_uppercase();
4467    let from_pos = upper.find("FROM").ok_or_else(|| {
4468        AwsServiceError::aws_error(
4469            StatusCode::BAD_REQUEST,
4470            "ValidationException",
4471            "Invalid SELECT statement: missing FROM",
4472        )
4473    })?;
4474
4475    let after_from = statement[from_pos + 4..].trim();
4476    let (table_name, rest) = parse_partiql_table_name(after_from);
4477
4478    let state = state.read();
4479    let table = get_table(&state.tables, &table_name)?;
4480
4481    let rest_upper = rest.trim().to_ascii_uppercase();
4482    if rest_upper.starts_with("WHERE") {
4483        let where_clause = rest.trim()[5..].trim();
4484        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4485        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4486        DynamoDbService::ok_json(json!({ "Items": items }))
4487    } else {
4488        // No WHERE, return all items
4489        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4490        DynamoDbService::ok_json(json!({ "Items": items }))
4491    }
4492}
4493
4494fn execute_partiql_insert(
4495    state: &SharedDynamoDbState,
4496    statement: &str,
4497    parameters: &[Value],
4498) -> Result<AwsResponse, AwsServiceError> {
4499    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
4500    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
4501    let upper = statement.to_ascii_uppercase();
4502    let into_pos = upper.find("INTO").ok_or_else(|| {
4503        AwsServiceError::aws_error(
4504            StatusCode::BAD_REQUEST,
4505            "ValidationException",
4506            "Invalid INSERT statement: missing INTO",
4507        )
4508    })?;
4509
4510    let after_into = statement[into_pos + 4..].trim();
4511    let (table_name, rest) = parse_partiql_table_name(after_into);
4512
4513    let rest_upper = rest.trim().to_ascii_uppercase();
4514    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4515        AwsServiceError::aws_error(
4516            StatusCode::BAD_REQUEST,
4517            "ValidationException",
4518            "Invalid INSERT statement: missing VALUE",
4519        )
4520    })?;
4521
4522    let value_str = rest.trim()[value_pos + 5..].trim();
4523    let item = parse_partiql_value_object(value_str, parameters)?;
4524
4525    let mut state = state.write();
4526    let table = get_table_mut(&mut state.tables, &table_name)?;
4527    let key = extract_key(table, &item);
4528    if table.find_item_index(&key).is_some() {
4529        // DynamoDB PartiQL INSERT fails if item exists
4530        return Err(AwsServiceError::aws_error(
4531            StatusCode::BAD_REQUEST,
4532            "DuplicateItemException",
4533            "Duplicate primary key exists in table",
4534        ));
4535    } else {
4536        table.items.push(item);
4537    }
4538    table.recalculate_stats();
4539
4540    DynamoDbService::ok_json(json!({}))
4541}
4542
4543fn execute_partiql_update(
4544    state: &SharedDynamoDbState,
4545    statement: &str,
4546    parameters: &[Value],
4547) -> Result<AwsResponse, AwsServiceError> {
4548    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
4549    // or: UPDATE "tablename" SET attr=? WHERE pk=?
4550    let after_update = statement[6..].trim(); // skip "UPDATE"
4551    let (table_name, rest) = parse_partiql_table_name(after_update);
4552
4553    let rest_upper = rest.trim().to_ascii_uppercase();
4554    let set_pos = rest_upper.find("SET").ok_or_else(|| {
4555        AwsServiceError::aws_error(
4556            StatusCode::BAD_REQUEST,
4557            "ValidationException",
4558            "Invalid UPDATE statement: missing SET",
4559        )
4560    })?;
4561
4562    let after_set = rest.trim()[set_pos + 3..].trim();
4563
4564    // Split on WHERE
4565    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4566    let (set_clause, where_clause) = if let Some(wp) = where_pos {
4567        (&after_set[..wp], after_set[wp + 5..].trim())
4568    } else {
4569        (after_set, "")
4570    };
4571
4572    let mut state = state.write();
4573    let table = get_table_mut(&mut state.tables, &table_name)?;
4574
4575    let matched_indices = if !where_clause.is_empty() {
4576        find_partiql_where_indices(table, where_clause, parameters)?
4577    } else {
4578        (0..table.items.len()).collect()
4579    };
4580
4581    // Parse SET assignments: attr=value, attr2=value2
4582    let param_offset = count_params_in_str(where_clause);
4583    let assignments: Vec<&str> = set_clause.split(',').collect();
4584    for idx in &matched_indices {
4585        let mut local_offset = param_offset;
4586        for assignment in &assignments {
4587            let assignment = assignment.trim();
4588            if let Some((attr, val_str)) = assignment.split_once('=') {
4589                let attr = attr.trim().trim_matches('"');
4590                let val_str = val_str.trim();
4591                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4592                if let Some(v) = value {
4593                    table.items[*idx].insert(attr.to_string(), v);
4594                }
4595            }
4596        }
4597    }
4598    table.recalculate_stats();
4599
4600    DynamoDbService::ok_json(json!({}))
4601}
4602
4603fn execute_partiql_delete(
4604    state: &SharedDynamoDbState,
4605    statement: &str,
4606    parameters: &[Value],
4607) -> Result<AwsResponse, AwsServiceError> {
4608    // Pattern: DELETE FROM "tablename" WHERE pk='val'
4609    let upper = statement.to_ascii_uppercase();
4610    let from_pos = upper.find("FROM").ok_or_else(|| {
4611        AwsServiceError::aws_error(
4612            StatusCode::BAD_REQUEST,
4613            "ValidationException",
4614            "Invalid DELETE statement: missing FROM",
4615        )
4616    })?;
4617
4618    let after_from = statement[from_pos + 4..].trim();
4619    let (table_name, rest) = parse_partiql_table_name(after_from);
4620
4621    let rest_upper = rest.trim().to_ascii_uppercase();
4622    if !rest_upper.starts_with("WHERE") {
4623        return Err(AwsServiceError::aws_error(
4624            StatusCode::BAD_REQUEST,
4625            "ValidationException",
4626            "DELETE requires a WHERE clause",
4627        ));
4628    }
4629    let where_clause = rest.trim()[5..].trim();
4630
4631    let mut state = state.write();
4632    let table = get_table_mut(&mut state.tables, &table_name)?;
4633
4634    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4635    // Remove from highest index first to avoid invalidating lower indices
4636    indices.sort_unstable();
4637    indices.reverse();
4638    for idx in indices {
4639        table.items.remove(idx);
4640    }
4641    table.recalculate_stats();
4642
4643    DynamoDbService::ok_json(json!({}))
4644}
4645
4646/// Parse a table name that may be quoted with double quotes.
4647/// Returns (table_name, rest_of_string).
4648fn parse_partiql_table_name(s: &str) -> (String, &str) {
4649    let s = s.trim();
4650    if let Some(stripped) = s.strip_prefix('"') {
4651        // Quoted name
4652        if let Some(end) = stripped.find('"') {
4653            let name = &stripped[..end];
4654            let rest = &stripped[end + 1..];
4655            (name.to_string(), rest)
4656        } else {
4657            let end = s.find(' ').unwrap_or(s.len());
4658            (s[..end].trim_matches('"').to_string(), &s[end..])
4659        }
4660    } else {
4661        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
4662        (s[..end].to_string(), &s[end..])
4663    }
4664}
4665
4666/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
4667/// Returns matching items.
4668fn evaluate_partiql_where<'a>(
4669    table: &'a DynamoTable,
4670    where_clause: &str,
4671    parameters: &[Value],
4672) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4673    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4674    Ok(indices.iter().map(|i| &table.items[*i]).collect())
4675}
4676
4677fn find_partiql_where_indices(
4678    table: &DynamoTable,
4679    where_clause: &str,
4680    parameters: &[Value],
4681) -> Result<Vec<usize>, AwsServiceError> {
4682    // Support: col = 'val' AND col2 = 'val2'  or  col = ? AND col2 = ?
4683    // Case-insensitive AND splitting
4684    let upper = where_clause.to_uppercase();
4685    let conditions = if upper.contains(" AND ") {
4686        // Find positions of " AND " case-insensitively and split
4687        let mut parts = Vec::new();
4688        let mut last = 0;
4689        for (i, _) in upper.match_indices(" AND ") {
4690            parts.push(where_clause[last..i].trim());
4691            last = i + 5;
4692        }
4693        parts.push(where_clause[last..].trim());
4694        parts
4695    } else {
4696        vec![where_clause.trim()]
4697    };
4698
4699    let mut param_idx = 0usize;
4700    let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4701
4702    for cond in &conditions {
4703        let cond = cond.trim();
4704        if let Some((left, right)) = cond.split_once('=') {
4705            let attr = left.trim().trim_matches('"').to_string();
4706            let val_str = right.trim();
4707            let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4708            if let Some(v) = value {
4709                parsed_conditions.push((attr, v));
4710            }
4711        }
4712    }
4713
4714    let mut indices = Vec::new();
4715    for (i, item) in table.items.iter().enumerate() {
4716        let all_match = parsed_conditions
4717            .iter()
4718            .all(|(attr, expected)| item.get(attr) == Some(expected));
4719        if all_match {
4720            indices.push(i);
4721        }
4722    }
4723
4724    Ok(indices)
4725}
4726
4727/// Parse a PartiQL literal value. Supports:
4728/// - 'string' -> {"S": "string"}
4729/// - 123 -> {"N": "123"}
4730/// - ? -> parameter from list
4731fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4732    let s = s.trim();
4733    if s == "?" {
4734        let idx = *param_idx;
4735        *param_idx += 1;
4736        parameters.get(idx).cloned()
4737    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4738        let inner = &s[1..s.len() - 1];
4739        Some(json!({"S": inner}))
4740    } else if let Ok(n) = s.parse::<f64>() {
4741        let num_str = if n == n.trunc() {
4742            format!("{}", n as i64)
4743        } else {
4744            format!("{n}")
4745        };
4746        Some(json!({"N": num_str}))
4747    } else {
4748        None
4749    }
4750}
4751
4752/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
4753fn parse_partiql_value_object(
4754    s: &str,
4755    parameters: &[Value],
4756) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4757    let s = s.trim();
4758    let inner = s
4759        .strip_prefix('{')
4760        .and_then(|s| s.strip_suffix('}'))
4761        .ok_or_else(|| {
4762            AwsServiceError::aws_error(
4763                StatusCode::BAD_REQUEST,
4764                "ValidationException",
4765                "Invalid VALUE: expected object literal",
4766            )
4767        })?;
4768
4769    let mut item = HashMap::new();
4770    let mut param_idx = 0usize;
4771
4772    // Simple comma-separated key:value parsing
4773    for pair in split_partiql_pairs(inner) {
4774        let pair = pair.trim();
4775        if pair.is_empty() {
4776            continue;
4777        }
4778        if let Some((key_part, val_part)) = pair.split_once(':') {
4779            let key = key_part
4780                .trim()
4781                .trim_matches('\'')
4782                .trim_matches('"')
4783                .to_string();
4784            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4785                item.insert(key, val);
4786            }
4787        }
4788    }
4789
4790    Ok(item)
4791}
4792
4793/// Split PartiQL object pairs on commas, respecting nested braces and quotes.
4794fn split_partiql_pairs(s: &str) -> Vec<&str> {
4795    let mut parts = Vec::new();
4796    let mut start = 0;
4797    let mut depth = 0;
4798    let mut in_quote = false;
4799
4800    for (i, c) in s.char_indices() {
4801        match c {
4802            '\'' if !in_quote => in_quote = true,
4803            '\'' if in_quote => in_quote = false,
4804            '{' if !in_quote => depth += 1,
4805            '}' if !in_quote => depth -= 1,
4806            ',' if !in_quote && depth == 0 => {
4807                parts.push(&s[start..i]);
4808                start = i + 1;
4809            }
4810            _ => {}
4811        }
4812    }
4813    parts.push(&s[start..]);
4814    parts
4815}
4816
4817/// Count ? parameters in a string.
4818fn count_params_in_str(s: &str) -> usize {
4819    s.chars().filter(|c| *c == '?').count()
4820}
4821
4822#[cfg(test)]
4823mod tests {
4824    use super::*;
4825    use serde_json::json;
4826
4827    #[test]
4828    fn test_parse_update_clauses_set() {
4829        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
4830        assert_eq!(clauses.len(), 1);
4831        assert_eq!(clauses[0].0, "SET");
4832        assert_eq!(clauses[0].1.len(), 2);
4833    }
4834
4835    #[test]
4836    fn test_parse_update_clauses_set_and_remove() {
4837        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
4838        assert_eq!(clauses.len(), 2);
4839        assert_eq!(clauses[0].0, "SET");
4840        assert_eq!(clauses[1].0, "REMOVE");
4841    }
4842
4843    #[test]
4844    fn test_evaluate_key_condition_simple() {
4845        let mut item = HashMap::new();
4846        item.insert("pk".to_string(), json!({"S": "user1"}));
4847        item.insert("sk".to_string(), json!({"S": "order1"}));
4848
4849        let mut expr_values = HashMap::new();
4850        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
4851
4852        assert!(evaluate_key_condition(
4853            "pk = :pk",
4854            &item,
4855            "pk",
4856            Some("sk"),
4857            &HashMap::new(),
4858            &expr_values,
4859        ));
4860    }
4861
4862    #[test]
4863    fn test_compare_attribute_values_numbers() {
4864        let a = json!({"N": "10"});
4865        let b = json!({"N": "20"});
4866        assert_eq!(
4867            compare_attribute_values(Some(&a), Some(&b)),
4868            std::cmp::Ordering::Less
4869        );
4870    }
4871
4872    #[test]
4873    fn test_compare_attribute_values_strings() {
4874        let a = json!({"S": "apple"});
4875        let b = json!({"S": "banana"});
4876        assert_eq!(
4877            compare_attribute_values(Some(&a), Some(&b)),
4878            std::cmp::Ordering::Less
4879        );
4880    }
4881
4882    #[test]
4883    fn test_split_on_and() {
4884        let parts = split_on_and("pk = :pk AND sk > :sk");
4885        assert_eq!(parts.len(), 2);
4886        assert_eq!(parts[0].trim(), "pk = :pk");
4887        assert_eq!(parts[1].trim(), "sk > :sk");
4888    }
4889
4890    #[test]
4891    fn test_split_on_and_respects_parentheses() {
4892        // Before fix: split_on_and would split inside the parens
4893        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
4894        // Should NOT split on the AND inside parentheses
4895        assert_eq!(parts.len(), 1);
4896        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
4897    }
4898
4899    #[test]
4900    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
4901        // (a AND b) OR c — should match when c is true but a is false
4902        let mut item = HashMap::new();
4903        item.insert("x".to_string(), json!({"S": "no"}));
4904        item.insert("y".to_string(), json!({"S": "no"}));
4905        item.insert("z".to_string(), json!({"S": "yes"}));
4906
4907        let mut expr_values = HashMap::new();
4908        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
4909
4910        // x=yes AND y=yes => false, but z=yes => true => overall true
4911        let result = evaluate_filter_expression(
4912            "(x = :yes AND y = :yes) OR z = :yes",
4913            &item,
4914            &HashMap::new(),
4915            &expr_values,
4916        );
4917        assert!(result, "should match because z = :yes is true");
4918
4919        // x=yes AND y=yes => false, z=yes => false => overall false
4920        let mut item2 = HashMap::new();
4921        item2.insert("x".to_string(), json!({"S": "no"}));
4922        item2.insert("y".to_string(), json!({"S": "no"}));
4923        item2.insert("z".to_string(), json!({"S": "no"}));
4924
4925        let result2 = evaluate_filter_expression(
4926            "(x = :yes AND y = :yes) OR z = :yes",
4927            &item2,
4928            &HashMap::new(),
4929            &expr_values,
4930        );
4931        assert!(!result2, "should not match because nothing is true");
4932    }
4933
4934    #[test]
4935    fn test_project_item_nested_path() {
4936        // Item with a list attribute containing maps
4937        let mut item = HashMap::new();
4938        item.insert("pk".to_string(), json!({"S": "key1"}));
4939        item.insert(
4940            "data".to_string(),
4941            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
4942        );
4943
4944        let body = json!({
4945            "ProjectionExpression": "data[0].name"
4946        });
4947
4948        let projected = project_item(&item, &body);
4949        // Should contain data[0].name = "Alice", not the entire data[0] element
4950        let name = projected
4951            .get("data")
4952            .and_then(|v| v.get("L"))
4953            .and_then(|v| v.get(0))
4954            .and_then(|v| v.get("M"))
4955            .and_then(|v| v.get("name"))
4956            .and_then(|v| v.get("S"))
4957            .and_then(|v| v.as_str());
4958        assert_eq!(name, Some("Alice"));
4959
4960        // Should NOT contain the "age" field
4961        let age = projected
4962            .get("data")
4963            .and_then(|v| v.get("L"))
4964            .and_then(|v| v.get(0))
4965            .and_then(|v| v.get("M"))
4966            .and_then(|v| v.get("age"));
4967        assert!(age.is_none(), "age should not be present in projection");
4968    }
4969
4970    #[test]
4971    fn test_resolve_nested_path_map() {
4972        let mut item = HashMap::new();
4973        item.insert(
4974            "info".to_string(),
4975            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
4976        );
4977
4978        let result = resolve_nested_path(&item, "info.address.city");
4979        assert_eq!(result, Some(json!({"S": "NYC"})));
4980    }
4981
4982    #[test]
4983    fn test_resolve_nested_path_list_then_map() {
4984        let mut item = HashMap::new();
4985        item.insert(
4986            "items".to_string(),
4987            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
4988        );
4989
4990        let result = resolve_nested_path(&item, "items[0].sku");
4991        assert_eq!(result, Some(json!({"S": "ABC"})));
4992    }
4993
4994    // -- Integration-style tests using DynamoDbService --
4995
4996    use crate::state::SharedDynamoDbState;
4997    use parking_lot::RwLock;
4998    use std::sync::Arc;
4999
5000    fn make_service() -> DynamoDbService {
5001        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5002            "123456789012",
5003            "us-east-1",
5004        )));
5005        DynamoDbService::new(state)
5006    }
5007
5008    fn make_request(action: &str, body: Value) -> AwsRequest {
5009        AwsRequest {
5010            service: "dynamodb".to_string(),
5011            action: action.to_string(),
5012            region: "us-east-1".to_string(),
5013            account_id: "123456789012".to_string(),
5014            request_id: "test-id".to_string(),
5015            headers: http::HeaderMap::new(),
5016            query_params: HashMap::new(),
5017            body: serde_json::to_vec(&body).unwrap().into(),
5018            path_segments: vec![],
5019            raw_path: "/".to_string(),
5020            raw_query: String::new(),
5021            method: http::Method::POST,
5022            is_query_protocol: false,
5023            access_key_id: None,
5024        }
5025    }
5026
5027    fn create_test_table(svc: &DynamoDbService) {
5028        let req = make_request(
5029            "CreateTable",
5030            json!({
5031                "TableName": "test-table",
5032                "KeySchema": [
5033                    { "AttributeName": "pk", "KeyType": "HASH" }
5034                ],
5035                "AttributeDefinitions": [
5036                    { "AttributeName": "pk", "AttributeType": "S" }
5037                ],
5038                "BillingMode": "PAY_PER_REQUEST"
5039            }),
5040        );
5041        svc.create_table(&req).unwrap();
5042    }
5043
5044    #[test]
5045    fn delete_item_return_values_all_old() {
5046        let svc = make_service();
5047        create_test_table(&svc);
5048
5049        // Put an item
5050        let req = make_request(
5051            "PutItem",
5052            json!({
5053                "TableName": "test-table",
5054                "Item": {
5055                    "pk": { "S": "key1" },
5056                    "name": { "S": "Alice" },
5057                    "age": { "N": "30" }
5058                }
5059            }),
5060        );
5061        svc.put_item(&req).unwrap();
5062
5063        // Delete with ReturnValues=ALL_OLD
5064        let req = make_request(
5065            "DeleteItem",
5066            json!({
5067                "TableName": "test-table",
5068                "Key": { "pk": { "S": "key1" } },
5069                "ReturnValues": "ALL_OLD"
5070            }),
5071        );
5072        let resp = svc.delete_item(&req).unwrap();
5073        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5074
5075        // Verify the old item is returned
5076        let attrs = &body["Attributes"];
5077        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5078        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5079        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5080
5081        // Verify the item is actually deleted
5082        let req = make_request(
5083            "GetItem",
5084            json!({
5085                "TableName": "test-table",
5086                "Key": { "pk": { "S": "key1" } }
5087            }),
5088        );
5089        let resp = svc.get_item(&req).unwrap();
5090        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5091        assert!(body.get("Item").is_none(), "item should be deleted");
5092    }
5093
5094    #[test]
5095    fn transact_get_items_returns_existing_and_missing() {
5096        let svc = make_service();
5097        create_test_table(&svc);
5098
5099        // Put one item
5100        let req = make_request(
5101            "PutItem",
5102            json!({
5103                "TableName": "test-table",
5104                "Item": {
5105                    "pk": { "S": "exists" },
5106                    "val": { "S": "hello" }
5107                }
5108            }),
5109        );
5110        svc.put_item(&req).unwrap();
5111
5112        let req = make_request(
5113            "TransactGetItems",
5114            json!({
5115                "TransactItems": [
5116                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5117                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5118                ]
5119            }),
5120        );
5121        let resp = svc.transact_get_items(&req).unwrap();
5122        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5123        let responses = body["Responses"].as_array().unwrap();
5124        assert_eq!(responses.len(), 2);
5125        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5126        assert!(responses[1].get("Item").is_none());
5127    }
5128
5129    #[test]
5130    fn transact_write_items_put_and_delete() {
5131        let svc = make_service();
5132        create_test_table(&svc);
5133
5134        // Put initial item
5135        let req = make_request(
5136            "PutItem",
5137            json!({
5138                "TableName": "test-table",
5139                "Item": {
5140                    "pk": { "S": "to-delete" },
5141                    "val": { "S": "bye" }
5142                }
5143            }),
5144        );
5145        svc.put_item(&req).unwrap();
5146
5147        // TransactWrite: put new + delete existing
5148        let req = make_request(
5149            "TransactWriteItems",
5150            json!({
5151                "TransactItems": [
5152                    {
5153                        "Put": {
5154                            "TableName": "test-table",
5155                            "Item": {
5156                                "pk": { "S": "new-item" },
5157                                "val": { "S": "hi" }
5158                            }
5159                        }
5160                    },
5161                    {
5162                        "Delete": {
5163                            "TableName": "test-table",
5164                            "Key": { "pk": { "S": "to-delete" } }
5165                        }
5166                    }
5167                ]
5168            }),
5169        );
5170        let resp = svc.transact_write_items(&req).unwrap();
5171        assert_eq!(resp.status, StatusCode::OK);
5172
5173        // Verify new item exists
5174        let req = make_request(
5175            "GetItem",
5176            json!({
5177                "TableName": "test-table",
5178                "Key": { "pk": { "S": "new-item" } }
5179            }),
5180        );
5181        let resp = svc.get_item(&req).unwrap();
5182        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5183        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5184
5185        // Verify deleted item is gone
5186        let req = make_request(
5187            "GetItem",
5188            json!({
5189                "TableName": "test-table",
5190                "Key": { "pk": { "S": "to-delete" } }
5191            }),
5192        );
5193        let resp = svc.get_item(&req).unwrap();
5194        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5195        assert!(body.get("Item").is_none());
5196    }
5197
5198    #[test]
5199    fn transact_write_items_condition_check_failure() {
5200        let svc = make_service();
5201        create_test_table(&svc);
5202
5203        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
5204        let req = make_request(
5205            "TransactWriteItems",
5206            json!({
5207                "TransactItems": [
5208                    {
5209                        "ConditionCheck": {
5210                            "TableName": "test-table",
5211                            "Key": { "pk": { "S": "nonexistent" } },
5212                            "ConditionExpression": "attribute_exists(pk)"
5213                        }
5214                    }
5215                ]
5216            }),
5217        );
5218        let resp = svc.transact_write_items(&req).unwrap();
5219        // Should be a 400 error response
5220        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5221        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5222        assert_eq!(
5223            body["__type"].as_str().unwrap(),
5224            "TransactionCanceledException"
5225        );
5226        assert!(body["CancellationReasons"].as_array().is_some());
5227    }
5228
5229    #[test]
5230    fn update_and_describe_time_to_live() {
5231        let svc = make_service();
5232        create_test_table(&svc);
5233
5234        // Enable TTL
5235        let req = make_request(
5236            "UpdateTimeToLive",
5237            json!({
5238                "TableName": "test-table",
5239                "TimeToLiveSpecification": {
5240                    "AttributeName": "ttl",
5241                    "Enabled": true
5242                }
5243            }),
5244        );
5245        let resp = svc.update_time_to_live(&req).unwrap();
5246        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5247        assert_eq!(
5248            body["TimeToLiveSpecification"]["AttributeName"]
5249                .as_str()
5250                .unwrap(),
5251            "ttl"
5252        );
5253        assert!(body["TimeToLiveSpecification"]["Enabled"]
5254            .as_bool()
5255            .unwrap());
5256
5257        // Describe TTL
5258        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5259        let resp = svc.describe_time_to_live(&req).unwrap();
5260        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5261        assert_eq!(
5262            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5263                .as_str()
5264                .unwrap(),
5265            "ENABLED"
5266        );
5267        assert_eq!(
5268            body["TimeToLiveDescription"]["AttributeName"]
5269                .as_str()
5270                .unwrap(),
5271            "ttl"
5272        );
5273
5274        // Disable TTL
5275        let req = make_request(
5276            "UpdateTimeToLive",
5277            json!({
5278                "TableName": "test-table",
5279                "TimeToLiveSpecification": {
5280                    "AttributeName": "ttl",
5281                    "Enabled": false
5282                }
5283            }),
5284        );
5285        svc.update_time_to_live(&req).unwrap();
5286
5287        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5288        let resp = svc.describe_time_to_live(&req).unwrap();
5289        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5290        assert_eq!(
5291            body["TimeToLiveDescription"]["TimeToLiveStatus"]
5292                .as_str()
5293                .unwrap(),
5294            "DISABLED"
5295        );
5296    }
5297
5298    #[test]
5299    fn resource_policy_lifecycle() {
5300        let svc = make_service();
5301        create_test_table(&svc);
5302
5303        let table_arn = {
5304            let state = svc.state.read();
5305            state.tables.get("test-table").unwrap().arn.clone()
5306        };
5307
5308        // Put policy
5309        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5310        let req = make_request(
5311            "PutResourcePolicy",
5312            json!({
5313                "ResourceArn": table_arn,
5314                "Policy": policy_doc
5315            }),
5316        );
5317        let resp = svc.put_resource_policy(&req).unwrap();
5318        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5319        assert!(body["RevisionId"].as_str().is_some());
5320
5321        // Get policy
5322        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5323        let resp = svc.get_resource_policy(&req).unwrap();
5324        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5325        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5326
5327        // Delete policy
5328        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5329        svc.delete_resource_policy(&req).unwrap();
5330
5331        // Get should return null now
5332        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5333        let resp = svc.get_resource_policy(&req).unwrap();
5334        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5335        assert!(body["Policy"].is_null());
5336    }
5337
5338    #[test]
5339    fn describe_endpoints() {
5340        let svc = make_service();
5341        let req = make_request("DescribeEndpoints", json!({}));
5342        let resp = svc.describe_endpoints(&req).unwrap();
5343        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5344        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5345    }
5346
5347    #[test]
5348    fn describe_limits() {
5349        let svc = make_service();
5350        let req = make_request("DescribeLimits", json!({}));
5351        let resp = svc.describe_limits(&req).unwrap();
5352        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5353        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5354    }
5355
5356    #[test]
5357    fn backup_lifecycle() {
5358        let svc = make_service();
5359        create_test_table(&svc);
5360
5361        // Create backup
5362        let req = make_request(
5363            "CreateBackup",
5364            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5365        );
5366        let resp = svc.create_backup(&req).unwrap();
5367        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5368        let backup_arn = body["BackupDetails"]["BackupArn"]
5369            .as_str()
5370            .unwrap()
5371            .to_string();
5372        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5373
5374        // Describe backup
5375        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5376        let resp = svc.describe_backup(&req).unwrap();
5377        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5378        assert_eq!(
5379            body["BackupDescription"]["BackupDetails"]["BackupName"],
5380            "my-backup"
5381        );
5382
5383        // List backups
5384        let req = make_request("ListBackups", json!({}));
5385        let resp = svc.list_backups(&req).unwrap();
5386        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5387        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5388
5389        // Restore from backup
5390        let req = make_request(
5391            "RestoreTableFromBackup",
5392            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5393        );
5394        svc.restore_table_from_backup(&req).unwrap();
5395
5396        // Verify restored table exists
5397        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5398        let resp = svc.describe_table(&req).unwrap();
5399        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5400        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5401
5402        // Delete backup
5403        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5404        svc.delete_backup(&req).unwrap();
5405
5406        // List should be empty
5407        let req = make_request("ListBackups", json!({}));
5408        let resp = svc.list_backups(&req).unwrap();
5409        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5410        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5411    }
5412
5413    #[test]
5414    fn continuous_backups() {
5415        let svc = make_service();
5416        create_test_table(&svc);
5417
5418        // Initially disabled
5419        let req = make_request(
5420            "DescribeContinuousBackups",
5421            json!({ "TableName": "test-table" }),
5422        );
5423        let resp = svc.describe_continuous_backups(&req).unwrap();
5424        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5425        assert_eq!(
5426            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5427                ["PointInTimeRecoveryStatus"],
5428            "DISABLED"
5429        );
5430
5431        // Enable
5432        let req = make_request(
5433            "UpdateContinuousBackups",
5434            json!({
5435                "TableName": "test-table",
5436                "PointInTimeRecoverySpecification": {
5437                    "PointInTimeRecoveryEnabled": true
5438                }
5439            }),
5440        );
5441        svc.update_continuous_backups(&req).unwrap();
5442
5443        // Verify
5444        let req = make_request(
5445            "DescribeContinuousBackups",
5446            json!({ "TableName": "test-table" }),
5447        );
5448        let resp = svc.describe_continuous_backups(&req).unwrap();
5449        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5450        assert_eq!(
5451            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5452                ["PointInTimeRecoveryStatus"],
5453            "ENABLED"
5454        );
5455    }
5456
5457    #[test]
5458    fn restore_table_to_point_in_time() {
5459        let svc = make_service();
5460        create_test_table(&svc);
5461
5462        let req = make_request(
5463            "RestoreTableToPointInTime",
5464            json!({
5465                "SourceTableName": "test-table",
5466                "TargetTableName": "pitr-restored"
5467            }),
5468        );
5469        svc.restore_table_to_point_in_time(&req).unwrap();
5470
5471        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5472        let resp = svc.describe_table(&req).unwrap();
5473        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5474        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5475    }
5476
5477    #[test]
5478    fn global_table_lifecycle() {
5479        let svc = make_service();
5480
5481        // Create global table
5482        let req = make_request(
5483            "CreateGlobalTable",
5484            json!({
5485                "GlobalTableName": "my-global",
5486                "ReplicationGroup": [
5487                    { "RegionName": "us-east-1" },
5488                    { "RegionName": "eu-west-1" }
5489                ]
5490            }),
5491        );
5492        let resp = svc.create_global_table(&req).unwrap();
5493        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5494        assert_eq!(
5495            body["GlobalTableDescription"]["GlobalTableStatus"],
5496            "ACTIVE"
5497        );
5498
5499        // Describe
5500        let req = make_request(
5501            "DescribeGlobalTable",
5502            json!({ "GlobalTableName": "my-global" }),
5503        );
5504        let resp = svc.describe_global_table(&req).unwrap();
5505        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5506        assert_eq!(
5507            body["GlobalTableDescription"]["ReplicationGroup"]
5508                .as_array()
5509                .unwrap()
5510                .len(),
5511            2
5512        );
5513
5514        // List
5515        let req = make_request("ListGlobalTables", json!({}));
5516        let resp = svc.list_global_tables(&req).unwrap();
5517        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5518        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5519
5520        // Update - add a region
5521        let req = make_request(
5522            "UpdateGlobalTable",
5523            json!({
5524                "GlobalTableName": "my-global",
5525                "ReplicaUpdates": [
5526                    { "Create": { "RegionName": "ap-southeast-1" } }
5527                ]
5528            }),
5529        );
5530        let resp = svc.update_global_table(&req).unwrap();
5531        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5532        assert_eq!(
5533            body["GlobalTableDescription"]["ReplicationGroup"]
5534                .as_array()
5535                .unwrap()
5536                .len(),
5537            3
5538        );
5539
5540        // Describe settings
5541        let req = make_request(
5542            "DescribeGlobalTableSettings",
5543            json!({ "GlobalTableName": "my-global" }),
5544        );
5545        let resp = svc.describe_global_table_settings(&req).unwrap();
5546        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5547        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5548
5549        // Update settings (no-op, just verify no error)
5550        let req = make_request(
5551            "UpdateGlobalTableSettings",
5552            json!({ "GlobalTableName": "my-global" }),
5553        );
5554        svc.update_global_table_settings(&req).unwrap();
5555    }
5556
5557    #[test]
5558    fn table_replica_auto_scaling() {
5559        let svc = make_service();
5560        create_test_table(&svc);
5561
5562        let req = make_request(
5563            "DescribeTableReplicaAutoScaling",
5564            json!({ "TableName": "test-table" }),
5565        );
5566        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5567        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5568        assert_eq!(
5569            body["TableAutoScalingDescription"]["TableName"],
5570            "test-table"
5571        );
5572
5573        let req = make_request(
5574            "UpdateTableReplicaAutoScaling",
5575            json!({ "TableName": "test-table" }),
5576        );
5577        svc.update_table_replica_auto_scaling(&req).unwrap();
5578    }
5579
5580    #[test]
5581    fn kinesis_streaming_lifecycle() {
5582        let svc = make_service();
5583        create_test_table(&svc);
5584
5585        // Enable
5586        let req = make_request(
5587            "EnableKinesisStreamingDestination",
5588            json!({
5589                "TableName": "test-table",
5590                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5591            }),
5592        );
5593        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5594        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5595        assert_eq!(body["DestinationStatus"], "ACTIVE");
5596
5597        // Describe
5598        let req = make_request(
5599            "DescribeKinesisStreamingDestination",
5600            json!({ "TableName": "test-table" }),
5601        );
5602        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5603        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5604        assert_eq!(
5605            body["KinesisDataStreamDestinations"]
5606                .as_array()
5607                .unwrap()
5608                .len(),
5609            1
5610        );
5611
5612        // Update
5613        let req = make_request(
5614            "UpdateKinesisStreamingDestination",
5615            json!({
5616                "TableName": "test-table",
5617                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5618                "UpdateKinesisStreamingConfiguration": {
5619                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
5620                }
5621            }),
5622        );
5623        svc.update_kinesis_streaming_destination(&req).unwrap();
5624
5625        // Disable
5626        let req = make_request(
5627            "DisableKinesisStreamingDestination",
5628            json!({
5629                "TableName": "test-table",
5630                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5631            }),
5632        );
5633        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5634        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5635        assert_eq!(body["DestinationStatus"], "DISABLED");
5636    }
5637
5638    #[test]
5639    fn contributor_insights_lifecycle() {
5640        let svc = make_service();
5641        create_test_table(&svc);
5642
5643        // Initially disabled
5644        let req = make_request(
5645            "DescribeContributorInsights",
5646            json!({ "TableName": "test-table" }),
5647        );
5648        let resp = svc.describe_contributor_insights(&req).unwrap();
5649        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5650        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5651
5652        // Enable
5653        let req = make_request(
5654            "UpdateContributorInsights",
5655            json!({
5656                "TableName": "test-table",
5657                "ContributorInsightsAction": "ENABLE"
5658            }),
5659        );
5660        let resp = svc.update_contributor_insights(&req).unwrap();
5661        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5662        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5663
5664        // List
5665        let req = make_request("ListContributorInsights", json!({}));
5666        let resp = svc.list_contributor_insights(&req).unwrap();
5667        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5668        assert_eq!(
5669            body["ContributorInsightsSummaries"]
5670                .as_array()
5671                .unwrap()
5672                .len(),
5673            1
5674        );
5675    }
5676
5677    #[test]
5678    fn export_lifecycle() {
5679        let svc = make_service();
5680        create_test_table(&svc);
5681
5682        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5683
5684        // Export
5685        let req = make_request(
5686            "ExportTableToPointInTime",
5687            json!({
5688                "TableArn": table_arn,
5689                "S3Bucket": "my-bucket"
5690            }),
5691        );
5692        let resp = svc.export_table_to_point_in_time(&req).unwrap();
5693        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5694        let export_arn = body["ExportDescription"]["ExportArn"]
5695            .as_str()
5696            .unwrap()
5697            .to_string();
5698        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
5699
5700        // Describe
5701        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
5702        let resp = svc.describe_export(&req).unwrap();
5703        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5704        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
5705
5706        // List
5707        let req = make_request("ListExports", json!({}));
5708        let resp = svc.list_exports(&req).unwrap();
5709        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5710        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
5711    }
5712
5713    #[test]
5714    fn import_lifecycle() {
5715        let svc = make_service();
5716
5717        let req = make_request(
5718            "ImportTable",
5719            json!({
5720                "InputFormat": "DYNAMODB_JSON",
5721                "S3BucketSource": { "S3Bucket": "import-bucket" },
5722                "TableCreationParameters": {
5723                    "TableName": "imported-table",
5724                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
5725                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
5726                }
5727            }),
5728        );
5729        let resp = svc.import_table(&req).unwrap();
5730        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5731        let import_arn = body["ImportTableDescription"]["ImportArn"]
5732            .as_str()
5733            .unwrap()
5734            .to_string();
5735        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5736
5737        // Describe import
5738        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
5739        let resp = svc.describe_import(&req).unwrap();
5740        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5741        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5742
5743        // List imports
5744        let req = make_request("ListImports", json!({}));
5745        let resp = svc.list_imports(&req).unwrap();
5746        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5747        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
5748
5749        // Verify the table was created
5750        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
5751        let resp = svc.describe_table(&req).unwrap();
5752        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5753        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5754    }
5755
5756    #[test]
5757    fn backup_restore_preserves_items() {
5758        let svc = make_service();
5759        create_test_table(&svc);
5760
5761        // Put 3 items
5762        for i in 1..=3 {
5763            let req = make_request(
5764                "PutItem",
5765                json!({
5766                    "TableName": "test-table",
5767                    "Item": {
5768                        "pk": { "S": format!("key{i}") },
5769                        "data": { "S": format!("value{i}") }
5770                    }
5771                }),
5772            );
5773            svc.put_item(&req).unwrap();
5774        }
5775
5776        // Create backup
5777        let req = make_request(
5778            "CreateBackup",
5779            json!({
5780                "TableName": "test-table",
5781                "BackupName": "my-backup"
5782            }),
5783        );
5784        let resp = svc.create_backup(&req).unwrap();
5785        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5786        let backup_arn = body["BackupDetails"]["BackupArn"]
5787            .as_str()
5788            .unwrap()
5789            .to_string();
5790
5791        // Delete all items from the original table
5792        for i in 1..=3 {
5793            let req = make_request(
5794                "DeleteItem",
5795                json!({
5796                    "TableName": "test-table",
5797                    "Key": { "pk": { "S": format!("key{i}") } }
5798                }),
5799            );
5800            svc.delete_item(&req).unwrap();
5801        }
5802
5803        // Verify original table is empty
5804        let req = make_request("Scan", json!({ "TableName": "test-table" }));
5805        let resp = svc.scan(&req).unwrap();
5806        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5807        assert_eq!(body["Count"], 0);
5808
5809        // Restore from backup
5810        let req = make_request(
5811            "RestoreTableFromBackup",
5812            json!({
5813                "BackupArn": backup_arn,
5814                "TargetTableName": "restored-table"
5815            }),
5816        );
5817        svc.restore_table_from_backup(&req).unwrap();
5818
5819        // Scan restored table — should have 3 items
5820        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
5821        let resp = svc.scan(&req).unwrap();
5822        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5823        assert_eq!(body["Count"], 3);
5824        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
5825    }
5826
5827    #[test]
5828    fn global_table_replicates_writes() {
5829        let svc = make_service();
5830        create_test_table(&svc);
5831
5832        // Create global table with replicas
5833        let req = make_request(
5834            "CreateGlobalTable",
5835            json!({
5836                "GlobalTableName": "test-table",
5837                "ReplicationGroup": [
5838                    { "RegionName": "us-east-1" },
5839                    { "RegionName": "eu-west-1" }
5840                ]
5841            }),
5842        );
5843        let resp = svc.create_global_table(&req).unwrap();
5844        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5845        assert_eq!(
5846            body["GlobalTableDescription"]["GlobalTableStatus"],
5847            "ACTIVE"
5848        );
5849
5850        // Put an item
5851        let req = make_request(
5852            "PutItem",
5853            json!({
5854                "TableName": "test-table",
5855                "Item": {
5856                    "pk": { "S": "replicated-key" },
5857                    "data": { "S": "replicated-value" }
5858                }
5859            }),
5860        );
5861        svc.put_item(&req).unwrap();
5862
5863        // Verify the item is readable (since all replicas share the same table)
5864        let req = make_request(
5865            "GetItem",
5866            json!({
5867                "TableName": "test-table",
5868                "Key": { "pk": { "S": "replicated-key" } }
5869            }),
5870        );
5871        let resp = svc.get_item(&req).unwrap();
5872        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5873        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
5874        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
5875    }
5876
5877    #[test]
5878    fn contributor_insights_tracks_access() {
5879        let svc = make_service();
5880        create_test_table(&svc);
5881
5882        // Enable contributor insights
5883        let req = make_request(
5884            "UpdateContributorInsights",
5885            json!({
5886                "TableName": "test-table",
5887                "ContributorInsightsAction": "ENABLE"
5888            }),
5889        );
5890        svc.update_contributor_insights(&req).unwrap();
5891
5892        // Put items with different partition keys
5893        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
5894            let req = make_request(
5895                "PutItem",
5896                json!({
5897                    "TableName": "test-table",
5898                    "Item": {
5899                        "pk": { "S": key },
5900                        "data": { "S": "value" }
5901                    }
5902                }),
5903            );
5904            svc.put_item(&req).unwrap();
5905        }
5906
5907        // Get items (to also track read access)
5908        for _ in 0..3 {
5909            let req = make_request(
5910                "GetItem",
5911                json!({
5912                    "TableName": "test-table",
5913                    "Key": { "pk": { "S": "alpha" } }
5914                }),
5915            );
5916            svc.get_item(&req).unwrap();
5917        }
5918
5919        // Describe contributor insights — should show top contributors
5920        let req = make_request(
5921            "DescribeContributorInsights",
5922            json!({ "TableName": "test-table" }),
5923        );
5924        let resp = svc.describe_contributor_insights(&req).unwrap();
5925        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5926        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5927
5928        let contributors = body["TopContributors"].as_array().unwrap();
5929        assert!(
5930            !contributors.is_empty(),
5931            "TopContributors should not be empty"
5932        );
5933
5934        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
5935        // alpha should be the top contributor
5936        let top = &contributors[0];
5937        assert!(top["Count"].as_u64().unwrap() > 0);
5938
5939        // Verify the rule list is populated
5940        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
5941        assert!(!rules.is_empty());
5942    }
5943
5944    #[test]
5945    fn contributor_insights_not_tracked_when_disabled() {
5946        let svc = make_service();
5947        create_test_table(&svc);
5948
5949        // Put items without enabling insights
5950        let req = make_request(
5951            "PutItem",
5952            json!({
5953                "TableName": "test-table",
5954                "Item": {
5955                    "pk": { "S": "key1" },
5956                    "data": { "S": "value" }
5957                }
5958            }),
5959        );
5960        svc.put_item(&req).unwrap();
5961
5962        // Describe — should show empty contributors
5963        let req = make_request(
5964            "DescribeContributorInsights",
5965            json!({ "TableName": "test-table" }),
5966        );
5967        let resp = svc.describe_contributor_insights(&req).unwrap();
5968        let body: Value = serde_json::from_slice(&resp.body).unwrap();
5969        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5970
5971        let contributors = body["TopContributors"].as_array().unwrap();
5972        assert!(contributors.is_empty());
5973    }
5974
5975    #[test]
5976    fn contributor_insights_disabled_table_no_counters_after_scan() {
5977        let svc = make_service();
5978        create_test_table(&svc);
5979
5980        // Put items
5981        for key in &["alpha", "beta"] {
5982            let req = make_request(
5983                "PutItem",
5984                json!({
5985                    "TableName": "test-table",
5986                    "Item": { "pk": { "S": key } }
5987                }),
5988            );
5989            svc.put_item(&req).unwrap();
5990        }
5991
5992        // Enable insights, then scan, then disable, then check counters are cleared
5993        let req = make_request(
5994            "UpdateContributorInsights",
5995            json!({
5996                "TableName": "test-table",
5997                "ContributorInsightsAction": "ENABLE"
5998            }),
5999        );
6000        svc.update_contributor_insights(&req).unwrap();
6001
6002        // Scan to trigger counter collection
6003        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6004        svc.scan(&req).unwrap();
6005
6006        // Verify counters were collected
6007        let req = make_request(
6008            "DescribeContributorInsights",
6009            json!({ "TableName": "test-table" }),
6010        );
6011        let resp = svc.describe_contributor_insights(&req).unwrap();
6012        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013        let contributors = body["TopContributors"].as_array().unwrap();
6014        assert!(
6015            !contributors.is_empty(),
6016            "counters should be non-empty while enabled"
6017        );
6018
6019        // Disable insights (this clears counters)
6020        let req = make_request(
6021            "UpdateContributorInsights",
6022            json!({
6023                "TableName": "test-table",
6024                "ContributorInsightsAction": "DISABLE"
6025            }),
6026        );
6027        svc.update_contributor_insights(&req).unwrap();
6028
6029        // Scan again -- should NOT accumulate counters since insights is disabled
6030        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6031        svc.scan(&req).unwrap();
6032
6033        // Verify counters are still empty
6034        let req = make_request(
6035            "DescribeContributorInsights",
6036            json!({ "TableName": "test-table" }),
6037        );
6038        let resp = svc.describe_contributor_insights(&req).unwrap();
6039        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6040        let contributors = body["TopContributors"].as_array().unwrap();
6041        assert!(
6042            contributors.is_empty(),
6043            "counters should be empty after disabling insights"
6044        );
6045    }
6046
6047    #[test]
6048    fn scan_pagination_with_limit() {
6049        let svc = make_service();
6050        create_test_table(&svc);
6051
6052        // Insert 5 items
6053        for i in 0..5 {
6054            let req = make_request(
6055                "PutItem",
6056                json!({
6057                    "TableName": "test-table",
6058                    "Item": {
6059                        "pk": { "S": format!("item{i}") },
6060                        "data": { "S": format!("value{i}") }
6061                    }
6062                }),
6063            );
6064            svc.put_item(&req).unwrap();
6065        }
6066
6067        // Scan with limit=2
6068        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6069        let resp = svc.scan(&req).unwrap();
6070        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6071        assert_eq!(body["Count"], 2);
6072        assert!(
6073            body["LastEvaluatedKey"].is_object(),
6074            "should have LastEvaluatedKey when limit < total items"
6075        );
6076        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6077
6078        // Page through all items
6079        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6080        let mut lek = body["LastEvaluatedKey"].clone();
6081
6082        while lek.is_object() {
6083            let req = make_request(
6084                "Scan",
6085                json!({
6086                    "TableName": "test-table",
6087                    "Limit": 2,
6088                    "ExclusiveStartKey": lek
6089                }),
6090            );
6091            let resp = svc.scan(&req).unwrap();
6092            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6093            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6094            lek = body["LastEvaluatedKey"].clone();
6095        }
6096
6097        assert_eq!(
6098            all_items.len(),
6099            5,
6100            "should retrieve all 5 items via pagination"
6101        );
6102    }
6103
6104    #[test]
6105    fn scan_no_pagination_when_all_fit() {
6106        let svc = make_service();
6107        create_test_table(&svc);
6108
6109        for i in 0..3 {
6110            let req = make_request(
6111                "PutItem",
6112                json!({
6113                    "TableName": "test-table",
6114                    "Item": {
6115                        "pk": { "S": format!("item{i}") }
6116                    }
6117                }),
6118            );
6119            svc.put_item(&req).unwrap();
6120        }
6121
6122        // Scan with limit > item count
6123        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6124        let resp = svc.scan(&req).unwrap();
6125        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6126        assert_eq!(body["Count"], 3);
6127        assert!(
6128            body["LastEvaluatedKey"].is_null(),
6129            "should not have LastEvaluatedKey when all items fit"
6130        );
6131
6132        // Scan without limit
6133        let req = make_request("Scan", json!({ "TableName": "test-table" }));
6134        let resp = svc.scan(&req).unwrap();
6135        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6136        assert_eq!(body["Count"], 3);
6137        assert!(body["LastEvaluatedKey"].is_null());
6138    }
6139
6140    fn create_composite_table(svc: &DynamoDbService) {
6141        let req = make_request(
6142            "CreateTable",
6143            json!({
6144                "TableName": "composite-table",
6145                "KeySchema": [
6146                    { "AttributeName": "pk", "KeyType": "HASH" },
6147                    { "AttributeName": "sk", "KeyType": "RANGE" }
6148                ],
6149                "AttributeDefinitions": [
6150                    { "AttributeName": "pk", "AttributeType": "S" },
6151                    { "AttributeName": "sk", "AttributeType": "S" }
6152                ],
6153                "BillingMode": "PAY_PER_REQUEST"
6154            }),
6155        );
6156        svc.create_table(&req).unwrap();
6157    }
6158
6159    #[test]
6160    fn query_pagination_with_composite_key() {
6161        let svc = make_service();
6162        create_composite_table(&svc);
6163
6164        // Insert 5 items under the same partition key
6165        for i in 0..5 {
6166            let req = make_request(
6167                "PutItem",
6168                json!({
6169                    "TableName": "composite-table",
6170                    "Item": {
6171                        "pk": { "S": "user1" },
6172                        "sk": { "S": format!("item{i:03}") },
6173                        "data": { "S": format!("value{i}") }
6174                    }
6175                }),
6176            );
6177            svc.put_item(&req).unwrap();
6178        }
6179
6180        // Query with limit=2
6181        let req = make_request(
6182            "Query",
6183            json!({
6184                "TableName": "composite-table",
6185                "KeyConditionExpression": "pk = :pk",
6186                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6187                "Limit": 2
6188            }),
6189        );
6190        let resp = svc.query(&req).unwrap();
6191        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6192        assert_eq!(body["Count"], 2);
6193        assert!(body["LastEvaluatedKey"].is_object());
6194        assert!(body["LastEvaluatedKey"]["pk"].is_object());
6195        assert!(body["LastEvaluatedKey"]["sk"].is_object());
6196
6197        // Page through all items
6198        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6199        let mut lek = body["LastEvaluatedKey"].clone();
6200
6201        while lek.is_object() {
6202            let req = make_request(
6203                "Query",
6204                json!({
6205                    "TableName": "composite-table",
6206                    "KeyConditionExpression": "pk = :pk",
6207                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6208                    "Limit": 2,
6209                    "ExclusiveStartKey": lek
6210                }),
6211            );
6212            let resp = svc.query(&req).unwrap();
6213            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6214            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6215            lek = body["LastEvaluatedKey"].clone();
6216        }
6217
6218        assert_eq!(
6219            all_items.len(),
6220            5,
6221            "should retrieve all 5 items via pagination"
6222        );
6223
6224        // Verify items came back sorted by sort key
6225        let sks: Vec<String> = all_items
6226            .iter()
6227            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6228            .collect();
6229        let mut sorted = sks.clone();
6230        sorted.sort();
6231        assert_eq!(sks, sorted, "items should be sorted by sort key");
6232    }
6233
6234    #[test]
6235    fn query_no_pagination_when_all_fit() {
6236        let svc = make_service();
6237        create_composite_table(&svc);
6238
6239        for i in 0..2 {
6240            let req = make_request(
6241                "PutItem",
6242                json!({
6243                    "TableName": "composite-table",
6244                    "Item": {
6245                        "pk": { "S": "user1" },
6246                        "sk": { "S": format!("item{i}") }
6247                    }
6248                }),
6249            );
6250            svc.put_item(&req).unwrap();
6251        }
6252
6253        let req = make_request(
6254            "Query",
6255            json!({
6256                "TableName": "composite-table",
6257                "KeyConditionExpression": "pk = :pk",
6258                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6259                "Limit": 10
6260            }),
6261        );
6262        let resp = svc.query(&req).unwrap();
6263        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6264        assert_eq!(body["Count"], 2);
6265        assert!(
6266            body["LastEvaluatedKey"].is_null(),
6267            "should not have LastEvaluatedKey when all items fit"
6268        );
6269    }
6270
6271    fn create_gsi_table(svc: &DynamoDbService) {
6272        let req = make_request(
6273            "CreateTable",
6274            json!({
6275                "TableName": "gsi-table",
6276                "KeySchema": [
6277                    { "AttributeName": "pk", "KeyType": "HASH" }
6278                ],
6279                "AttributeDefinitions": [
6280                    { "AttributeName": "pk", "AttributeType": "S" },
6281                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
6282                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
6283                ],
6284                "BillingMode": "PAY_PER_REQUEST",
6285                "GlobalSecondaryIndexes": [
6286                    {
6287                        "IndexName": "gsi-index",
6288                        "KeySchema": [
6289                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6290                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6291                        ],
6292                        "Projection": { "ProjectionType": "ALL" }
6293                    }
6294                ]
6295            }),
6296        );
6297        svc.create_table(&req).unwrap();
6298    }
6299
6300    #[test]
6301    fn gsi_query_last_evaluated_key_includes_table_pk() {
6302        let svc = make_service();
6303        create_gsi_table(&svc);
6304
6305        // Insert 3 items with the SAME GSI key but different table PKs
6306        for i in 0..3 {
6307            let req = make_request(
6308                "PutItem",
6309                json!({
6310                    "TableName": "gsi-table",
6311                    "Item": {
6312                        "pk": { "S": format!("item{i}") },
6313                        "gsi_pk": { "S": "shared" },
6314                        "gsi_sk": { "S": "sort" }
6315                    }
6316                }),
6317            );
6318            svc.put_item(&req).unwrap();
6319        }
6320
6321        // Query GSI with Limit=1 to trigger pagination
6322        let req = make_request(
6323            "Query",
6324            json!({
6325                "TableName": "gsi-table",
6326                "IndexName": "gsi-index",
6327                "KeyConditionExpression": "gsi_pk = :v",
6328                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6329                "Limit": 1
6330            }),
6331        );
6332        let resp = svc.query(&req).unwrap();
6333        let body: Value = serde_json::from_slice(&resp.body).unwrap();
6334        assert_eq!(body["Count"], 1);
6335        let lek = &body["LastEvaluatedKey"];
6336        assert!(lek.is_object(), "should have LastEvaluatedKey");
6337        // Must contain the index keys
6338        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6339        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6340        // Must also contain the table PK
6341        assert!(
6342            lek["pk"].is_object(),
6343            "LEK must contain table PK for GSI queries"
6344        );
6345    }
6346
6347    #[test]
6348    fn gsi_query_pagination_returns_all_items() {
6349        let svc = make_service();
6350        create_gsi_table(&svc);
6351
6352        // Insert 4 items with the SAME GSI key but different table PKs
6353        for i in 0..4 {
6354            let req = make_request(
6355                "PutItem",
6356                json!({
6357                    "TableName": "gsi-table",
6358                    "Item": {
6359                        "pk": { "S": format!("item{i:03}") },
6360                        "gsi_pk": { "S": "shared" },
6361                        "gsi_sk": { "S": "sort" }
6362                    }
6363                }),
6364            );
6365            svc.put_item(&req).unwrap();
6366        }
6367
6368        // Paginate through all items with Limit=2
6369        let mut all_pks = Vec::new();
6370        let mut lek: Option<Value> = None;
6371
6372        loop {
6373            let mut query = json!({
6374                "TableName": "gsi-table",
6375                "IndexName": "gsi-index",
6376                "KeyConditionExpression": "gsi_pk = :v",
6377                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6378                "Limit": 2
6379            });
6380            if let Some(ref start_key) = lek {
6381                query["ExclusiveStartKey"] = start_key.clone();
6382            }
6383
6384            let req = make_request("Query", query);
6385            let resp = svc.query(&req).unwrap();
6386            let body: Value = serde_json::from_slice(&resp.body).unwrap();
6387
6388            for item in body["Items"].as_array().unwrap() {
6389                let pk = item["pk"]["S"].as_str().unwrap().to_string();
6390                all_pks.push(pk);
6391            }
6392
6393            if body["LastEvaluatedKey"].is_object() {
6394                lek = Some(body["LastEvaluatedKey"].clone());
6395            } else {
6396                break;
6397            }
6398        }
6399
6400        all_pks.sort();
6401        assert_eq!(
6402            all_pks,
6403            vec!["item000", "item001", "item002", "item003"],
6404            "pagination should return all items without duplicates"
6405        );
6406    }
6407}