Skip to main content

dynoxide/actions/
create_table.rs

1use crate::actions::{TableDescription, build_table_description};
2use crate::errors::{DynoxideError, Result};
3use crate::storage_backend::StorageBackend;
4use crate::streams;
5use crate::types::{
6    AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, KeyType, LocalSecondaryIndex,
7    Projection, ProjectionType, ProvisionedThroughput,
8};
9use serde::{Deserialize, Serialize};
10use web_time::{SystemTime, UNIX_EPOCH};
11
12/// Internal raw deserialization struct — uses serde_json::Value for fields
13/// that participate in DynamoDB's multi-field constraint validation.
14#[derive(Debug, Default, Deserialize)]
15struct RawRequest {
16    #[serde(rename = "TableName", default)]
17    table_name: Option<String>,
18    #[serde(rename = "KeySchema", default)]
19    key_schema: Option<serde_json::Value>,
20    #[serde(rename = "AttributeDefinitions", default)]
21    attribute_definitions: Option<serde_json::Value>,
22    #[serde(rename = "GlobalSecondaryIndexes", default)]
23    global_secondary_indexes: Option<serde_json::Value>,
24    #[serde(rename = "LocalSecondaryIndexes", default)]
25    local_secondary_indexes: Option<serde_json::Value>,
26    #[serde(rename = "BillingMode", default)]
27    billing_mode: Option<String>,
28    #[serde(rename = "ProvisionedThroughput", default)]
29    provisioned_throughput: Option<serde_json::Value>,
30    #[serde(rename = "StreamSpecification", default)]
31    stream_specification: Option<StreamSpecification>,
32    #[serde(rename = "SSESpecification", default)]
33    sse_specification: Option<crate::types::SseSpecification>,
34    #[serde(rename = "TableClass", default)]
35    table_class: Option<String>,
36    #[serde(rename = "Tags", default)]
37    tags: Option<Vec<crate::types::Tag>>,
38    #[serde(rename = "DeletionProtectionEnabled", default)]
39    deletion_protection_enabled: Option<bool>,
40    #[serde(rename = "OnDemandThroughput", default)]
41    on_demand_throughput: Option<crate::types::OnDemandThroughput>,
42}
43
44/// Public request type — fully validated, typed fields.
45/// Can be constructed directly (programmatic use) or deserialized from JSON.
46#[derive(Debug, Default)]
47pub struct CreateTableRequest {
48    pub table_name: String,
49    pub key_schema: Vec<KeySchemaElement>,
50    pub attribute_definitions: Vec<AttributeDefinition>,
51    pub global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>>,
52    pub local_secondary_indexes: Option<Vec<LocalSecondaryIndex>>,
53    pub billing_mode: Option<String>,
54    pub provisioned_throughput: Option<ProvisionedThroughput>,
55    pub stream_specification: Option<StreamSpecification>,
56    pub sse_specification: Option<crate::types::SseSpecification>,
57    pub table_class: Option<String>,
58    pub tags: Option<Vec<crate::types::Tag>>,
59    pub deletion_protection_enabled: Option<bool>,
60    pub on_demand_throughput: Option<crate::types::OnDemandThroughput>,
61}
62
63/// Custom Deserialize that does loose JSON parsing first, validates, then builds typed fields.
64/// Validation errors use "VALIDATION:" prefix so server.rs converts them to ValidationException.
65impl<'de> serde::Deserialize<'de> for CreateTableRequest {
66    fn deserialize<D: serde::Deserializer<'de>>(
67        deserializer: D,
68    ) -> std::result::Result<Self, D::Error> {
69        let raw = RawRequest::deserialize(deserializer)?;
70        match validate_raw_and_build(raw) {
71            Ok(req) => Ok(req),
72            Err(msg) => Err(serde::de::Error::custom(format!("VALIDATION:{}", msg))),
73        }
74    }
75}
76
77#[derive(Debug, Default, Deserialize)]
78pub struct StreamSpecification {
79    #[serde(rename = "StreamEnabled", alias = "stream_enabled")]
80    pub stream_enabled: bool,
81    #[serde(rename = "StreamViewType", alias = "stream_view_type", default)]
82    pub stream_view_type: Option<String>,
83}
84
85#[derive(Debug, Default, Serialize)]
86pub struct CreateTableResponse {
87    #[serde(rename = "TableDescription")]
88    pub table_description: TableDescription,
89}
90
91pub async fn execute<S: StorageBackend>(
92    storage: &S,
93    request: CreateTableRequest,
94) -> Result<CreateTableResponse> {
95    // Structural validation (runs for both programmatic and JSON paths)
96    validate_typed_request(&request)?;
97
98    if let Some(ref tc) = request.table_class {
99        if tc != "STANDARD" && tc != "STANDARD_INFREQUENT_ACCESS" {
100            return Err(DynoxideError::ValidationException(format!(
101                "1 validation error detected: Value '{tc}' at 'tableClass' failed to satisfy \
102                 constraint: Member must satisfy enum value set: \
103                 [STANDARD, STANDARD_INFREQUENT_ACCESS]"
104            )));
105        }
106    }
107
108    if storage.table_exists(&request.table_name).await? {
109        return Err(DynoxideError::ResourceInUseException(format!(
110            "Table already exists: {}",
111            request.table_name
112        )));
113    }
114
115    let now = SystemTime::now()
116        .duration_since(UNIX_EPOCH)
117        .unwrap_or_default()
118        .as_secs() as i64;
119
120    let key_schema_json = serde_json::to_string(&request.key_schema)
121        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
122    let attr_defs_json = serde_json::to_string(&request.attribute_definitions)
123        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
124    let gsi_json = request
125        .global_secondary_indexes
126        .as_ref()
127        .map(serde_json::to_string)
128        .transpose()
129        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
130    let lsi_json = request
131        .local_secondary_indexes
132        .as_ref()
133        .map(serde_json::to_string)
134        .transpose()
135        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
136    let pt_json = request
137        .provisioned_throughput
138        .as_ref()
139        .map(serde_json::to_string)
140        .transpose()
141        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
142    // Normalise the SSE spec so a DescribeTable round-trip matches AWS: when
143    // encryption is enabled without an explicit type/key, AWS reports SSEType=KMS
144    // and a KMS key ARN (the AWS-managed `aws/dynamodb` key). Persisting the
145    // synthesised key id keeps the reported ARN stable across DescribeTable calls.
146    let normalized_sse = request.sse_specification.as_ref().map(|spec| {
147        if spec.enabled == Some(true) {
148            crate::types::SseSpecification {
149                enabled: Some(true),
150                sse_type: spec.sse_type.clone().or_else(|| Some("KMS".to_string())),
151                kms_master_key_id: spec.kms_master_key_id.clone().or_else(|| {
152                    Some(crate::streams::kms_key_arn(
153                        &uuid::Uuid::new_v4().to_string(),
154                    ))
155                }),
156            }
157        } else {
158            spec.clone()
159        }
160    });
161    let sse_json = normalized_sse
162        .as_ref()
163        .map(serde_json::to_string)
164        .transpose()
165        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
166    let on_demand_json = request
167        .on_demand_throughput
168        .as_ref()
169        .map(serde_json::to_string)
170        .transpose()
171        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
172    let deletion_protection = request.deletion_protection_enabled.unwrap_or(false);
173
174    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
175    storage
176        .insert_table_metadata(&crate::storage::CreateTableMetadata {
177            table_name: &request.table_name,
178            key_schema: &key_schema_json,
179            attribute_definitions: &attr_defs_json,
180            gsi_definitions: gsi_json.as_deref(),
181            lsi_definitions: lsi_json.as_deref(),
182            provisioned_throughput: pt_json.as_deref(),
183            created_at: now,
184            sse_specification: sse_json.as_deref(),
185            table_class: request.table_class.as_deref(),
186            deletion_protection_enabled: deletion_protection,
187            billing_mode: Some(billing_mode_str),
188            on_demand_throughput: on_demand_json.as_deref(),
189        })
190        .await?;
191
192    storage.create_data_table(&request.table_name).await?;
193
194    if let Some(ref gsis) = request.global_secondary_indexes {
195        for gsi in gsis {
196            storage
197                .create_gsi_table(&request.table_name, &gsi.index_name)
198                .await?;
199        }
200    }
201
202    if let Some(ref lsis) = request.local_secondary_indexes {
203        for lsi in lsis {
204            storage
205                .create_lsi_table(&request.table_name, &lsi.index_name)
206                .await?;
207        }
208    }
209
210    if let Some(ref spec) = request.stream_specification {
211        if spec.stream_enabled {
212            let view_type = spec
213                .stream_view_type
214                .as_deref()
215                .unwrap_or("NEW_AND_OLD_IMAGES");
216            let label = streams::generate_stream_label(storage.clock());
217            storage
218                .enable_stream(&request.table_name, view_type, &label)
219                .await?;
220        }
221    }
222
223    if let Some(ref tags) = request.tags {
224        if !tags.is_empty() {
225            storage.set_tags(&request.table_name, tags).await?;
226        }
227    }
228
229    let meta = storage
230        .get_table_metadata(&request.table_name)
231        .await?
232        .ok_or_else(|| {
233            DynoxideError::InternalServerError("Table metadata not found after creation".into())
234        })?;
235
236    let mut desc = build_table_description(&meta, Some(0), Some(0));
237    // CreateTable response shows CREATING status (table is usable immediately
238    // but DynamoDB API contract says newly-created tables start as CREATING)
239    desc.table_status = "CREATING".to_string();
240
241    // Override billing mode fields based on the actual request
242    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
243    if billing_mode_str == "PROVISIONED" {
244        desc.billing_mode_summary = None;
245        desc.table_throughput_mode_summary = None;
246    } else if billing_mode_str == "PAY_PER_REQUEST" {
247        desc.billing_mode_summary = Some(crate::actions::BillingModeSummary {
248            billing_mode: "PAY_PER_REQUEST".to_string(),
249            last_update_to_pay_per_request_date_time: None,
250        });
251        desc.table_throughput_mode_summary = Some(crate::actions::TableThroughputModeSummary {
252            table_throughput_mode: "PAY_PER_REQUEST".to_string(),
253            last_update_to_pay_per_request_date_time: None,
254        });
255        // Ensure provisioned throughput shows zeros for PAY_PER_REQUEST
256        desc.provisioned_throughput = Some(crate::actions::TableProvisionedThroughputDescription {
257            read_capacity_units: 0,
258            write_capacity_units: 0,
259            number_of_decreases_today: 0,
260            last_increase_date_time: None,
261            last_decrease_date_time: None,
262        });
263    }
264
265    // Set all GSI statuses to CREATING for newly created tables
266    if let Some(ref mut gsis) = desc.global_secondary_indexes {
267        for gsi in gsis {
268            gsi.index_status = "CREATING".to_string();
269        }
270    }
271
272    // Remove DeletionProtectionEnabled from response if not explicitly set
273    // (DynamoDB doesn't include it in basic CreateTable response)
274    if request.deletion_protection_enabled.is_none() {
275        desc.deletion_protection_enabled = None;
276    }
277
278    Ok(CreateTableResponse {
279        table_description: desc,
280    })
281}
282
283/// Convert a String error to DynoxideError::ValidationException.
284fn ve(msg: String) -> DynoxideError {
285    DynoxideError::ValidationException(msg)
286}
287
288/// Validate a programmatically-constructed request (used when not deserialised from JSON).
289///
290/// The validation order matches DynamoDB's actual behaviour (as verified by the Dynalite
291/// conformance suite):
292///
293/// 1. Table name (missing, length, pattern)
294/// 2. BillingMode + ProvisionedThroughput consistency
295/// 3. ProvisionedThroughput out-of-bounds
296/// 4. Missing ProvisionedThroughput (default PROVISIONED billing)
297/// 5. Key attribute definition checks ("Invalid KeySchema" / detailed missing-attr message)
298/// 6. Key schema structure (duplicate names, wrong types)
299/// 7. Empty LSI/GSI lists
300/// 8. LSI/GSI structural validation (key schema, projections, duplicates, limits)
301/// 9. Cross-index duplicate names
302/// 10. Attribute definition count mismatch
303fn validate_typed_request(request: &CreateTableRequest) -> Result<()> {
304    if request.table_name.is_empty() {
305        return Err(DynoxideError::ValidationException(
306            "The parameter 'TableName' is required but was not present in the request".to_string(),
307        ));
308    }
309    if request.table_name.len() < 3 || request.table_name.len() > 255 {
310        return Err(DynoxideError::ValidationException(
311            "TableName must be at least 3 characters long and at most 255 characters long"
312                .to_string(),
313        ));
314    }
315
316    // Table name pattern
317    if !request
318        .table_name
319        .chars()
320        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
321    {
322        return Err(DynoxideError::ValidationException(format!(
323            "1 validation error detected: Value '{}' at 'tableName' failed to satisfy constraint: \
324             Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
325            request.table_name
326        )));
327    }
328
329    // BillingMode + ProvisionedThroughput consistency
330    let billing_mode_str = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
331    if billing_mode_str == "PAY_PER_REQUEST" && request.provisioned_throughput.is_some() {
332        return Err(DynoxideError::ValidationException(
333            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
334             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
335                .to_string(),
336        ));
337    }
338
339    // ProvisionedThroughput out-of-bounds
340    if let Some(ref pt) = request.provisioned_throughput {
341        const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
342        let rcu = pt.read_capacity_units.unwrap_or(0);
343        let wcu = pt.write_capacity_units.unwrap_or(0);
344        if rcu > MAX_THROUGHPUT {
345            return Err(DynoxideError::ValidationException(format!(
346                "Given value {} for ReadCapacityUnits is out of bounds",
347                rcu
348            )));
349        }
350        if wcu > MAX_THROUGHPUT {
351            return Err(DynoxideError::ValidationException(format!(
352                "Given value {} for WriteCapacityUnits is out of bounds",
353                wcu
354            )));
355        }
356    }
357
358    // Missing ProvisionedThroughput when billing mode is explicitly PROVISIONED.
359    // For the programmatic API, when BillingMode is not specified we default to
360    // PAY_PER_REQUEST for convenience. The HTTP/JSON path (validate_raw_and_build)
361    // applies the stricter DynamoDB default of PROVISIONED.
362    if request.billing_mode.is_some()
363        && billing_mode_str == "PROVISIONED"
364        && request.provisioned_throughput.is_none()
365    {
366        return Err(DynoxideError::ValidationException(
367            "One or more parameter values were invalid: ReadCapacityUnits and \
368             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
369                .to_string(),
370        ));
371    }
372
373    // Key attribute definition checks (before key schema structure)
374    validate_key_attrs_in_defs(&request.key_schema, &request.attribute_definitions).map_err(ve)?;
375
376    // Key schema structure
377    validate_key_schema_structure(&request.key_schema).map_err(ve)?;
378
379    // Empty LSI/GSI lists (before structural validation and attr count)
380    if let Some(ref lsis) = request.local_secondary_indexes {
381        if lsis.is_empty() {
382            return Err(ve(
383                "One or more parameter values were invalid: List of LocalSecondaryIndexes is empty"
384                    .to_string(),
385            ));
386        }
387    }
388    if let Some(ref gsis) = request.global_secondary_indexes {
389        if gsis.is_empty() {
390            return Err(ve(
391                "One or more parameter values were invalid: List of GlobalSecondaryIndexes is empty"
392                    .to_string(),
393            ));
394        }
395    }
396
397    // LSI structural validation
398    if let Some(ref lsis) = request.local_secondary_indexes {
399        validate_lsi_list(lsis, &request.key_schema, &request.attribute_definitions).map_err(ve)?;
400    }
401
402    // GSI structural validation
403    if let Some(ref gsis) = request.global_secondary_indexes {
404        let bm = request.billing_mode.as_deref().unwrap_or("PROVISIONED");
405        validate_gsi_list(gsis, &request.attribute_definitions, bm).map_err(ve)?;
406    }
407
408    // Cross-index duplicate names (checked before attr def count)
409    check_cross_index_duplicates(
410        &request.local_secondary_indexes,
411        &request.global_secondary_indexes,
412    )
413    .map_err(ve)?;
414
415    // Attribute definition count (last)
416    validate_attr_def_count(
417        &request.key_schema,
418        &request.attribute_definitions,
419        &request.local_secondary_indexes,
420        &request.global_secondary_indexes,
421    )
422    .map_err(ve)?;
423
424    Ok(())
425}
426
427fn check_cross_index_duplicates(
428    lsis: &Option<Vec<LocalSecondaryIndex>>,
429    gsis: &Option<Vec<GlobalSecondaryIndex>>,
430) -> std::result::Result<(), String> {
431    if let (Some(lsis), Some(gsis)) = (lsis, gsis) {
432        let mut all_names = std::collections::HashSet::new();
433        for lsi in lsis {
434            all_names.insert(&lsi.index_name);
435        }
436        for gsi in gsis {
437            if !all_names.insert(&gsi.index_name) {
438                return Err(format!(
439                    "One or more parameter values were invalid: Duplicate index name: {}",
440                    gsi.index_name
441                ));
442            }
443        }
444    }
445    Ok(())
446}
447
448// ---- Raw JSON validation (for deserialization path) ----
449
450fn validate_raw_and_build(raw: RawRequest) -> std::result::Result<CreateTableRequest, String> {
451    // Missing TableName is a different error format from invalid TableName
452    if raw.table_name.is_none() {
453        return Err(
454            "The parameter 'TableName' is required but was not present in the request".to_string(),
455        );
456    }
457
458    // Use the shared constraint error collector for table name validation.
459    // This produces the correct multi-field constraint format for empty,
460    // too-short, too-long, or invalid-pattern table names.
461    let name_errors = crate::validation::table_name_constraint_errors(
462        raw.table_name.as_deref(),
463        crate::validation::TableNameContext::CreateTable,
464    );
465    if !name_errors.is_empty() {
466        let msg = format!(
467            "{} validation error{} detected: {}",
468            name_errors.len(),
469            if name_errors.len() > 1 { "s" } else { "" },
470            name_errors.join("; ")
471        );
472        return Err(msg);
473    }
474    let table_name = raw.table_name.unwrap();
475
476    let mut errors = Vec::new();
477
478    if let Some(ref bm) = raw.billing_mode {
479        if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
480            errors.push(format!(
481                "Value '{}' at 'billingMode' failed to satisfy constraint: \
482                 Member must satisfy enum value set: [PROVISIONED, PAY_PER_REQUEST]",
483                bm
484            ));
485        }
486    }
487
488    collect_pt_errors(&raw.provisioned_throughput, &mut errors);
489    collect_ks_errors(&raw.key_schema, &mut errors);
490    collect_ad_errors(&raw.attribute_definitions, &mut errors);
491    collect_lsi_errors(&raw.local_secondary_indexes, &mut errors);
492    collect_gsi_errors(&raw.global_secondary_indexes, &mut errors);
493
494    // DynamoDB caps multi-field constraint errors at 10
495    errors.truncate(10);
496
497    if !errors.is_empty() {
498        let prefix = format!(
499            "{} validation error{} detected: ",
500            errors.len(),
501            if errors.len() == 1 { "" } else { "s" }
502        );
503        return Err(format!("{}{}", prefix, errors.join("; ")));
504    }
505
506    // BillingMode + ProvisionedThroughput consistency (HTTP path only)
507    let billing_mode_str = raw.billing_mode.as_deref().unwrap_or("PROVISIONED");
508    if billing_mode_str == "PAY_PER_REQUEST" && raw.provisioned_throughput.is_some() {
509        return Err(
510            "One or more parameter values were invalid: Neither ReadCapacityUnits nor \
511             WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
512                .to_string(),
513        );
514    }
515
516    // ProvisionedThroughput out-of-bounds (after multi-field but before struct checks)
517    if let Some(ref pt) = raw.provisioned_throughput {
518        if let Some(obj) = pt.as_object() {
519            let rcu = obj
520                .get("ReadCapacityUnits")
521                .and_then(|v| v.as_i64())
522                .unwrap_or(0);
523            let wcu = obj
524                .get("WriteCapacityUnits")
525                .and_then(|v| v.as_i64())
526                .unwrap_or(0);
527            const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
528            if rcu > MAX_THROUGHPUT {
529                return Err(format!(
530                    "Given value {} for ReadCapacityUnits is out of bounds",
531                    rcu
532                ));
533            }
534            if wcu > MAX_THROUGHPUT {
535                return Err(format!(
536                    "Given value {} for WriteCapacityUnits is out of bounds",
537                    wcu
538                ));
539            }
540        }
541    }
542
543    // Missing ProvisionedThroughput when BillingMode is explicitly PROVISIONED.
544    if raw.billing_mode.as_deref() == Some("PROVISIONED") && raw.provisioned_throughput.is_none() {
545        return Err(
546            "One or more parameter values were invalid: ReadCapacityUnits and \
547             WriteCapacityUnits must both be specified when BillingMode is PROVISIONED"
548                .to_string(),
549        );
550    }
551
552    // Parse typed structures
553    let key_schema: Vec<KeySchemaElement> = raw
554        .key_schema
555        .as_ref()
556        .map(|v| serde_json::from_value(v.clone()))
557        .transpose()
558        .map_err(|e| e.to_string())?
559        .unwrap_or_default();
560    let attribute_definitions: Vec<AttributeDefinition> = raw
561        .attribute_definitions
562        .as_ref()
563        .map(|v| serde_json::from_value(v.clone()))
564        .transpose()
565        .map_err(|e| e.to_string())?
566        .unwrap_or_default();
567    let provisioned_throughput: Option<ProvisionedThroughput> = raw
568        .provisioned_throughput
569        .as_ref()
570        .map(|v| serde_json::from_value(v.clone()))
571        .transpose()
572        .map_err(|e| e.to_string())?;
573    let global_secondary_indexes: Option<Vec<GlobalSecondaryIndex>> = raw
574        .global_secondary_indexes
575        .as_ref()
576        .map(|v| serde_json::from_value(v.clone()))
577        .transpose()
578        .map_err(|e| e.to_string())?;
579    let local_secondary_indexes: Option<Vec<LocalSecondaryIndex>> = raw
580        .local_secondary_indexes
581        .as_ref()
582        .map(|v| serde_json::from_value(v.clone()))
583        .transpose()
584        .map_err(|e| e.to_string())?;
585
586    Ok(CreateTableRequest {
587        table_name,
588        key_schema,
589        attribute_definitions,
590        global_secondary_indexes,
591        local_secondary_indexes,
592        billing_mode: raw.billing_mode,
593        provisioned_throughput,
594        stream_specification: raw.stream_specification,
595        sse_specification: raw.sse_specification,
596        table_class: raw.table_class,
597        tags: raw.tags,
598        deletion_protection_enabled: raw.deletion_protection_enabled,
599        on_demand_throughput: raw.on_demand_throughput,
600    })
601}
602
603// ---- Multi-field constraint error collectors ----
604
605fn collect_pt_errors(pt_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
606    if let Some(v) = pt_val {
607        if let Some(obj) = v.as_object() {
608            let wcu = obj.get("WriteCapacityUnits");
609            let rcu = obj.get("ReadCapacityUnits");
610            if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
611                errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
612            } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
613                if w < 1 {
614                    errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
615                }
616            }
617            if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
618                errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
619            } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
620                if r < 1 {
621                    errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
622                }
623            }
624        }
625    }
626}
627
628fn collect_ks_errors(ks_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
629    match ks_val {
630        None => {
631            errors.push(
632                "Value null at 'keySchema' failed to satisfy constraint: Member must not be null"
633                    .to_string(),
634            );
635        }
636        Some(v) => {
637            if let Some(arr) = v.as_array() {
638                if arr.is_empty() {
639                    errors.push("Value '[]' at 'keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1".to_string());
640                } else if arr.len() > 2 {
641                    let dump = render_key_schema_java_toString(arr);
642                    errors.push(format!("Value '{}' at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2", dump));
643                }
644                for (i, elem) in arr.iter().enumerate().take(10) {
645                    collect_ks_elem_errors(elem, i + 1, errors);
646                }
647            }
648        }
649    }
650}
651
652fn collect_ks_elem_errors(elem: &serde_json::Value, idx: usize, errors: &mut Vec<String>) {
653    if let Some(obj) = elem.as_object() {
654        if !obj.contains_key("AttributeName")
655            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
656        {
657            errors.push(format!("Value null at 'keySchema.{}.member.attributeName' failed to satisfy constraint: Member must not be null", idx));
658        }
659        let kt = obj.get("KeyType");
660        if kt.is_none() || kt == Some(&serde_json::Value::Null) {
661            errors.push(format!("Value null at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must not be null", idx));
662        } else if let Some(s) = kt.and_then(|v| v.as_str()) {
663            if s != "HASH" && s != "RANGE" {
664                errors.push(format!("Value '{}' at 'keySchema.{}.member.keyType' failed to satisfy constraint: Member must satisfy enum value set: [HASH, RANGE]", s, idx));
665            }
666        }
667    }
668}
669
670/// Render a KeySchema array the way AWS does in validation messages: Java's
671/// default toString() shape over the SDK's KeySchemaElement model, e.g.
672/// `[KeySchemaElement(attributeName=pk, keyType=HASH), KeySchemaElement(attributeName=sk, keyType=RANGE)]`.
673/// Missing attribute fields render as the empty string, matching what AWS
674/// emits when the upstream object hasn't been populated.
675#[allow(non_snake_case)]
676fn render_key_schema_java_toString(arr: &[serde_json::Value]) -> String {
677    let parts: Vec<String> = arr
678        .iter()
679        .map(|elem| {
680            let an = elem
681                .get("AttributeName")
682                .and_then(|v| v.as_str())
683                .unwrap_or("");
684            let kt = elem.get("KeyType").and_then(|v| v.as_str()).unwrap_or("");
685            format!("KeySchemaElement(attributeName={an}, keyType={kt})")
686        })
687        .collect();
688    format!("[{}]", parts.join(", "))
689}
690
691fn collect_ad_errors(ad_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
692    match ad_val {
693        None => {
694            errors.push("Value null at 'attributeDefinitions' failed to satisfy constraint: Member must not be null".to_string());
695        }
696        Some(v) => {
697            if let Some(arr) = v.as_array() {
698                for (i, elem) in arr.iter().enumerate() {
699                    if let Some(obj) = elem.as_object() {
700                        if !obj.contains_key("AttributeName")
701                            || obj.get("AttributeName") == Some(&serde_json::Value::Null)
702                        {
703                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeName' failed to satisfy constraint: Member must not be null", i + 1));
704                        }
705                        let at = obj.get("AttributeType");
706                        if at.is_none() || at == Some(&serde_json::Value::Null) {
707                            errors.push(format!("Value null at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must not be null", i + 1));
708                        } else if let Some(s) = at.and_then(|v| v.as_str()) {
709                            if s != "S" && s != "N" && s != "B" {
710                                errors.push(format!("Value '{}' at 'attributeDefinitions.{}.member.attributeType' failed to satisfy constraint: Member must satisfy enum value set: [B, N, S]", s, i + 1));
711                            }
712                        }
713                    }
714                }
715            }
716        }
717    }
718}
719
720fn collect_lsi_errors(lsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
721    if let Some(v) = lsi_val {
722        if let Some(arr) = v.as_array() {
723            for (i, elem) in arr.iter().enumerate().take(10) {
724                if let Some(obj) = elem.as_object() {
725                    // Order: indexName, keySchema, projection
726                    if !obj.contains_key("IndexName")
727                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
728                    {
729                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
730                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
731                        collect_idx_name_errors(name, "localSecondaryIndexes", i + 1, errors);
732                    }
733                    if !obj.contains_key("KeySchema")
734                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
735                    {
736                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
737                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
738                        if ks.is_empty() {
739                            errors.push(format!("Value '[]' at 'localSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
740                        }
741                    }
742                    if !obj.contains_key("Projection")
743                        || obj.get("Projection") == Some(&serde_json::Value::Null)
744                    {
745                        errors.push(format!("Value null at 'localSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
746                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
747                        collect_proj_errors(p, &format!("localSecondaryIndexes.{}", i + 1), errors);
748                    }
749                }
750            }
751        }
752    }
753}
754
755fn collect_gsi_errors(gsi_val: &Option<serde_json::Value>, errors: &mut Vec<String>) {
756    if let Some(v) = gsi_val {
757        if let Some(arr) = v.as_array() {
758            for (i, elem) in arr.iter().enumerate().take(10) {
759                if let Some(obj) = elem.as_object() {
760                    // Order for GSI: keySchema, projection, indexName
761                    if !obj.contains_key("KeySchema")
762                        || obj.get("KeySchema") == Some(&serde_json::Value::Null)
763                    {
764                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must not be null", i + 1));
765                    } else if let Some(ks) = obj.get("KeySchema").and_then(|v| v.as_array()) {
766                        if ks.is_empty() {
767                            errors.push(format!("Value '[]' at 'globalSecondaryIndexes.{}.member.keySchema' failed to satisfy constraint: Member must have length greater than or equal to 1", i + 1));
768                        }
769                    }
770                    if !obj.contains_key("Projection")
771                        || obj.get("Projection") == Some(&serde_json::Value::Null)
772                    {
773                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.projection' failed to satisfy constraint: Member must not be null", i + 1));
774                    } else if let Some(p) = obj.get("Projection").and_then(|v| v.as_object()) {
775                        collect_proj_errors(
776                            p,
777                            &format!("globalSecondaryIndexes.{}", i + 1),
778                            errors,
779                        );
780                    }
781                    if !obj.contains_key("IndexName")
782                        || obj.get("IndexName") == Some(&serde_json::Value::Null)
783                    {
784                        errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.indexName' failed to satisfy constraint: Member must not be null", i + 1));
785                    } else if let Some(name) = obj.get("IndexName").and_then(|v| v.as_str()) {
786                        collect_idx_name_errors(name, "globalSecondaryIndexes", i + 1, errors);
787                    }
788                    // GSI ProvisionedThroughput
789                    if let Some(pt) = obj.get("ProvisionedThroughput").and_then(|v| v.as_object()) {
790                        let wcu = pt.get("WriteCapacityUnits");
791                        let rcu = pt.get("ReadCapacityUnits");
792                        if let Some(w) = wcu.and_then(|v| v.as_i64()) {
793                            if w < 1 {
794                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
795                            }
796                        } else if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
797                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
798                        }
799                        if let Some(r) = rcu.and_then(|v| v.as_i64()) {
800                            if r < 1 {
801                                errors.push(format!("Value '{}' at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
802                            }
803                        } else if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
804                            errors.push(format!("Value null at 'globalSecondaryIndexes.{}.member.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
805                        }
806                    }
807                }
808            }
809        }
810    }
811}
812
813fn collect_idx_name_errors(name: &str, prefix: &str, idx: usize, errors: &mut Vec<String>) {
814    if !name
815        .chars()
816        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
817    {
818        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", name, prefix, idx));
819    }
820    if name.len() < 3 {
821        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", name, prefix, idx));
822    }
823    if name.len() > 255 {
824        errors.push(format!("Value '{}' at '{}.{}.member.indexName' failed to satisfy constraint: Member must have length less than or equal to 255", name, prefix, idx));
825    }
826}
827
828fn collect_proj_errors(
829    proj: &serde_json::Map<String, serde_json::Value>,
830    prefix: &str,
831    errors: &mut Vec<String>,
832) {
833    if let Some(pt) = proj.get("ProjectionType") {
834        if let Some(s) = pt.as_str() {
835            if s != "ALL" && s != "KEYS_ONLY" && s != "INCLUDE" {
836                errors.push(format!("Value '{}' at '{}.member.projection.projectionType' failed to satisfy constraint: Member must satisfy enum value set: [ALL, INCLUDE, KEYS_ONLY]", s, prefix));
837            }
838        }
839    }
840    if let Some(nka) = proj.get("NonKeyAttributes") {
841        if let Some(arr) = nka.as_array() {
842            if arr.is_empty() {
843                errors.push(format!("Value '[]' at '{}.member.projection.nonKeyAttributes' failed to satisfy constraint: Member must have length greater than or equal to 1", prefix));
844            }
845        }
846    }
847}
848
849// ---- Structural validation helpers ----
850
851fn validate_key_schema_structure(ks: &[KeySchemaElement]) -> std::result::Result<(), String> {
852    if ks.is_empty() {
853        return Err("1 validation error detected: Value null at 'keySchema' failed to satisfy constraint: Member must have length less than or equal to 2".to_string());
854    }
855    if ks[0].key_type != KeyType::HASH {
856        return Err(
857            "Invalid KeySchema: The first KeySchemaElement is not a HASH key type".to_string(),
858        );
859    }
860    if ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name {
861        return Err(
862            "Both the Hash Key and the Range Key element in the KeySchema have the same name"
863                .to_string(),
864        );
865    }
866    if ks.len() == 2 && ks[1].key_type != KeyType::RANGE {
867        return Err(
868            "Invalid KeySchema: The second KeySchemaElement is not a RANGE key type".to_string(),
869        );
870    }
871    Ok(())
872}
873
874fn validate_key_attrs_in_defs(
875    ks: &[KeySchemaElement],
876    defs: &[AttributeDefinition],
877) -> std::result::Result<(), String> {
878    // Collect missing key attribute names
879    let missing: Vec<&str> = ks
880        .iter()
881        .filter(|k| !defs.iter().any(|d| d.attribute_name == k.attribute_name))
882        .map(|k| k.attribute_name.as_str())
883        .collect();
884
885    if missing.is_empty() {
886        // Even if no keys are missing, check for structural issues (dup names/types)
887        // which DynamoDB reports as generic "no definition" when defs exist
888        let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
889        if has_dup_names {
890            return Err(
891                "Invalid KeySchema: Some index key attribute have no definition".to_string(),
892            );
893        }
894        return Ok(());
895    }
896
897    // Use generic message when:
898    // - defs is empty (fewer defs than unique key attrs)
899    // - key schema has 2 elements (regardless of structural validity)
900    // - key schema has structural issues (dup names, dup types)
901    // Use detailed message only when defs is non-empty AND key schema has 1 element
902    let has_dup_names = ks.len() == 2 && ks[0].attribute_name == ks[1].attribute_name;
903    let has_dup_types = ks.len() == 2 && ks[0].key_type == ks[1].key_type;
904    let use_generic = defs.is_empty() || ks.len() >= 2 || has_dup_names || has_dup_types;
905
906    if use_generic {
907        return Err("Invalid KeySchema: Some index key attribute have no definition".to_string());
908    }
909
910    // Detailed message for single-key schema with non-empty defs
911    let key_names: Vec<&str> = missing.to_vec();
912    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
913    Err(format!(
914        "One or more parameter values were invalid: Some index key attributes are not defined in \
915         AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
916        key_names.join(", "),
917        def_names.join(", ")
918    ))
919}
920
921fn validate_attr_def_count(
922    ks: &[KeySchemaElement],
923    defs: &[AttributeDefinition],
924    lsis: &Option<Vec<LocalSecondaryIndex>>,
925    gsis: &Option<Vec<GlobalSecondaryIndex>>,
926) -> std::result::Result<(), String> {
927    let mut all_key_attrs = std::collections::HashSet::new();
928    for k in ks {
929        all_key_attrs.insert(k.attribute_name.as_str());
930    }
931    if let Some(lsis) = lsis {
932        for lsi in lsis {
933            for k in &lsi.key_schema {
934                all_key_attrs.insert(k.attribute_name.as_str());
935            }
936        }
937    }
938    if let Some(gsis) = gsis {
939        for gsi in gsis {
940            for k in &gsi.key_schema {
941                all_key_attrs.insert(k.attribute_name.as_str());
942            }
943        }
944    }
945    if defs.len() != all_key_attrs.len() {
946        return Err("One or more parameter values were invalid: Number of attributes in KeySchema does not exactly match number of attributes defined in AttributeDefinitions".to_string());
947    }
948    Ok(())
949}
950
951fn validate_lsi_list(
952    lsis: &[LocalSecondaryIndex],
953    ks: &[KeySchemaElement],
954    defs: &[AttributeDefinition],
955) -> std::result::Result<(), String> {
956    // Empty check is done earlier in validate_typed_request
957
958    if !ks.iter().any(|k| k.key_type == KeyType::RANGE) {
959        return Err("One or more parameter values were invalid: Table KeySchema does not have a range key, which is required when specifying a LocalSecondaryIndex".to_string());
960    }
961
962    // Check missing attribute definitions across all LSI keys
963    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
964    let mut missing_keys = Vec::new();
965    for lsi in lsis {
966        for k in &lsi.key_schema {
967            if !def_names.contains(&k.attribute_name.as_str())
968                && !missing_keys.contains(&k.attribute_name.as_str())
969            {
970                missing_keys.push(k.attribute_name.as_str());
971            }
972        }
973    }
974    if !missing_keys.is_empty() {
975        let mut all_keys = Vec::new();
976        for lsi in lsis {
977            for k in &lsi.key_schema {
978                if !all_keys.contains(&k.attribute_name.as_str()) {
979                    all_keys.push(k.attribute_name.as_str());
980                }
981            }
982        }
983        return Err(format!(
984            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
985            all_keys.join(", "),
986            def_names.join(", ")
987        ));
988    }
989
990    // Structural validation for each LSI
991    for lsi in lsis {
992        validate_lsi_structure(lsi, ks)?;
993    }
994
995    // Duplicate index names
996    let mut seen = std::collections::HashSet::new();
997    for lsi in lsis {
998        if !seen.insert(&lsi.index_name) {
999            return Err(format!(
1000                "One or more parameter values were invalid: Duplicate index name: {}",
1001                lsi.index_name
1002            ));
1003        }
1004    }
1005
1006    // Count limit
1007    if lsis.len() > 5 {
1008        return Err("One or more parameter values were invalid: Number of LocalSecondaryIndexes exceeds per-table limit of 5".to_string());
1009    }
1010
1011    Ok(())
1012}
1013
1014fn validate_gsi_list(
1015    gsis: &[GlobalSecondaryIndex],
1016    defs: &[AttributeDefinition],
1017    bm: &str,
1018) -> std::result::Result<(), String> {
1019    // Empty check is done earlier in validate_typed_request
1020
1021    // Check missing attribute definitions across all GSI keys
1022    let def_names: Vec<&str> = defs.iter().map(|d| d.attribute_name.as_str()).collect();
1023    let mut missing_keys = Vec::new();
1024    for gsi in gsis {
1025        for k in &gsi.key_schema {
1026            if !def_names.contains(&k.attribute_name.as_str())
1027                && !missing_keys.contains(&k.attribute_name.as_str())
1028            {
1029                missing_keys.push(k.attribute_name.as_str());
1030            }
1031        }
1032    }
1033    if !missing_keys.is_empty() {
1034        let mut all_keys = Vec::new();
1035        for gsi in gsis {
1036            for k in &gsi.key_schema {
1037                if !all_keys.contains(&k.attribute_name.as_str()) {
1038                    all_keys.push(k.attribute_name.as_str());
1039                }
1040            }
1041        }
1042        return Err(format!(
1043            "One or more parameter values were invalid: Some index key attributes are not defined in AttributeDefinitions. Keys: [{}], AttributeDefinitions: [{}]",
1044            all_keys.join(", "),
1045            def_names.join(", ")
1046        ));
1047    }
1048
1049    // Structural validation for each GSI
1050    for gsi in gsis {
1051        validate_gsi_structure(gsi)?;
1052    }
1053
1054    // Duplicate index names
1055    let mut seen = std::collections::HashSet::new();
1056    for gsi in gsis {
1057        if !seen.insert(&gsi.index_name) {
1058            return Err(format!(
1059                "One or more parameter values were invalid: Duplicate index name: {}",
1060                gsi.index_name
1061            ));
1062        }
1063    }
1064
1065    // Count limit
1066    if gsis.len() > 20 {
1067        return Err("One or more parameter values were invalid: GlobalSecondaryIndex count exceeds the per-table limit of 20".to_string());
1068    }
1069
1070    // PAY_PER_REQUEST billing mode check
1071    if bm == "PAY_PER_REQUEST" {
1072        for gsi in gsis {
1073            if gsi.provisioned_throughput.is_some() {
1074                return Err(format!(
1075                    "One or more parameter values were invalid: ProvisionedThroughput should not be specified for index: {} when BillingMode is PAY_PER_REQUEST",
1076                    gsi.index_name
1077                ));
1078            }
1079        }
1080    }
1081
1082    Ok(())
1083}
1084
1085fn validate_lsi_structure(
1086    lsi: &LocalSecondaryIndex,
1087    table_ks: &[KeySchemaElement],
1088) -> std::result::Result<(), String> {
1089    // Key schema structure first
1090    validate_key_schema_structure(&lsi.key_schema)?;
1091
1092    // Range key presence (before projection, per DynamoDB ordering)
1093    let lsi_sk = lsi.key_schema.iter().find(|k| k.key_type == KeyType::RANGE);
1094    if lsi_sk.is_none() {
1095        return Err(format!(
1096            "One or more parameter values were invalid: Index KeySchema does not have a range key for index: {}",
1097            lsi.index_name
1098        ));
1099    }
1100
1101    // Hash key must match table hash key (before projection)
1102    let table_pk = table_ks
1103        .iter()
1104        .find(|k| k.key_type == KeyType::HASH)
1105        .map(|k| k.attribute_name.as_str());
1106    let lsi_pk = lsi
1107        .key_schema
1108        .iter()
1109        .find(|k| k.key_type == KeyType::HASH)
1110        .map(|k| k.attribute_name.as_str());
1111    if lsi_pk != table_pk {
1112        return Err(format!(
1113            "One or more parameter values were invalid: \
1114             Index KeySchema does not have the same leading hash key as table KeySchema \
1115             for index: {}. index hash key: {}, table hash key: {}",
1116            lsi.index_name,
1117            lsi_pk.unwrap_or("null"),
1118            table_pk.unwrap_or("null")
1119        ));
1120    }
1121
1122    // Projection (after range key and hash key checks)
1123    validate_proj_structure(&lsi.projection)?;
1124
1125    Ok(())
1126}
1127
1128fn validate_gsi_structure(gsi: &GlobalSecondaryIndex) -> std::result::Result<(), String> {
1129    validate_key_schema_structure(&gsi.key_schema)?;
1130    validate_proj_structure(&gsi.projection)?;
1131    Ok(())
1132}
1133
1134fn validate_proj_structure(p: &Projection) -> std::result::Result<(), String> {
1135    match &p.projection_type {
1136        None => Err(
1137            "One or more parameter values were invalid: Unknown ProjectionType: null".to_string(),
1138        ),
1139        Some(pt) => {
1140            if let Some(ref nka) = p.non_key_attributes {
1141                match pt {
1142                    ProjectionType::ALL => return Err("One or more parameter values were invalid: ProjectionType is ALL, but NonKeyAttributes is specified".to_string()),
1143                    ProjectionType::KEYS_ONLY => return Err("One or more parameter values were invalid: ProjectionType is KEYS_ONLY, but NonKeyAttributes is specified".to_string()),
1144                    ProjectionType::INCLUDE => { if nka.is_empty() { return Err("One or more parameter values were invalid: NonKeyAttributes must not be empty".to_string()); } }
1145                }
1146            }
1147            Ok(())
1148        }
1149    }
1150}