Skip to main content

dynoxide/actions/
create_table.rs

1use crate::actions::{TableDescription, build_table_description};
2use crate::errors::{DynoxideError, Result};
3use crate::storage::Storage;
4use crate::streams;
5use crate::types::{
6    AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, KeyType, LocalSecondaryIndex,
7    Projection, ProjectionType, ProvisionedThroughput,
8};
9use serde::{Deserialize, Serialize};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12/// Internal raw deserialization struct — uses serde_json::Value for fields
13/// that participate in DynamoDB's multi-field constraint validation.
14#[derive(Debug, Default, Deserialize)]
15struct RawRequest {
16    #[serde(rename = "TableName", default)]
17    table_name: Option<String>,
18    #[serde(rename = "KeySchema", default)]
19    key_schema: Option<serde_json::Value>,
20    #[serde(rename = "AttributeDefinitions", default)]
21    attribute_definitions: Option<serde_json::Value>,
22    #[serde(rename = "GlobalSecondaryIndexes", default)]
23    global_secondary_indexes: Option<serde_json::Value>,
24    #[serde(rename = "LocalSecondaryIndexes", default)]
25    local_secondary_indexes: Option<serde_json::Value>,
26    #[serde(rename = "BillingMode", default)]
27    billing_mode: Option<String>,
28    #[serde(rename = "ProvisionedThroughput", default)]
29    provisioned_throughput: Option<serde_json::Value>,
30    #[serde(rename = "StreamSpecification", default)]
31    stream_specification: Option<StreamSpecification>,
32    #[serde(rename = "SSESpecification", default)]
33    sse_specification: Option<crate::types::SseSpecification>,
34    #[serde(rename = "TableClass", default)]
35    table_class: Option<String>,
36    #[serde(rename = "Tags", default)]
37    tags: Option<Vec<crate::types::Tag>>,
38    #[serde(rename = "DeletionProtectionEnabled", default)]
39    deletion_protection_enabled: Option<bool>,
40}
41
42/// Public request type — fully validated, typed fields.
43/// Can be constructed directly (programmatic use) or deserialized from JSON.
44#[derive(Debug, Default)]
45pub struct CreateTableRequest {
46    pub table_name: String,
47    pub key_schema: Vec<KeySchemaElement>,
48    pub attribute_definitions: Vec<AttributeDefinition>,
49    pub global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>>,
50    pub local_secondary_indexes: Option<Vec<LocalSecondaryIndex>>,
51    pub billing_mode: Option<String>,
52    pub provisioned_throughput: Option<ProvisionedThroughput>,
53    pub stream_specification: Option<StreamSpecification>,
54    pub sse_specification: Option<crate::types::SseSpecification>,
55    pub table_class: Option<String>,
56    pub tags: Option<Vec<crate::types::Tag>>,
57    pub deletion_protection_enabled: Option<bool>,
58}
59
60/// Custom Deserialize that does loose JSON parsing first, validates, then builds typed fields.
61/// Validation errors use "VALIDATION:" prefix so server.rs converts them to ValidationException.
62impl<'de> serde::Deserialize<'de> for CreateTableRequest {
63    fn deserialize<D: serde::Deserializer<'de>>(
64        deserializer: D,
65    ) -> std::result::Result<Self, D::Error> {
66        let raw = RawRequest::deserialize(deserializer)?;
67        match validate_raw_and_build(raw) {
68            Ok(req) => Ok(req),
69            Err(msg) => Err(serde::de::Error::custom(format!("VALIDATION:{}", msg))),
70        }
71    }
72}
73
74#[derive(Debug, Default, Deserialize)]
75pub struct StreamSpecification {
76    #[serde(rename = "StreamEnabled", alias = "stream_enabled")]
77    pub stream_enabled: bool,
78    #[serde(rename = "StreamViewType", alias = "stream_view_type", default)]
79    pub stream_view_type: Option<String>,
80}
81
82#[derive(Debug, Default, Serialize)]
83pub struct CreateTableResponse {
84    #[serde(rename = "TableDescription")]
85    pub table_description: TableDescription,
86}
87
88pub fn execute(storage: &Storage, request: CreateTableRequest) -> Result<CreateTableResponse> {
89    // Structural validation (runs for both programmatic and JSON paths)
90    validate_typed_request(&request)?;
91
92    if let Some(ref tc) = request.table_class {
93        if tc != "STANDARD" && tc != "STANDARD_INFREQUENT_ACCESS" {
94            return Err(DynoxideError::ValidationException(format!(
95                "1 validation error detected: Value '{tc}' at 'tableClass' failed to satisfy \
96                 constraint: Member must satisfy enum value set: \
97                 [STANDARD, STANDARD_INFREQUENT_ACCESS]"
98            )));
99        }
100    }
101
102    if storage.table_exists(&request.table_name)? {
103        return Err(DynoxideError::ResourceInUseException(format!(
104            "Table already exists: {}",
105            request.table_name
106        )));
107    }
108
109    let now = SystemTime::now()
110        .duration_since(UNIX_EPOCH)
111        .unwrap_or_default()
112        .as_secs() as i64;
113
114    let key_schema_json = serde_json::to_string(&request.key_schema)
115        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
116    let attr_defs_json = serde_json::to_string(&request.attribute_definitions)
117        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
118    let gsi_json = request
119        .global_secondary_indexes
120        .as_ref()
121        .map(serde_json::to_string)
122        .transpose()
123        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
124    let lsi_json = request
125        .local_secondary_indexes
126        .as_ref()
127        .map(serde_json::to_string)
128        .transpose()
129        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
130    let pt_json = request
131        .provisioned_throughput
132        .as_ref()
133        .map(serde_json::to_string)
134        .transpose()
135        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
136    let sse_json = request
137        .sse_specification
138        .as_ref()
139        .map(serde_json::to_string)
140        .transpose()
141        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
142    let deletion_protection = request.deletion_protection_enabled.unwrap_or(false);
143
144    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
145    storage.insert_table_metadata(&crate::storage::CreateTableMetadata {
146        table_name: &request.table_name,
147        key_schema: &key_schema_json,
148        attribute_definitions: &attr_defs_json,
149        gsi_definitions: gsi_json.as_deref(),
150        lsi_definitions: lsi_json.as_deref(),
151        provisioned_throughput: pt_json.as_deref(),
152        created_at: now,
153        sse_specification: sse_json.as_deref(),
154        table_class: request.table_class.as_deref(),
155        deletion_protection_enabled: deletion_protection,
156        billing_mode: Some(billing_mode_str),
157    })?;
158
159    storage.create_data_table(&request.table_name)?;
160
161    if let Some(ref gsis) = request.global_secondary_indexes {
162        for gsi in gsis {
163            storage.create_gsi_table(&request.table_name, &gsi.index_name)?;
164        }
165    }
166
167    if let Some(ref lsis) = request.local_secondary_indexes {
168        for lsi in lsis {
169            storage.create_lsi_table(&request.table_name, &lsi.index_name)?;
170        }
171    }
172
173    if let Some(ref spec) = request.stream_specification {
174        if spec.stream_enabled {
175            let view_type = spec
176                .stream_view_type
177                .as_deref()
178                .unwrap_or("NEW_AND_OLD_IMAGES");
179            let label = streams::generate_stream_label();
180            storage.enable_stream(&request.table_name, view_type, &label)?;
181        }
182    }
183
184    if let Some(ref tags) = request.tags {
185        if !tags.is_empty() {
186            storage.set_tags(&request.table_name, tags)?;
187        }
188    }
189
190    let meta = storage
191        .get_table_metadata(&request.table_name)?
192        .ok_or_else(|| {
193            DynoxideError::InternalServerError("Table metadata not found after creation".into())
194        })?;
195
196    let mut desc = build_table_description(&meta, Some(0), Some(0));
197    // CreateTable response shows CREATING status (table is usable immediately
198    // but DynamoDB API contract says newly-created tables start as CREATING)
199    desc.table_status = "CREATING".to_string();
200
201    // Override billing mode fields based on the actual request
202    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
203    if billing_mode_str == "PROVISIONED" {
204        desc.billing_mode_summary = None;
205        desc.table_throughput_mode_summary = None;
206    } else if billing_mode_str == "PAY_PER_REQUEST" {
207        desc.billing_mode_summary = Some(crate::actions::BillingModeSummary {
208            billing_mode: "PAY_PER_REQUEST".to_string(),
209            last_update_to_pay_per_request_date_time: None,
210        });
211        desc.table_throughput_mode_summary = Some(crate::actions::TableThroughputModeSummary {
212            table_throughput_mode: "PAY_PER_REQUEST".to_string(),
213            last_update_to_pay_per_request_date_time: None,
214        });
215        // Ensure provisioned throughput shows zeros for PAY_PER_REQUEST
216        desc.provisioned_throughput = Some(crate::actions::TableProvisionedThroughputDescription {
217            read_capacity_units: 0,
218            write_capacity_units: 0,
219            number_of_decreases_today: 0,
220            last_increase_date_time: None,
221            last_decrease_date_time: None,
222        });
223    }
224
225    // Set all GSI statuses to CREATING for newly created tables
226    if let Some(ref mut gsis) = desc.global_secondary_indexes {
227        for gsi in gsis {
228            gsi.index_status = "CREATING".to_string();
229        }
230    }
231
232    // Remove DeletionProtectionEnabled from response if not explicitly set
233    // (DynamoDB doesn't include it in basic CreateTable response)
234    if request.deletion_protection_enabled.is_none() {
235        desc.deletion_protection_enabled = None;
236    }
237
238    Ok(CreateTableResponse {
239        table_description: desc,
240    })
241}
242
243/// Convert a String error to DynoxideError::ValidationException.
244fn ve(msg: String) -> DynoxideError {
245    DynoxideError::ValidationException(msg)
246}
247
248/// Validate a programmatically-constructed request (used when not deserialised from JSON).
249///
250/// The validation order matches DynamoDB's actual behaviour (as verified by the Dynalite
251/// conformance suite):
252///
253/// 1. Table name (missing, length, pattern)
254/// 2. BillingMode + ProvisionedThroughput consistency
255/// 3. ProvisionedThroughput out-of-bounds
256/// 4. Missing ProvisionedThroughput (default PROVISIONED billing)
257/// 5. Key attribute definition checks ("Invalid KeySchema" / detailed missing-attr message)
258/// 6. Key schema structure (duplicate names, wrong types)
259/// 7. Empty LSI/GSI lists
260/// 8. LSI/GSI structural validation (key schema, projections, duplicates, limits)
261/// 9. Cross-index duplicate names
262/// 10. Attribute definition count mismatch
263fn validate_typed_request(request: &CreateTableRequest) -> Result<()> {
264    if request.table_name.is_empty() {
265        return Err(DynoxideError::ValidationException(
266            "The parameter 'TableName' is required but was not present in the request".to_string(),
267        ));
268    }
269    if request.table_name.len() < 3 || request.table_name.len() > 255 {
270        return Err(DynoxideError::ValidationException(
271            "TableName must be at least 3 characters long and at most 255 characters long"
272                .to_string(),
273        ));
274    }
275
276    // Table name pattern
277    if !request
278        .table_name
279        .chars()
280        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
281    {
282        return Err(DynoxideError::ValidationException(format!(
283            "1 validation error detected: Value '{}' at 'tableName' failed to satisfy constraint: \
284             Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
285            request.table_name
286        )));
287    }
288
289    // BillingMode + ProvisionedThroughput consistency
290    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
291    if billing_mode_str == "PAY_PER_REQUEST" && request.provisioned_throughput.is_some() {
292        return Err(DynoxideError::ValidationException(
293            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
294             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
295                .to_string(),
296        ));
297    }
298
299    // ProvisionedThroughput out-of-bounds
300    if let Some(ref pt) = request.provisioned_throughput {
301        const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
302        let rcu = pt.read_capacity_units.unwrap_or(0);
303        let wcu = pt.write_capacity_units.unwrap_or(0);
304        if rcu > MAX_THROUGHPUT {
305            return Err(DynoxideError::ValidationException(format!(
306                "Given value {} for ReadCapacityUnits is out of bounds",
307                rcu
308            )));
309        }
310        if wcu > MAX_THROUGHPUT {
311            return Err(DynoxideError::ValidationException(format!(
312                "Given value {} for WriteCapacityUnits is out of bounds",
313                wcu
314            )));
315        }
316    }
317
318    // Missing ProvisionedThroughput when billing mode is explicitly PROVISIONED.
319    // For the programmatic API, when BillingMode is not specified we default to
320    // PAY_PER_REQUEST for convenience. The HTTP/JSON path (validate_raw_and_build)
321    // applies the stricter DynamoDB default of PROVISIONED.
322    if request.billing_mode.is_some()
323        && billing_mode_str == "PROVISIONED"
324        && request.provisioned_throughput.is_none()
325    {
326        return Err(DynoxideError::ValidationException(
327            "One or more parameter values were invalid: ReadCapacityUnits and \
328             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
329                .to_string(),
330        ));
331    }
332
333    // Key attribute definition checks (before key schema structure)
334    validate_key_attrs_in_defs(&request.key_schema, &request.attribute_definitions).map_err(ve)?;
335
336    // Key schema structure
337    validate_key_schema_structure(&request.key_schema).map_err(ve)?;
338
339    // Empty LSI/GSI lists (before structural validation and attr count)
340    if let Some(ref lsis) = request.local_secondary_indexes {
341        if lsis.is_empty() {
342            return Err(ve(
343                "One or more parameter values were invalid: List of LocalSecondaryIndexes is empty"
344                    .to_string(),
345            ));
346        }
347    }
348    if let Some(ref gsis) = request.global_secondary_indexes {
349        if gsis.is_empty() {
350            return Err(ve(
351                "One or more parameter values were invalid: List of GlobalSecondaryIndexes is empty"
352                    .to_string(),
353            ));
354        }
355    }
356
357    // LSI structural validation
358    if let Some(ref lsis) = request.local_secondary_indexes {
359        validate_lsi_list(lsis, &request.key_schema, &request.attribute_definitions).map_err(ve)?;
360    }
361
362    // GSI structural validation
363    if let Some(ref gsis) = request.global_secondary_indexes {
364        let bm = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
365        validate_gsi_list(gsis, &request.attribute_definitions, bm).map_err(ve)?;
366    }
367
368    // Cross-index duplicate names (checked before attr def count)
369    check_cross_index_duplicates(
370        &request.local_secondary_indexes,
371        &request.global_secondary_indexes,
372    )
373    .map_err(ve)?;
374
375    // Attribute definition count (last)
376    validate_attr_def_count(
377        &request.key_schema,
378        &request.attribute_definitions,
379        &request.local_secondary_indexes,
380        &request.global_secondary_indexes,
381    )
382    .map_err(ve)?;
383
384    Ok(())
385}
386
387fn check_cross_index_duplicates(
388    lsis: &Option<Vec<LocalSecondaryIndex>>,
389    gsis: &Option<Vec<GlobalSecondaryIndex>>,
390) -> std::result::Result<(), String> {
391    if let (Some(lsis), Some(gsis)) = (lsis, gsis) {
392        let mut all_names = std::collections::HashSet::new();
393        for lsi in lsis {
394            all_names.insert(&lsi.index_name);
395        }
396        for gsi in gsis {
397            if !all_names.insert(&gsi.index_name) {
398                return Err(format!(
399                    "One or more parameter values were invalid: Duplicate index name: {}",
400                    gsi.index_name
401                ));
402            }
403        }
404    }
405    Ok(())
406}
407
408// ---- Raw JSON validation (for deserialization path) ----
409
410fn validate_raw_and_build(raw: RawRequest) -> std::result::Result<CreateTableRequest, String> {
411    // Missing TableName is a different error format from invalid TableName
412    if raw.table_name.is_none() {
413        return Err(
414            "The parameter 'TableName' is required but was not present in the request".to_string(),
415        );
416    }
417
418    // Use the shared constraint error collector for table name validation.
419    // This produces the correct multi-field constraint format for empty,
420    // too-short, too-long, or invalid-pattern table names.
421    let name_errors = crate::validation::table_name_constraint_errors(
422        raw.table_name.as_deref(),
423        crate::validation::TableNameContext::CreateTable,
424    );
425    if !name_errors.is_empty() {
426        let msg = format!(
427            "{} validation error{} detected: {}",
428            name_errors.len(),
429            if name_errors.len() > 1 { "s" } else { "" },
430            name_errors.join("; ")
431        );
432        return Err(msg);
433    }
434    let table_name = raw.table_name.unwrap();
435
436    let mut errors = Vec::new();
437
438    if let Some(ref bm) = raw.billing_mode {
439        if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
440            errors.push(format!(
441                "Value '{}' at 'billingMode' failed to satisfy constraint: \
442                 Member must satisfy enum value set: [PROVISIONED, PAY_PER_REQUEST]",
443                bm
444            ));
445        }
446    }
447
448    collect_pt_errors(&raw.provisioned_throughput, &mut errors);
449    collect_ks_errors(&raw.key_schema, &mut errors);
450    collect_ad_errors(&raw.attribute_definitions, &mut errors);
451    collect_lsi_errors(&raw.local_secondary_indexes, &mut errors);
452    collect_gsi_errors(&raw.global_secondary_indexes, &mut errors);
453
454    // DynamoDB caps multi-field constraint errors at 10
455    errors.truncate(10);
456
457    if !errors.is_empty() {
458        let prefix = format!(
459            "{} validation error{} detected: ",
460            errors.len(),
461            if errors.len() == 1 { "" } else { "s" }
462        );
463        return Err(format!("{}{}", prefix, errors.join("; ")));
464    }
465
466    // BillingMode + ProvisionedThroughput consistency (HTTP path only)
467    let billing_mode_str = raw.billing_mode.as_deref().unwrap_or("PROVISIONED");
468    if billing_mode_str == "PAY_PER_REQUEST" && raw.provisioned_throughput.is_some() {
469        return Err(
470            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
471             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
472                .to_string(),
473        );
474    }
475
476    // ProvisionedThroughput out-of-bounds (after multi-field but before struct checks)
477    if let Some(ref pt) = raw.provisioned_throughput {
478        if let Some(obj) = pt.as_object() {
479            let rcu = obj
480                .get("ReadCapacityUnits")
481                .and_then(|v| v.as_i64())
482                .unwrap_or(0);
483            let wcu = obj
484                .get("WriteCapacityUnits")
485                .and_then(|v| v.as_i64())
486                .unwrap_or(0);
487            const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
488            if rcu > MAX_THROUGHPUT {
489                return Err(format!(
490                    "Given value {} for ReadCapacityUnits is out of bounds",
491                    rcu
492                ));
493            }
494            if wcu > MAX_THROUGHPUT {
495                return Err(format!(
496                    "Given value {} for WriteCapacityUnits is out of bounds",
497                    wcu
498                ));
499            }
500        }
501    }
502
503    // Missing ProvisionedThroughput when BillingMode is explicitly PROVISIONED.
504    if raw.billing_mode.as_deref() == Some("PROVISIONED") && raw.provisioned_throughput.is_none() {
505        return Err(
506            "One or more parameter values were invalid: ReadCapacityUnits and \
507             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
508                .to_string(),
509        );
510    }
511
512    // Parse typed structures
513    let key_schema: Vec<KeySchemaElement> = raw
514        .key_schema
515        .as_ref()
516        .map(|v| serde_json::from_value(v.clone()))
517        .transpose()
518        .map_err(|e| e.to_string())?
519        .unwrap_or_default();
520    let attribute_definitions: Vec<AttributeDefinition> = raw
521        .attribute_definitions
522        .as_ref()
523        .map(|v| serde_json::from_value(v.clone()))
524        .transpose()
525        .map_err(|e| e.to_string())?
526        .unwrap_or_default();
527    let provisioned_throughput: Option<ProvisionedThroughput> = raw
528        .provisioned_throughput
529        .as_ref()
530        .map(|v| serde_json::from_value(v.clone()))
531        .transpose()
532        .map_err(|e| e.to_string())?;
533    let global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>> = raw
534        .global_secondary_indexes
535        .as_ref()
536        .map(|v| serde_json::from_value(v.clone()))
537        .transpose()
538        .map_err(|e| e.to_string())?;
539    let local_secondary_indexes: Option<Vec<LocalSecondaryIndex>> = raw
540        .local_secondary_indexes
541        .as_ref()
542        .map(|v| serde_json::from_value(v.clone()))
543        .transpose()
544        .map_err(|e| e.to_string())?;
545
546    Ok(CreateTableRequest {
547        table_name,
548        key_schema,
549        attribute_definitions,
550        global_secondary_indexes,
551        local_secondary_indexes,
552        billing_mode: raw.billing_mode,
553        provisioned_throughput,
554        stream_specification: raw.stream_specification,
555        sse_specification: raw.sse_specification,
556        table_class: raw.table_class,
557        tags: raw.tags,
558        deletion_protection_enabled: raw.deletion_protection_enabled,
559    })
560}
561
562// ---- Multi-field constraint error collectors ----
563
564fn collect_pt_errors(pt_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
565    if let Some(v) = pt_val {
566        if let Some(obj) = v.as_object() {
567            let wcu = obj.get("WriteCapacityUnits");
568            let rcu = obj.get("ReadCapacityUnits");
569            if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
570                errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
571            } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
572                if w < 1 {
573                    errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
574                }
575            }
576            if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
577                errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
578            } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
579                if r < 1 {
580                    errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
581                }
582            }
583        }
584    }
585}
586
587fn collect_ks_errors(ks_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
588    match ks_val {
589        None => {
590            errors.push(
591                "Value null at 'keySchema' failed to satisfy constraint: Member must not be null"
592                    .to_string(),
593            );
594        }
595        Some(v) => {
596            if let Some(arr) = v.as_array() {
597                if arr.is_empty() {
598                    errors.push("Value '[]' at 'keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string());
599                } else if arr.len() > 2 {
600                    let dump = render_key_schema_java_toString(arr);
601                    errors.push(format!("Value '{}' at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2", dump));
602                }
603                for (i, elem) in arr.iter().enumerate().take(10) {
604                    collect_ks_elem_errors(elem, i + 1, errors);
605                }
606            }
607        }
608    }
609}
610
611fn collect_ks_elem_errors(elem: &serde_json::Value, idx: usize, errors: &mut Vec<String>) {
612    if let Some(obj) = elem.as_object() {
613        if !obj.contains_key("AttributeName")
614            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
615        {
616            errors.push(format!("Value null at 'keySchema.{}.member.attributeName' failed to satisfy constraint: Member must not be null", idx));
617        }
618        let kt = obj.get("KeyType");
619        if kt.is_none() || kt == Some(&serde_json::Value::Null) {
620            errors.push(format!("Value null at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must not be null", idx));
621        } else if let Some(s) = kt.and_then(|v| v.as_str()) {
622            if s != "HASH" && s != "RANGE" {
623                errors.push(format!("Value '{}' at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must satisfy enum value set: [HASH, RANGE]", s, idx));
624            }
625        }
626    }
627}
628
629/// Render a KeySchema array the way AWS does in validation messages: Java's
630/// default toString() shape over the SDK's KeySchemaElement model, e.g.
631/// `[KeySchemaElement(attributeName=pk, keyType=HASH), KeySchemaElement(attributeName=sk, keyType=RANGE)]`.
632/// Missing attribute fields render as the empty string, matching what AWS
633/// emits when the upstream object hasn't been populated.
634#[allow(non_snake_case)]
635fn render_key_schema_java_toString(arr: &[serde_json::Value]) -> String {
636    let parts: Vec<String> = arr
637        .iter()
638        .map(|elem| {
639            let an = elem
640                .get("AttributeName")
641                .and_then(|v| v.as_str())
642                .unwrap_or("");
643            let kt = elem.get("KeyType").and_then(|v| v.as_str()).unwrap_or("");
644            format!("KeySchemaElement(attributeName={an}, keyType={kt})")
645        })
646        .collect();
647    format!("[{}]", parts.join(", "))
648}
649
650fn collect_ad_errors(ad_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
651    match ad_val {
652        None => {
653            errors.push("Value null at 'attributeDefinitions' failed to satisfy constraint: Member must not be null".to_string());
654        }
655        Some(v) => {
656            if let Some(arr) = v.as_array() {
657                for (i, elem) in arr.iter().enumerate() {
658                    if let Some(obj) = elem.as_object() {
659                        if !obj.contains_key("AttributeName")
660                            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
661                        {
662                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeName' failed to satisfy constraint: Member must not be null", i + 1));
663                        }
664                        let at = obj.get("AttributeType");
665                        if at.is_none() || at == Some(&serde_json::Value::Null) {
666                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must not be null", i + 1));
667                        } else if let Some(s) = at.and_then(|v| v.as_str()) {
668                            if s != "S" && s != "N" && s != "B" {
669                                errors.push(format!("Value '{}' at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must satisfy enum value set: [B, N, S]", s, i + 1));
670                            }
671                        }
672                    }
673                }
674            }
675        }
676    }
677}
678
679fn collect_lsi_errors(lsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
680    if let Some(v) = lsi_val {
681        if let Some(arr) = v.as_array() {
682            for (i, elem) in arr.iter().enumerate().take(10) {
683                if let Some(obj) = elem.as_object() {
684                    // Order: indexName, keySchema, projection
685                    if !obj.contains_key("IndexName")
686                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
687                    {
688                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
689                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
690                        collect_idx_name_errors(name, "localSecondaryIndexes", i + 1, errors);
691                    }
692                    if !obj.contains_key("KeySchema")
693                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
694                    {
695                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
696                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
697                        if ks.is_empty() {
698                            errors.push(format!("Value '[]' at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
699                        }
700                    }
701                    if !obj.contains_key("Projection")
702                        || obj.get("Projection") == Some(&serde_json::Value::Null)
703                    {
704                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
705                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
706                        collect_proj_errors(p, &format!("localSecondaryIndexes.{}", i + 1), errors);
707                    }
708                }
709            }
710        }
711    }
712}
713
714fn collect_gsi_errors(gsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
715    if let Some(v) = gsi_val {
716        if let Some(arr) = v.as_array() {
717            for (i, elem) in arr.iter().enumerate().take(10) {
718                if let Some(obj) = elem.as_object() {
719                    // Order for GSI: keySchema, projection, indexName
720                    if !obj.contains_key("KeySchema")
721                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
722                    {
723                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
724                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
725                        if ks.is_empty() {
726                            errors.push(format!("Value '[]' at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
727                        }
728                    }
729                    if !obj.contains_key("Projection")
730                        || obj.get("Projection") == Some(&serde_json::Value::Null)
731                    {
732                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
733                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
734                        collect_proj_errors(
735                            p,
736                            &format!("globalSecondaryIndexes.{}", i + 1),
737                            errors,
738                        );
739                    }
740                    if !obj.contains_key("IndexName")
741                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
742                    {
743                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
744                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
745                        collect_idx_name_errors(name, "globalSecondaryIndexes", i + 1, errors);
746                    }
747                    // GSI ProvisionedThroughput
748                    if let Some(pt) = obj.get("ProvisionedThroughput").and_then(|v| v.as_object()) {
749                        let wcu = pt.get("WriteCapacityUnits");
750                        let rcu = pt.get("ReadCapacityUnits");
751                        if let Some(w) = wcu.and_then(|v| v.as_i64()) {
752                            if w < 1 {
753                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
754                            }
755                        } else if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
756                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
757                        }
758                        if let Some(r) = rcu.and_then(|v| v.as_i64()) {
759                            if r < 1 {
760                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
761                            }
762                        } else if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
763                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
764                        }
765                    }
766                }
767            }
768        }
769    }
770}
771
772fn collect_idx_name_errors(name: &str, prefix: &str, idx: usize, errors: &mut Vec<String>) {
773    if !name
774        .chars()
775        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
776    {
777        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", name, prefix, idx));
778    }
779    if name.len() < 3 {
780        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", name, prefix, idx));
781    }
782    if name.len() > 255 {
783        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length less than or equal to 255", name, prefix, idx));
784    }
785}
786
787fn collect_proj_errors(
788    proj: &serde_json::Map<String, serde_json::Value>,
789    prefix: &str,
790    errors: &mut Vec<String>,
791) {
792    if let Some(pt) = proj.get("ProjectionType") {
793        if let Some(s) = pt.as_str() {
794            if s != "ALL" && s != "KEYS_ONLY" && s != "INCLUDE" {
795                errors.push(format!("Value '{}' at '{}.member.projection.projectionType' failed to satisfy constraint: Member must satisfy enum value set: [ALL, INCLUDE, KEYS_ONLY]", s, prefix));
796            }
797        }
798    }
799    if let Some(nka) = proj.get("NonKeyAttributes") {
800        if let Some(arr) = nka.as_array() {
801            if arr.is_empty() {
802                errors.push(format!("Value '[]' at '{}.member.projection.nonKeyAttributes' failed to satisfy constraint: Member must have length greater than or equal to 1", prefix));
803            }
804        }
805    }
806}
807
808// ---- Structural validation helpers ----
809
810fn validate_key_schema_structure(ks: &[KeySchemaElement]) -> std::result::Result<(), String> {
811    if ks.is_empty() {
812        return Err("1 validation error detected: Value null at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2".to_string());
813    }
814    if ks[0].key_type != KeyType::HASH {
815        return Err(
816            "Invalid KeySchema: The first KeySchemaElement is not a HASH key type".to_string(),
817        );
818    }
819    if ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name {
820        return Err(
821            "Both the Hash Key and the Range Key element in the KeySchema have the same name"
822                .to_string(),
823        );
824    }
825    if ks.len() == 2 && ks[1].key_type != KeyType::RANGE {
826        return Err(
827            "Invalid KeySchema: The second KeySchemaElement is not a RANGE key type".to_string(),
828        );
829    }
830    Ok(())
831}
832
833fn validate_key_attrs_in_defs(
834    ks: &[KeySchemaElement],
835    defs: &[AttributeDefinition],
836) -> std::result::Result<(), String> {
837    // Collect missing key attribute names
838    let missing: Vec<&str> = ks
839        .iter()
840        .filter(|k| !defs.iter().any(|d| d.attribute_name == k.attribute_name))
841        .map(|k| k.attribute_name.as_str())
842        .collect();
843
844    if missing.is_empty() {
845        // Even if no keys are missing, check for structural issues (dup names/types)
846        // which DynamoDB reports as generic "no definition" when defs exist
847        let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
848        if has_dup_names {
849            return Err(
850                "Invalid KeySchema: Some index key attribute have no definition".to_string(),
851            );
852        }
853        return Ok(());
854    }
855
856    // Use generic message when:
857    // - defs is empty (fewer defs than unique key attrs)
858    // - key schema has 2 elements (regardless of structural validity)
859    // - key schema has structural issues (dup names, dup types)
860    // Use detailed message only when defs is non-empty AND key schema has 1 element
861    let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
862    let has_dup_types = ks.len() == 2 && ks[0].key_type == ks[1].key_type;
863    let use_generic = defs.is_empty() || ks.len() >= 2 || has_dup_names || has_dup_types;
864
865    if use_generic {
866        return Err("Invalid KeySchema: Some index key attribute have no definition".to_string());
867    }
868
869    // Detailed message for single-key schema with non-empty defs
870    let key_names: Vec<&str> = missing.to_vec();
871    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
872    Err(format!(
873        "One or more parameter values were invalid: Some index key attributes are not defined in \
874         AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
875        key_names.join(", "),
876        def_names.join(", ")
877    ))
878}
879
880fn validate_attr_def_count(
881    ks: &[KeySchemaElement],
882    defs: &[AttributeDefinition],
883    lsis: &Option<Vec<LocalSecondaryIndex>>,
884    gsis: &Option<Vec<GlobalSecondaryIndex>>,
885) -> std::result::Result<(), String> {
886    let mut all_key_attrs = std::collections::HashSet::new();
887    for k in ks {
888        all_key_attrs.insert(k.attribute_name.as_str());
889    }
890    if let Some(lsis) = lsis {
891        for lsi in lsis {
892            for k in &lsi.key_schema {
893                all_key_attrs.insert(k.attribute_name.as_str());
894            }
895        }
896    }
897    if let Some(gsis) = gsis {
898        for gsi in gsis {
899            for k in &gsi.key_schema {
900                all_key_attrs.insert(k.attribute_name.as_str());
901            }
902        }
903    }
904    if defs.len() != all_key_attrs.len() {
905        return Err("One or more parameter values were invalid: Number of attributes in KeySchema does not exactly match number of attributes defined in AttributeDefinitions".to_string());
906    }
907    Ok(())
908}
909
910fn validate_lsi_list(
911    lsis: &[LocalSecondaryIndex],
912    ks: &[KeySchemaElement],
913    defs: &[AttributeDefinition],
914) -> std::result::Result<(), String> {
915    // Empty check is done earlier in validate_typed_request
916
917    if !ks.iter().any(|k| k.key_type == KeyType::RANGE) {
918        return Err("One or more parameter values were invalid: Table KeySchema does not have a range key, which is required when specifying a LocalSecondaryIndex".to_string());
919    }
920
921    // Check missing attribute definitions across all LSI keys
922    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
923    let mut missing_keys = Vec::new();
924    for lsi in lsis {
925        for k in &lsi.key_schema {
926            if !def_names.contains(&k.attribute_name.as_str())
927                && !missing_keys.contains(&k.attribute_name.as_str())
928            {
929                missing_keys.push(k.attribute_name.as_str());
930            }
931        }
932    }
933    if !missing_keys.is_empty() {
934        let mut all_keys = Vec::new();
935        for lsi in lsis {
936            for k in &lsi.key_schema {
937                if !all_keys.contains(&k.attribute_name.as_str()) {
938                    all_keys.push(k.attribute_name.as_str());
939                }
940            }
941        }
942        return Err(format!(
943            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
944            all_keys.join(", "),
945            def_names.join(", ")
946        ));
947    }
948
949    // Structural validation for each LSI
950    for lsi in lsis {
951        validate_lsi_structure(lsi, ks)?;
952    }
953
954    // Duplicate index names
955    let mut seen = std::collections::HashSet::new();
956    for lsi in lsis {
957        if !seen.insert(&lsi.index_name) {
958            return Err(format!(
959                "One or more parameter values were invalid: Duplicate index name: {}",
960                lsi.index_name
961            ));
962        }
963    }
964
965    // Count limit
966    if lsis.len() > 5 {
967        return Err("One or more parameter values were invalid: Number of LocalSecondaryIndexes exceeds per-table limit of 5".to_string());
968    }
969
970    Ok(())
971}
972
973fn validate_gsi_list(
974    gsis: &[GlobalSecondaryIndex],
975    defs: &[AttributeDefinition],
976    bm: &str,
977) -> std::result::Result<(), String> {
978    // Empty check is done earlier in validate_typed_request
979
980    // Check missing attribute definitions across all GSI keys
981    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
982    let mut missing_keys = Vec::new();
983    for gsi in gsis {
984        for k in &gsi.key_schema {
985            if !def_names.contains(&k.attribute_name.as_str())
986                && !missing_keys.contains(&k.attribute_name.as_str())
987            {
988                missing_keys.push(k.attribute_name.as_str());
989            }
990        }
991    }
992    if !missing_keys.is_empty() {
993        let mut all_keys = Vec::new();
994        for gsi in gsis {
995            for k in &gsi.key_schema {
996                if !all_keys.contains(&k.attribute_name.as_str()) {
997                    all_keys.push(k.attribute_name.as_str());
998                }
999            }
1000        }
1001        return Err(format!(
1002            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
1003            all_keys.join(", "),
1004            def_names.join(", ")
1005        ));
1006    }
1007
1008    // Structural validation for each GSI
1009    for gsi in gsis {
1010        validate_gsi_structure(gsi)?;
1011    }
1012
1013    // Duplicate index names
1014    let mut seen = std::collections::HashSet::new();
1015    for gsi in gsis {
1016        if !seen.insert(&gsi.index_name) {
1017            return Err(format!(
1018                "One or more parameter values were invalid: Duplicate index name: {}",
1019                gsi.index_name
1020            ));
1021        }
1022    }
1023
1024    // Count limit
1025    if gsis.len() > 20 {
1026        return Err("One or more parameter values were invalid: GlobalSecondaryIndex count exceeds the per-table limit of 20".to_string());
1027    }
1028
1029    // PAY_PER_REQUEST billing mode check
1030    if bm == "PAY_PER_REQUEST" {
1031        for gsi in gsis {
1032            if gsi.provisioned_throughput.is_some() {
1033                return Err(format!(
1034                    "One or more parameter values were invalid: ProvisionedThroughput should not be specified for index: {} when BillingMode is PAY_PER_REQUEST",
1035                    gsi.index_name
1036                ));
1037            }
1038        }
1039    }
1040
1041    Ok(())
1042}
1043
1044fn validate_lsi_structure(
1045    lsi: &LocalSecondaryIndex,
1046    table_ks: &[KeySchemaElement],
1047) -> std::result::Result<(), String> {
1048    // Key schema structure first
1049    validate_key_schema_structure(&lsi.key_schema)?;
1050
1051    // Range key presence (before projection, per DynamoDB ordering)
1052    let lsi_sk = lsi.key_schema.iter().find(|k| k.key_type == KeyType::RANGE);
1053    if lsi_sk.is_none() {
1054        return Err(format!(
1055            "One or more parameter values were invalid: Index KeySchema does not have a range key for index: {}",
1056            lsi.index_name
1057        ));
1058    }
1059
1060    // Hash key must match table hash key (before projection)
1061    let table_pk = table_ks
1062        .iter()
1063        .find(|k| k.key_type == KeyType::HASH)
1064        .map(|k| k.attribute_name.as_str());
1065    let lsi_pk = lsi
1066        .key_schema
1067        .iter()
1068        .find(|k| k.key_type == KeyType::HASH)
1069        .map(|k| k.attribute_name.as_str());
1070    if lsi_pk != table_pk {
1071        return Err(format!(
1072            "One or more parameter values were invalid: \
1073             Index KeySchema does not have the same leading hash key as table KeySchema \
1074             for index: {}. index hash key: {}, table hash key: {}",
1075            lsi.index_name,
1076            lsi_pk.unwrap_or("null"),
1077            table_pk.unwrap_or("null")
1078        ));
1079    }
1080
1081    // Projection (after range key and hash key checks)
1082    validate_proj_structure(&lsi.projection)?;
1083
1084    Ok(())
1085}
1086
1087fn validate_gsi_structure(gsi: &GlobalSecondaryIndex) -> std::result::Result<(), String> {
1088    validate_key_schema_structure(&gsi.key_schema)?;
1089    validate_proj_structure(&gsi.projection)?;
1090    Ok(())
1091}
1092
1093fn validate_proj_structure(p: &Projection) -> std::result::Result<(), String> {
1094    match &p.projection_type {
1095        None => Err(
1096            "One or more parameter values were invalid: Unknown ProjectionType: null".to_string(),
1097        ),
1098        Some(pt) => {
1099            if let Some(ref nka) = p.non_key_attributes {
1100                match pt {
1101                    ProjectionType::ALL => return Err("One or more parameter values were invalid: ProjectionType is ALL, but NonKeyAttributes is specified".to_string()),
1102                    ProjectionType::KEYS_ONLY => return Err("One or more parameter values were invalid: ProjectionType is KEYS_ONLY, but NonKeyAttributes is specified".to_string()),
1103                    ProjectionType::INCLUDE => { if nka.is_empty() { return Err("One or more parameter values were invalid: NonKeyAttributes must not be empty".to_string()); } }
1104                }
1105            }
1106            Ok(())
1107        }
1108    }
1109}