Skip to main content

dynoxide/actions/
create_table.rs

1use crate::actions::{TableDescription, build_table_description};
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::streams;
5use crate::types::{
6    AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, KeyType, LocalSecondaryIndex,
7    Projection, ProjectionType, ProvisionedThroughput,
8};
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12/// Internal raw deserialization struct — uses serde_json::Value for fields
13/// that participate in DynamoDB's multi-field constraint validation.
14#[derive(Debug, Default, Deserialize)]
15struct RawRequest {
16    #[serde(rename = "TableName", default)]
17    table_name: Option<String>,
18    #[serde(rename = "KeySchema", default)]
19    key_schema: Option<serde_json::Value>,
20    #[serde(rename = "AttributeDefinitions", default)]
21    attribute_definitions: Option<serde_json::Value>,
22    #[serde(rename = "GlobalSecondaryIndexes", default)]
23    global_secondary_indexes: Option<serde_json::Value>,
24    #[serde(rename = "LocalSecondaryIndexes", default)]
25    local_secondary_indexes: Option<serde_json::Value>,
26    #[serde(rename = "BillingMode", default)]
27    billing_mode: Option<String>,
28    #[serde(rename = "ProvisionedThroughput", default)]
29    provisioned_throughput: Option<serde_json::Value>,
30    #[serde(rename = "StreamSpecification", default)]
31    stream_specification: Option<StreamSpecification>,
32    #[serde(rename = "SSESpecification", default)]
33    sse_specification: Option<crate::types::SseSpecification>,
34    #[serde(rename = "TableClass", default)]
35    table_class: Option<String>,
36    #[serde(rename = "Tags", default)]
37    tags: Option<Vec<crate::types::Tag>>,
38    #[serde(rename = "DeletionProtectionEnabled", default)]
39    deletion_protection_enabled: Option<bool>,
40}
41
42/// Public request type — fully validated, typed fields.
43/// Can be constructed directly (programmatic use) or deserialized from JSON.
44#[derive(Debug, Default)]
45pub struct CreateTableRequest {
46    pub table_name: String,
47    pub key_schema: Vec<KeySchemaElement>,
48    pub attribute_definitions: Vec<AttributeDefinition>,
49    pub global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>>,
50    pub local_secondary_indexes: Option<Vec<LocalSecondaryIndex>>,
51    pub billing_mode: Option<String>,
52    pub provisioned_throughput: Option<ProvisionedThroughput>,
53    pub stream_specification: Option<StreamSpecification>,
54    pub sse_specification: Option<crate::types::SseSpecification>,
55    pub table_class: Option<String>,
56    pub tags: Option<Vec<crate::types::Tag>>,
57    pub deletion_protection_enabled: Option<bool>,
58}
59
60/// Custom Deserialize that does loose JSON parsing first, validates, then builds typed fields.
61/// Validation errors use "VALIDATION:" prefix so server.rs converts them to ValidationException.
62impl<'de> serde::Deserialize<'de> for CreateTableRequest {
63    fn deserialize<D: serde::Deserializer<'de>>(
64        deserializer: D,
65    ) -> std::result::Result<Self, D::Error> {
66        let raw = RawRequest::deserialize(deserializer)?;
67        match validate_raw_and_build(raw) {
68            Ok(req) => Ok(req),
69            Err(msg) => Err(serde::de::Error::custom(format!("VALIDATION:{}", msg))),
70        }
71    }
72}
73
74#[derive(Debug, Default, Deserialize)]
75pub struct StreamSpecification {
76    #[serde(rename = "StreamEnabled", alias = "stream_enabled")]
77    pub stream_enabled: bool,
78    #[serde(rename = "StreamViewType", alias = "stream_view_type", default)]
79    pub stream_view_type: Option<String>,
80}
81
82#[derive(Debug, Default, Serialize)]
83pub struct CreateTableResponse {
84    #[serde(rename = "TableDescription")]
85    pub table_description: TableDescription,
86}
87
88pub fn execute(storage: &Storage, request: CreateTableRequest) -> Result<CreateTableResponse> {
89    // Structural validation (runs for both programmatic and JSON paths)
90    validate_typed_request(&request)?;
91
92    if let Some(ref tc) = request.table_class {
93        if tc != "STANDARD" && tc != "STANDARD_INFREQUENT_ACCESS" {
94            return Err(DynoxideError::ValidationException(format!(
95                "1 validation error detected: Value '{tc}' at 'tableClass' failed to satisfy \
96                 constraint: Member must satisfy enum value set: \
97                 [STANDARD, STANDARD_INFREQUENT_ACCESS]"
98            )));
99        }
100    }
101
102    if storage.table_exists(&request.table_name)? {
103        return Err(DynoxideError::ResourceInUseException(format!(
104            "Table already exists: {}",
105            request.table_name
106        )));
107    }
108
109    let now = SystemTime::now()
110        .duration_since(UNIX_EPOCH)
111        .unwrap_or_default()
112        .as_secs() as i64;
113
114    let key_schema_json = serde_json::to_string(&request.key_schema)
115        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
116    let attr_defs_json = serde_json::to_string(&request.attribute_definitions)
117        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
118    let gsi_json = request
119        .global_secondary_indexes
120        .as_ref()
121        .map(serde_json::to_string)
122        .transpose()
123        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
124    let lsi_json = request
125        .local_secondary_indexes
126        .as_ref()
127        .map(serde_json::to_string)
128        .transpose()
129        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
130    let pt_json = request
131        .provisioned_throughput
132        .as_ref()
133        .map(serde_json::to_string)
134        .transpose()
135        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
136    let sse_json = request
137        .sse_specification
138        .as_ref()
139        .map(serde_json::to_string)
140        .transpose()
141        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
142    let deletion_protection = request.deletion_protection_enabled.unwrap_or(false);
143
144    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
145    storage.insert_table_metadata(&crate::storage::CreateTableMetadata {
146        table_name: &request.table_name,
147        key_schema: &key_schema_json,
148        attribute_definitions: &attr_defs_json,
149        gsi_definitions: gsi_json.as_deref(),
150        lsi_definitions: lsi_json.as_deref(),
151        provisioned_throughput: pt_json.as_deref(),
152        created_at: now,
153        sse_specification: sse_json.as_deref(),
154        table_class: request.table_class.as_deref(),
155        deletion_protection_enabled: deletion_protection,
156        billing_mode: Some(billing_mode_str),
157    })?;
158
159    storage.create_data_table(&request.table_name)?;
160
161    if let Some(ref gsis) = request.global_secondary_indexes {
162        for gsi in gsis {
163            storage.create_gsi_table(&request.table_name, &gsi.index_name)?;
164        }
165    }
166
167    if let Some(ref lsis) = request.local_secondary_indexes {
168        for lsi in lsis {
169            storage.create_lsi_table(&request.table_name, &lsi.index_name)?;
170        }
171    }
172
173    if let Some(ref spec) = request.stream_specification {
174        if spec.stream_enabled {
175            let view_type = spec
176                .stream_view_type
177                .as_deref()
178                .unwrap_or("NEW_AND_OLD_IMAGES");
179            let label = streams::generate_stream_label();
180            storage.enable_stream(&request.table_name, view_type, &label)?;
181        }
182    }
183
184    if let Some(ref tags) = request.tags {
185        if !tags.is_empty() {
186            storage.set_tags(&request.table_name, tags)?;
187        }
188    }
189
190    let meta = storage
191        .get_table_metadata(&request.table_name)?
192        .ok_or_else(|| {
193            DynoxideError::InternalServerError("Table metadata not found after creation".into())
194        })?;
195
196    let mut desc = build_table_description(&meta, Some(0), Some(0));
197    // CreateTable response shows CREATING status (table is usable immediately
198    // but DynamoDB API contract says newly-created tables start as CREATING)
199    desc.table_status = "CREATING".to_string();
200
201    // Override billing mode fields based on the actual request
202    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
203    if billing_mode_str == "PROVISIONED" {
204        desc.billing_mode_summary = None;
205        desc.table_throughput_mode_summary = None;
206    } else if billing_mode_str == "PAY_PER_REQUEST" {
207        desc.billing_mode_summary = Some(crate::actions::BillingModeSummary {
208            billing_mode: "PAY_PER_REQUEST".to_string(),
209            last_update_to_pay_per_request_date_time: None,
210        });
211        desc.table_throughput_mode_summary = Some(crate::actions::TableThroughputModeSummary {
212            table_throughput_mode: "PAY_PER_REQUEST".to_string(),
213            last_update_to_pay_per_request_date_time: None,
214        });
215        // Ensure provisioned throughput shows zeros for PAY_PER_REQUEST
216        desc.provisioned_throughput = Some(crate::actions::TableProvisionedThroughputDescription {
217            read_capacity_units: 0,
218            write_capacity_units: 0,
219            number_of_decreases_today: 0,
220            last_increase_date_time: None,
221            last_decrease_date_time: None,
222        });
223    }
224
225    // Set all GSI statuses to CREATING for newly created tables
226    if let Some(ref mut gsis) = desc.global_secondary_indexes {
227        for gsi in gsis {
228            gsi.index_status = "CREATING".to_string();
229        }
230    }
231
232    // Remove DeletionProtectionEnabled from response if not explicitly set
233    // (DynamoDB doesn't include it in basic CreateTable response)
234    if request.deletion_protection_enabled.is_none() {
235        desc.deletion_protection_enabled = None;
236    }
237
238    Ok(CreateTableResponse {
239        table_description: desc,
240    })
241}
242
243/// Convert a String error to DynoxideError::ValidationException.
244fn ve(msg: String) -> DynoxideError {
245    DynoxideError::ValidationException(msg)
246}
247
248/// Validate a programmatically-constructed request (used when not deserialised from JSON).
249///
250/// The validation order matches DynamoDB's actual behaviour (as verified by the Dynalite
251/// conformance suite):
252///
253/// 1. Table name (missing, length, pattern)
254/// 2. BillingMode + ProvisionedThroughput consistency
255/// 3. ProvisionedThroughput out-of-bounds
256/// 4. Missing ProvisionedThroughput (default PROVISIONED billing)
257/// 5. Key attribute definition checks ("Invalid KeySchema" / detailed missing-attr message)
258/// 6. Key schema structure (duplicate names, wrong types)
259/// 7. Empty LSI/GSI lists
260/// 8. LSI/GSI structural validation (key schema, projections, duplicates, limits)
261/// 9. Cross-index duplicate names
262/// 10. Attribute definition count mismatch
263fn validate_typed_request(request: &CreateTableRequest) -> Result<()> {
264    if request.table_name.is_empty() {
265        return Err(DynoxideError::ValidationException(
266            "The parameter 'TableName' is required but was not present in the request".to_string(),
267        ));
268    }
269    if request.table_name.len() < 3 || request.table_name.len() > 255 {
270        return Err(DynoxideError::ValidationException(
271            "TableName must be at least 3 characters long and at most 255 characters long"
272                .to_string(),
273        ));
274    }
275
276    // Table name pattern
277    if !request
278        .table_name
279        .chars()
280        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
281    {
282        return Err(DynoxideError::ValidationException(format!(
283            "1 validation error detected: Value '{}' at 'tableName' failed to satisfy constraint: \
284             Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
285            request.table_name
286        )));
287    }
288
289    // BillingMode + ProvisionedThroughput consistency
290    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
291    if billing_mode_str == "PAY_PER_REQUEST" && request.provisioned_throughput.is_some() {
292        return Err(DynoxideError::ValidationException(
293            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
294             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
295                .to_string(),
296        ));
297    }
298
299    // ProvisionedThroughput out-of-bounds
300    if let Some(ref pt) = request.provisioned_throughput {
301        const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
302        let rcu = pt.read_capacity_units.unwrap_or(0);
303        let wcu = pt.write_capacity_units.unwrap_or(0);
304        if rcu > MAX_THROUGHPUT {
305            return Err(DynoxideError::ValidationException(format!(
306                "Given value {} for ReadCapacityUnits is out of bounds",
307                rcu
308            )));
309        }
310        if wcu > MAX_THROUGHPUT {
311            return Err(DynoxideError::ValidationException(format!(
312                "Given value {} for WriteCapacityUnits is out of bounds",
313                wcu
314            )));
315        }
316    }
317
318    // Missing ProvisionedThroughput when billing mode is explicitly PROVISIONED.
319    // For the programmatic API, when BillingMode is not specified we default to
320    // PAY_PER_REQUEST for convenience. The HTTP/JSON path (validate_raw_and_build)
321    // applies the stricter DynamoDB default of PROVISIONED.
322    if request.billing_mode.is_some()
323        && billing_mode_str == "PROVISIONED"
324        && request.provisioned_throughput.is_none()
325    {
326        return Err(DynoxideError::ValidationException(
327            "One or more parameter values were invalid: ReadCapacityUnits and \
328             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
329                .to_string(),
330        ));
331    }
332
333    // Key attribute definition checks (before key schema structure)
334    validate_key_attrs_in_defs(&request.key_schema, &request.attribute_definitions).map_err(ve)?;
335
336    // Key schema structure
337    validate_key_schema_structure(&request.key_schema).map_err(ve)?;
338
339    // Empty LSI/GSI lists (before structural validation and attr count)
340    if let Some(ref lsis) = request.local_secondary_indexes {
341        if lsis.is_empty() {
342            return Err(ve(
343                "One or more parameter values were invalid: List of LocalSecondaryIndexes is empty"
344                    .to_string(),
345            ));
346        }
347    }
348    if let Some(ref gsis) = request.global_secondary_indexes {
349        if gsis.is_empty() {
350            return Err(ve(
351                "One or more parameter values were invalid: List of GlobalSecondaryIndexes is empty"
352                    .to_string(),
353            ));
354        }
355    }
356
357    // LSI structural validation
358    if let Some(ref lsis) = request.local_secondary_indexes {
359        validate_lsi_list(lsis, &request.key_schema, &request.attribute_definitions).map_err(ve)?;
360    }
361
362    // GSI structural validation
363    if let Some(ref gsis) = request.global_secondary_indexes {
364        let bm = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
365        validate_gsi_list(gsis, &request.attribute_definitions, bm).map_err(ve)?;
366    }
367
368    // Cross-index duplicate names (checked before attr def count)
369    check_cross_index_duplicates(
370        &request.local_secondary_indexes,
371        &request.global_secondary_indexes,
372    )
373    .map_err(ve)?;
374
375    // Attribute definition count (last)
376    validate_attr_def_count(
377        &request.key_schema,
378        &request.attribute_definitions,
379        &request.local_secondary_indexes,
380        &request.global_secondary_indexes,
381    )
382    .map_err(ve)?;
383
384    Ok(())
385}
386
387fn check_cross_index_duplicates(
388    lsis: &Option<Vec<LocalSecondaryIndex>>,
389    gsis: &Option<Vec<GlobalSecondaryIndex>>,
390) -> std::result::Result<(), String> {
391    if let (Some(lsis), Some(gsis)) = (lsis, gsis) {
392        let mut all_names = std::collections::HashSet::new();
393        for lsi in lsis {
394            all_names.insert(&lsi.index_name);
395        }
396        for gsi in gsis {
397            if !all_names.insert(&gsi.index_name) {
398                return Err(format!(
399                    "One or more parameter values were invalid: Duplicate index name: {}",
400                    gsi.index_name
401                ));
402            }
403        }
404    }
405    Ok(())
406}
407
408// ---- Raw JSON validation (for deserialization path) ----
409
410fn validate_raw_and_build(raw: RawRequest) -> std::result::Result<CreateTableRequest, String> {
411    // Missing TableName is a different error format from invalid TableName
412    if raw.table_name.is_none() {
413        return Err(
414            "The parameter 'TableName' is required but was not present in the request".to_string(),
415        );
416    }
417
418    // Use the shared constraint error collector for table name validation.
419    // This produces the correct multi-field constraint format for empty,
420    // too-short, too-long, or invalid-pattern table names.
421    let name_errors = crate::validation::table_name_constraint_errors(raw.table_name.as_deref());
422    if !name_errors.is_empty() {
423        let msg = format!(
424            "{} validation error{} detected: {}",
425            name_errors.len(),
426            if name_errors.len() > 1 { "s" } else { "" },
427            name_errors.join("; ")
428        );
429        return Err(msg);
430    }
431    let table_name = raw.table_name.unwrap();
432
433    let mut errors = Vec::new();
434
435    if let Some(ref bm) = raw.billing_mode {
436        if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
437            errors.push(format!(
438                "Value '{}' at 'billingMode' failed to satisfy constraint: \
439                 Member must satisfy enum value set: [PROVISIONED, PAY_PER_REQUEST]",
440                bm
441            ));
442        }
443    }
444
445    collect_pt_errors(&raw.provisioned_throughput, &mut errors);
446    collect_ks_errors(&raw.key_schema, &mut errors);
447    collect_ad_errors(&raw.attribute_definitions, &mut errors);
448    collect_lsi_errors(&raw.local_secondary_indexes, &mut errors);
449    collect_gsi_errors(&raw.global_secondary_indexes, &mut errors);
450
451    // DynamoDB caps multi-field constraint errors at 10
452    errors.truncate(10);
453
454    if !errors.is_empty() {
455        let prefix = format!(
456            "{} validation error{} detected: ",
457            errors.len(),
458            if errors.len() == 1 { "" } else { "s" }
459        );
460        return Err(format!("{}{}", prefix, errors.join("; ")));
461    }
462
463    // BillingMode + ProvisionedThroughput consistency (HTTP path only)
464    let billing_mode_str = raw.billing_mode.as_deref().unwrap_or("PROVISIONED");
465    if billing_mode_str == "PAY_PER_REQUEST" && raw.provisioned_throughput.is_some() {
466        return Err(
467            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
468             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
469                .to_string(),
470        );
471    }
472
473    // ProvisionedThroughput out-of-bounds (after multi-field but before struct checks)
474    if let Some(ref pt) = raw.provisioned_throughput {
475        if let Some(obj) = pt.as_object() {
476            let rcu = obj
477                .get("ReadCapacityUnits")
478                .and_then(|v| v.as_i64())
479                .unwrap_or(0);
480            let wcu = obj
481                .get("WriteCapacityUnits")
482                .and_then(|v| v.as_i64())
483                .unwrap_or(0);
484            const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
485            if rcu > MAX_THROUGHPUT {
486                return Err(format!(
487                    "Given value {} for ReadCapacityUnits is out of bounds",
488                    rcu
489                ));
490            }
491            if wcu > MAX_THROUGHPUT {
492                return Err(format!(
493                    "Given value {} for WriteCapacityUnits is out of bounds",
494                    wcu
495                ));
496            }
497        }
498    }
499
500    // Missing ProvisionedThroughput when BillingMode is explicitly PROVISIONED.
501    if raw.billing_mode.as_deref() == Some("PROVISIONED") && raw.provisioned_throughput.is_none() {
502        return Err(
503            "One or more parameter values were invalid: ReadCapacityUnits and \
504             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
505                .to_string(),
506        );
507    }
508
509    // Parse typed structures
510    let key_schema: Vec<KeySchemaElement> = raw
511        .key_schema
512        .as_ref()
513        .map(|v| serde_json::from_value(v.clone()))
514        .transpose()
515        .map_err(|e| e.to_string())?
516        .unwrap_or_default();
517    let attribute_definitions: Vec<AttributeDefinition> = raw
518        .attribute_definitions
519        .as_ref()
520        .map(|v| serde_json::from_value(v.clone()))
521        .transpose()
522        .map_err(|e| e.to_string())?
523        .unwrap_or_default();
524    let provisioned_throughput: Option<ProvisionedThroughput> = raw
525        .provisioned_throughput
526        .as_ref()
527        .map(|v| serde_json::from_value(v.clone()))
528        .transpose()
529        .map_err(|e| e.to_string())?;
530    let global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>> = raw
531        .global_secondary_indexes
532        .as_ref()
533        .map(|v| serde_json::from_value(v.clone()))
534        .transpose()
535        .map_err(|e| e.to_string())?;
536    let local_secondary_indexes: Option<Vec<LocalSecondaryIndex>> = raw
537        .local_secondary_indexes
538        .as_ref()
539        .map(|v| serde_json::from_value(v.clone()))
540        .transpose()
541        .map_err(|e| e.to_string())?;
542
543    Ok(CreateTableRequest {
544        table_name,
545        key_schema,
546        attribute_definitions,
547        global_secondary_indexes,
548        local_secondary_indexes,
549        billing_mode: raw.billing_mode,
550        provisioned_throughput,
551        stream_specification: raw.stream_specification,
552        sse_specification: raw.sse_specification,
553        table_class: raw.table_class,
554        tags: raw.tags,
555        deletion_protection_enabled: raw.deletion_protection_enabled,
556    })
557}
558
559// ---- Multi-field constraint error collectors ----
560
561fn collect_pt_errors(pt_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
562    if let Some(v) = pt_val {
563        if let Some(obj) = v.as_object() {
564            let wcu = obj.get("WriteCapacityUnits");
565            let rcu = obj.get("ReadCapacityUnits");
566            if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
567                errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
568            } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
569                if w < 1 {
570                    errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
571                }
572            }
573            if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
574                errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
575            } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
576                if r < 1 {
577                    errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
578                }
579            }
580        }
581    }
582}
583
584fn collect_ks_errors(ks_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
585    match ks_val {
586        None => {
587            errors.push(
588                "Value null at 'keySchema' failed to satisfy constraint: Member must not be null"
589                    .to_string(),
590            );
591        }
592        Some(v) => {
593            if let Some(arr) = v.as_array() {
594                if arr.is_empty() {
595                    errors.push("Value '[]' at 'keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string());
596                } else if arr.len() > 2 {
597                    errors.push(format!("Value '{}' at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2", v));
598                }
599                for (i, elem) in arr.iter().enumerate().take(10) {
600                    collect_ks_elem_errors(elem, i + 1, errors);
601                }
602            }
603        }
604    }
605}
606
607fn collect_ks_elem_errors(elem: &serde_json::Value, idx: usize, errors: &mut Vec<String>) {
608    if let Some(obj) = elem.as_object() {
609        if !obj.contains_key("AttributeName")
610            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
611        {
612            errors.push(format!("Value null at 'keySchema.{}.member.attributeName' failed to satisfy constraint: Member must not be null", idx));
613        }
614        let kt = obj.get("KeyType");
615        if kt.is_none() || kt == Some(&serde_json::Value::Null) {
616            errors.push(format!("Value null at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must not be null", idx));
617        } else if let Some(s) = kt.and_then(|v| v.as_str()) {
618            if s != "HASH" && s != "RANGE" {
619                errors.push(format!("Value '{}' at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must satisfy enum value set: [HASH, RANGE]", s, idx));
620            }
621        }
622    }
623}
624
625fn collect_ad_errors(ad_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
626    match ad_val {
627        None => {
628            errors.push("Value null at 'attributeDefinitions' failed to satisfy constraint: Member must not be null".to_string());
629        }
630        Some(v) => {
631            if let Some(arr) = v.as_array() {
632                for (i, elem) in arr.iter().enumerate() {
633                    if let Some(obj) = elem.as_object() {
634                        if !obj.contains_key("AttributeName")
635                            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
636                        {
637                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeName' failed to satisfy constraint: Member must not be null", i + 1));
638                        }
639                        let at = obj.get("AttributeType");
640                        if at.is_none() || at == Some(&serde_json::Value::Null) {
641                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must not be null", i + 1));
642                        } else if let Some(s) = at.and_then(|v| v.as_str()) {
643                            if s != "S" && s != "N" && s != "B" {
644                                errors.push(format!("Value '{}' at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must satisfy enum value set: [B, N, S]", s, i + 1));
645                            }
646                        }
647                    }
648                }
649            }
650        }
651    }
652}
653
654fn collect_lsi_errors(lsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
655    if let Some(v) = lsi_val {
656        if let Some(arr) = v.as_array() {
657            for (i, elem) in arr.iter().enumerate().take(10) {
658                if let Some(obj) = elem.as_object() {
659                    // Order: indexName, keySchema, projection
660                    if !obj.contains_key("IndexName")
661                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
662                    {
663                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
664                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
665                        collect_idx_name_errors(name, "localSecondaryIndexes", i + 1, errors);
666                    }
667                    if !obj.contains_key("KeySchema")
668                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
669                    {
670                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
671                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
672                        if ks.is_empty() {
673                            errors.push(format!("Value '[]' at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
674                        }
675                    }
676                    if !obj.contains_key("Projection")
677                        || obj.get("Projection") == Some(&serde_json::Value::Null)
678                    {
679                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
680                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
681                        collect_proj_errors(p, &format!("localSecondaryIndexes.{}", i + 1), errors);
682                    }
683                }
684            }
685        }
686    }
687}
688
689fn collect_gsi_errors(gsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
690    if let Some(v) = gsi_val {
691        if let Some(arr) = v.as_array() {
692            for (i, elem) in arr.iter().enumerate().take(10) {
693                if let Some(obj) = elem.as_object() {
694                    // Order for GSI: keySchema, projection, indexName
695                    if !obj.contains_key("KeySchema")
696                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
697                    {
698                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
699                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
700                        if ks.is_empty() {
701                            errors.push(format!("Value '[]' at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
702                        }
703                    }
704                    if !obj.contains_key("Projection")
705                        || obj.get("Projection") == Some(&serde_json::Value::Null)
706                    {
707                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
708                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
709                        collect_proj_errors(
710                            p,
711                            &format!("globalSecondaryIndexes.{}", i + 1),
712                            errors,
713                        );
714                    }
715                    if !obj.contains_key("IndexName")
716                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
717                    {
718                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
719                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
720                        collect_idx_name_errors(name, "globalSecondaryIndexes", i + 1, errors);
721                    }
722                    // GSI ProvisionedThroughput
723                    if let Some(pt) = obj.get("ProvisionedThroughput").and_then(|v| v.as_object()) {
724                        let wcu = pt.get("WriteCapacityUnits");
725                        let rcu = pt.get("ReadCapacityUnits");
726                        if let Some(w) = wcu.and_then(|v| v.as_i64()) {
727                            if w < 1 {
728                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
729                            }
730                        } else if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
731                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
732                        }
733                        if let Some(r) = rcu.and_then(|v| v.as_i64()) {
734                            if r < 1 {
735                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
736                            }
737                        } else if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
738                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
739                        }
740                    }
741                }
742            }
743        }
744    }
745}
746
/// Appends constraint violations for a secondary index name to `errors`.
///
/// `prefix` and `idx` identify the index inside the request (for example
/// `globalSecondaryIndexes` and a position) and are only used to build the
/// path shown in each message. The checks run in a fixed order: pattern,
/// minimum length (3), maximum length (255). Each failing check adds its own
/// message, so one name can produce several entries.
fn collect_idx_name_errors(name: &str, prefix: &str, idx: usize, errors: &mut Vec<String>) {
    // The pattern is `[a-zA-Z0-9_.-]+`, which requires at least one character.
    // `all` on its own is vacuously true for the empty string, so the empty
    // name has to be rejected explicitly.
    let matches_pattern = !name.is_empty()
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.'));
    if !matches_pattern {
        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", name, prefix, idx));
    }

    // Measure the length in characters rather than UTF-8 bytes so a name that
    // contains multi-byte characters is not over-counted.
    let char_count = name.chars().count();
    if char_count < 3 {
        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", name, prefix, idx));
    }
    if char_count > 255 {
        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length less than or equal to 255", name, prefix, idx));
    }
}
761
762fn collect_proj_errors(
763    proj: &serde_json::Map<String, serde_json::Value>,
764    prefix: &str,
765    errors: &mut Vec<String>,
766) {
767    if let Some(pt) = proj.get("ProjectionType") {
768        if let Some(s) = pt.as_str() {
769            if s != "ALL" && s != "KEYS_ONLY" && s != "INCLUDE" {
770                errors.push(format!("Value '{}' at '{}.member.projection.projectionType' failed to satisfy constraint: Member must satisfy enum value set: [ALL, INCLUDE, KEYS_ONLY]", s, prefix));
771            }
772        }
773    }
774    if let Some(nka) = proj.get("NonKeyAttributes") {
775        if let Some(arr) = nka.as_array() {
776            if arr.is_empty() {
777                errors.push(format!("Value '[]' at '{}.member.projection.nonKeyAttributes' failed to satisfy constraint: Member must have length greater than or equal to 1", prefix));
778            }
779        }
780    }
781}
782
783// ---- Structural validation helpers ----
784
785fn validate_key_schema_structure(ks: &[KeySchemaElement]) -> std::result::Result<(), String> {
786    if ks.is_empty() {
787        return Err("1 validation error detected: Value null at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2".to_string());
788    }
789    if ks[0].key_type != KeyType::HASH {
790        return Err(
791            "Invalid KeySchema: The first KeySchemaElement is not a HASH key type".to_string(),
792        );
793    }
794    if ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name {
795        return Err(
796            "Both the Hash Key and the Range Key element in the KeySchema have the same name"
797                .to_string(),
798        );
799    }
800    if ks.len() == 2 && ks[1].key_type != KeyType::RANGE {
801        return Err(
802            "Invalid KeySchema: The second KeySchemaElement is not a RANGE key type".to_string(),
803        );
804    }
805    Ok(())
806}
807
808fn validate_key_attrs_in_defs(
809    ks: &[KeySchemaElement],
810    defs: &[AttributeDefinition],
811) -> std::result::Result<(), String> {
812    // Collect missing key attribute names
813    let missing: Vec<&str> = ks
814        .iter()
815        .filter(|k| !defs.iter().any(|d| d.attribute_name == k.attribute_name))
816        .map(|k| k.attribute_name.as_str())
817        .collect();
818
819    if missing.is_empty() {
820        // Even if no keys are missing, check for structural issues (dup names/types)
821        // which DynamoDB reports as generic "no definition" when defs exist
822        let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
823        if has_dup_names {
824            return Err(
825                "Invalid KeySchema: Some index key attribute have no definition".to_string(),
826            );
827        }
828        return Ok(());
829    }
830
831    // Use generic message when:
832    // - defs is empty (fewer defs than unique key attrs)
833    // - key schema has 2 elements (regardless of structural validity)
834    // - key schema has structural issues (dup names, dup types)
835    // Use detailed message only when defs is non-empty AND key schema has 1 element
836    let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
837    let has_dup_types = ks.len() == 2 && ks[0].key_type == ks[1].key_type;
838    let use_generic = defs.is_empty() || ks.len() >= 2 || has_dup_names || has_dup_types;
839
840    if use_generic {
841        return Err("Invalid KeySchema: Some index key attribute have no definition".to_string());
842    }
843
844    // Detailed message for single-key schema with non-empty defs
845    let key_names: Vec<&str> = missing.to_vec();
846    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
847    Err(format!(
848        "One or more parameter values were invalid: Some index key attributes are not defined in \
849         AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
850        key_names.join(", "),
851        def_names.join(", ")
852    ))
853}
854
855fn validate_attr_def_count(
856    ks: &[KeySchemaElement],
857    defs: &[AttributeDefinition],
858    lsis: &Option<Vec<LocalSecondaryIndex>>,
859    gsis: &Option<Vec<GlobalSecondaryIndex>>,
860) -> std::result::Result<(), String> {
861    let mut all_key_attrs = std::collections::HashSet::new();
862    for k in ks {
863        all_key_attrs.insert(k.attribute_name.as_str());
864    }
865    if let Some(lsis) = lsis {
866        for lsi in lsis {
867            for k in &lsi.key_schema {
868                all_key_attrs.insert(k.attribute_name.as_str());
869            }
870        }
871    }
872    if let Some(gsis) = gsis {
873        for gsi in gsis {
874            for k in &gsi.key_schema {
875                all_key_attrs.insert(k.attribute_name.as_str());
876            }
877        }
878    }
879    if defs.len() != all_key_attrs.len() {
880        return Err("One or more parameter values were invalid: Number of attributes in KeySchema does not exactly match number of attributes defined in AttributeDefinitions".to_string());
881    }
882    Ok(())
883}
884
885fn validate_lsi_list(
886    lsis: &[LocalSecondaryIndex],
887    ks: &[KeySchemaElement],
888    defs: &[AttributeDefinition],
889) -> std::result::Result<(), String> {
890    // Empty check is done earlier in validate_typed_request
891
892    if !ks.iter().any(|k| k.key_type == KeyType::RANGE) {
893        return Err("One or more parameter values were invalid: Table KeySchema does not have a range key, which is required when specifying a LocalSecondaryIndex".to_string());
894    }
895
896    // Check missing attribute definitions across all LSI keys
897    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
898    let mut missing_keys = Vec::new();
899    for lsi in lsis {
900        for k in &lsi.key_schema {
901            if !def_names.contains(&k.attribute_name.as_str())
902                && !missing_keys.contains(&k.attribute_name.as_str())
903            {
904                missing_keys.push(k.attribute_name.as_str());
905            }
906        }
907    }
908    if !missing_keys.is_empty() {
909        let mut all_keys = Vec::new();
910        for lsi in lsis {
911            for k in &lsi.key_schema {
912                if !all_keys.contains(&k.attribute_name.as_str()) {
913                    all_keys.push(k.attribute_name.as_str());
914                }
915            }
916        }
917        return Err(format!(
918            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
919            all_keys.join(", "),
920            def_names.join(", ")
921        ));
922    }
923
924    // Structural validation for each LSI
925    for lsi in lsis {
926        validate_lsi_structure(lsi, ks)?;
927    }
928
929    // Duplicate index names
930    let mut seen = std::collections::HashSet::new();
931    for lsi in lsis {
932        if !seen.insert(&lsi.index_name) {
933            return Err(format!(
934                "One or more parameter values were invalid: Duplicate index name: {}",
935                lsi.index_name
936            ));
937        }
938    }
939
940    // Count limit
941    if lsis.len() > 5 {
942        return Err("One or more parameter values were invalid: Number of LocalSecondaryIndexes exceeds per-table limit of 5".to_string());
943    }
944
945    Ok(())
946}
947
948fn validate_gsi_list(
949    gsis: &[GlobalSecondaryIndex],
950    defs: &[AttributeDefinition],
951    bm: &str,
952) -> std::result::Result<(), String> {
953    // Empty check is done earlier in validate_typed_request
954
955    // Check missing attribute definitions across all GSI keys
956    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
957    let mut missing_keys = Vec::new();
958    for gsi in gsis {
959        for k in &gsi.key_schema {
960            if !def_names.contains(&k.attribute_name.as_str())
961                && !missing_keys.contains(&k.attribute_name.as_str())
962            {
963                missing_keys.push(k.attribute_name.as_str());
964            }
965        }
966    }
967    if !missing_keys.is_empty() {
968        let mut all_keys = Vec::new();
969        for gsi in gsis {
970            for k in &gsi.key_schema {
971                if !all_keys.contains(&k.attribute_name.as_str()) {
972                    all_keys.push(k.attribute_name.as_str());
973                }
974            }
975        }
976        return Err(format!(
977            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
978            all_keys.join(", "),
979            def_names.join(", ")
980        ));
981    }
982
983    // Structural validation for each GSI
984    for gsi in gsis {
985        validate_gsi_structure(gsi)?;
986    }
987
988    // Duplicate index names
989    let mut seen = std::collections::HashSet::new();
990    for gsi in gsis {
991        if !seen.insert(&gsi.index_name) {
992            return Err(format!(
993                "One or more parameter values were invalid: Duplicate index name: {}",
994                gsi.index_name
995            ));
996        }
997    }
998
999    // Count limit
1000    if gsis.len() > 20 {
1001        return Err("One or more parameter values were invalid: GlobalSecondaryIndex count exceeds the per-table limit of 20".to_string());
1002    }
1003
1004    // PAY_PER_REQUEST billing mode check
1005    if bm == "PAY_PER_REQUEST" {
1006        for gsi in gsis {
1007            if gsi.provisioned_throughput.is_some() {
1008                return Err(format!(
1009                    "One or more parameter values were invalid: ProvisionedThroughput should not be specified for index: {} when BillingMode is PAY_PER_REQUEST",
1010                    gsi.index_name
1011                ));
1012            }
1013        }
1014    }
1015
1016    Ok(())
1017}
1018
1019fn validate_lsi_structure(
1020    lsi: &LocalSecondaryIndex,
1021    table_ks: &[KeySchemaElement],
1022) -> std::result::Result<(), String> {
1023    // Key schema structure first
1024    validate_key_schema_structure(&lsi.key_schema)?;
1025
1026    // Range key presence (before projection, per DynamoDB ordering)
1027    let lsi_sk = lsi.key_schema.iter().find(|k| k.key_type == KeyType::RANGE);
1028    if lsi_sk.is_none() {
1029        return Err(format!(
1030            "One or more parameter values were invalid: Index KeySchema does not have a range key for index: {}",
1031            lsi.index_name
1032        ));
1033    }
1034
1035    // Hash key must match table hash key (before projection)
1036    let table_pk = table_ks
1037        .iter()
1038        .find(|k| k.key_type == KeyType::HASH)
1039        .map(|k| k.attribute_name.as_str());
1040    let lsi_pk = lsi
1041        .key_schema
1042        .iter()
1043        .find(|k| k.key_type == KeyType::HASH)
1044        .map(|k| k.attribute_name.as_str());
1045    if lsi_pk != table_pk {
1046        return Err(format!(
1047            "One or more parameter values were invalid: \
1048             Index KeySchema does not have the same leading hash key as table KeySchema \
1049             for index: {}. index hash key: {}, table hash key: {}",
1050            lsi.index_name,
1051            lsi_pk.unwrap_or("null"),
1052            table_pk.unwrap_or("null")
1053        ));
1054    }
1055
1056    // Projection (after range key and hash key checks)
1057    validate_proj_structure(&lsi.projection)?;
1058
1059    Ok(())
1060}
1061
1062fn validate_gsi_structure(gsi: &GlobalSecondaryIndex) -> std::result::Result<(), String> {
1063    validate_key_schema_structure(&gsi.key_schema)?;
1064    validate_proj_structure(&gsi.projection)?;
1065    Ok(())
1066}
1067
1068fn validate_proj_structure(p: &Projection) -> std::result::Result<(), String> {
1069    match &p.projection_type {
1070        None => Err(
1071            "One or more parameter values were invalid: Unknown ProjectionType: null".to_string(),
1072        ),
1073        Some(pt) => {
1074            if let Some(ref nka) = p.non_key_attributes {
1075                match pt {
1076                    ProjectionType::ALL => return Err("One or more parameter values were invalid: ProjectionType is ALL, but NonKeyAttributes is specified".to_string()),
1077                    ProjectionType::KEYS_ONLY => return Err("One or more parameter values were invalid: ProjectionType is KEYS_ONLY, but NonKeyAttributes is specified".to_string()),
1078                    ProjectionType::INCLUDE => { if nka.is_empty() { return Err("One or more parameter values were invalid: NonKeyAttributes must not be empty".to_string()); } }
1079                }
1080            }
1081            Ok(())
1082        }
1083    }
1084}