Skip to main content

dynoxide/actions/
update_table.rs

1use crate::actions::create_table::StreamSpecification;
2use crate::actions::{TableDescription, build_table_description};
3use crate::actions::{gsi, helpers};
4use crate::errors::{DynoxideError, Result};
5use crate::storage_backend::StorageBackend;
6use crate::streams;
7use crate::types::{AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, Projection};
8use crate::validation;
9use serde::{Deserialize, Serialize};
10
11/// Internal raw deserialization struct.
12#[derive(Debug, Default, Deserialize)]
13struct UpdateTableRequestRaw {
14    #[serde(rename = "TableName", default)]
15    table_name: Option<String>,
16
17    #[serde(rename = "AttributeDefinitions", default)]
18    attribute_definitions: Option<Vec<AttributeDefinition>>,
19
20    #[serde(rename = "GlobalSecondaryIndexUpdates", default)]
21    global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
22
23    #[serde(rename = "StreamSpecification", default)]
24    stream_specification: Option<StreamSpecification>,
25
26    #[serde(rename = "DeletionProtectionEnabled", default)]
27    deletion_protection_enabled: Option<bool>,
28
29    #[serde(rename = "ProvisionedThroughput", default)]
30    provisioned_throughput: Option<serde_json::Value>,
31
32    #[serde(rename = "BillingMode", default)]
33    billing_mode: Option<String>,
34
35    #[serde(rename = "TableClass", default)]
36    table_class: Option<String>,
37
38    #[serde(rename = "OnDemandThroughput", default)]
39    on_demand_throughput: Option<crate::types::OnDemandThroughput>,
40}
41
42#[derive(Debug, Default)]
43pub struct UpdateTableRequest {
44    pub table_name: String,
45    pub attribute_definitions: Option<Vec<AttributeDefinition>>,
46    pub global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
47    pub stream_specification: Option<StreamSpecification>,
48    pub deletion_protection_enabled: Option<bool>,
49    pub provisioned_throughput: Option<serde_json::Value>,
50    pub billing_mode: Option<String>,
51    pub table_class: Option<String>,
52    pub on_demand_throughput: Option<crate::types::OnDemandThroughput>,
53}
54
55impl<'de> serde::Deserialize<'de> for UpdateTableRequest {
56    fn deserialize<D: serde::Deserializer<'de>>(
57        deserializer: D,
58    ) -> std::result::Result<Self, D::Error> {
59        let raw = UpdateTableRequestRaw::deserialize(deserializer)?;
60
61        // Phase 1: Check TableName missing
62        if raw.table_name.is_none() || raw.table_name.as_deref() == Some("") {
63            let msg = if raw.table_name.is_none() {
64                "The parameter 'TableName' is required but was not present in the request"
65            } else {
66                "TableName must be at least 3 characters long and at most 255 characters long"
67            };
68            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
69        }
70
71        let table_name = raw.table_name.unwrap_or_default();
72
73        // Phase 2: Check TableName length
74        if table_name.len() < 3 || table_name.len() > 255 {
75            return Err(serde::de::Error::custom(
76                "VALIDATION:TableName must be at least 3 characters long and at most 255 characters long",
77            ));
78        }
79
80        // Phase 3: Multi-field constraint validation
81        let mut errors = Vec::new();
82
83        if !table_name
84            .chars()
85            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
86        {
87            errors.push(format!(
88                "Value '{}' at 'tableName' failed to satisfy constraint: \
89                 Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
90                table_name
91            ));
92        }
93
94        if let Some(msg) = crate::validation::format_validation_errors(&errors) {
95            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
96        }
97
98        Ok(UpdateTableRequest {
99            table_name,
100            attribute_definitions: raw.attribute_definitions,
101            global_secondary_index_updates: raw.global_secondary_index_updates,
102            stream_specification: raw.stream_specification,
103            deletion_protection_enabled: raw.deletion_protection_enabled,
104            provisioned_throughput: raw.provisioned_throughput,
105            billing_mode: raw.billing_mode,
106            table_class: raw.table_class,
107            on_demand_throughput: raw.on_demand_throughput,
108        })
109    }
110}
111
112#[derive(Debug, Default, Deserialize)]
113pub struct GlobalSecondaryIndexUpdate {
114    #[serde(rename = "Update", default)]
115    pub update: Option<UpdateGsiAction>,
116
117    #[serde(rename = "Create", default)]
118    pub create: Option<CreateGsiAction>,
119
120    #[serde(rename = "Delete", default)]
121    pub delete: Option<DeleteGsiAction>,
122}
123
124#[derive(Debug, Default, Deserialize)]
125pub struct UpdateGsiAction {
126    #[serde(rename = "IndexName")]
127    pub index_name: String,
128
129    #[serde(rename = "ProvisionedThroughput", default)]
130    pub provisioned_throughput: Option<crate::types::ProvisionedThroughput>,
131}
132
133#[derive(Debug, Default, Deserialize)]
134pub struct CreateGsiAction {
135    #[serde(rename = "IndexName")]
136    pub index_name: String,
137
138    #[serde(rename = "KeySchema")]
139    pub key_schema: Vec<KeySchemaElement>,
140
141    #[serde(rename = "Projection")]
142    pub projection: Projection,
143}
144
145#[derive(Debug, Default, Deserialize)]
146pub struct DeleteGsiAction {
147    #[serde(rename = "IndexName")]
148    pub index_name: String,
149}
150
151#[derive(Debug, Default, Serialize)]
152pub struct UpdateTableResponse {
153    #[serde(rename = "TableDescription")]
154    pub table_description: TableDescription,
155}
156
157pub async fn execute<S: StorageBackend>(
158    storage: &S,
159    request: UpdateTableRequest,
160) -> Result<UpdateTableResponse> {
161    // Table name validation is handled in the Deserialize impl
162
163    // Phase 1: Validate request parameters BEFORE table existence check
164    // (DynamoDB validates these first and returns ValidationException,
165    // not ResourceNotFoundException)
166    validate_update_request(&request)?;
167
168    // Phase 2: Table existence check
169    let meta = helpers::require_table(storage, &request.table_name).await?;
170
171    let current_billing_mode = meta.billing_mode.as_deref().unwrap_or("PROVISIONED");
172
173    // Phase 3: Post-table-existence validations
174
175    // PAY_PER_REQUEST table + ProvisionedThroughput update is not allowed
176    if current_billing_mode == "PAY_PER_REQUEST"
177        && request.billing_mode.is_none()
178        && request.provisioned_throughput.is_some()
179    {
180        return Err(DynoxideError::ValidationException(
181            "One or more parameter values were invalid: \
182             Neither ReadCapacityUnits nor WriteCapacityUnits can be \
183             specified when BillingMode is PAY_PER_REQUEST"
184                .to_string(),
185        ));
186    }
187
188    // BillingMode PROVISIONED without ProvisionedThroughput
189    if request.billing_mode.as_deref() == Some("PROVISIONED")
190        && request.provisioned_throughput.is_none()
191    {
192        return Err(DynoxideError::ValidationException(
193            "One or more parameter values were invalid: \
194             ProvisionedThroughput must be specified when BillingMode is PROVISIONED"
195                .to_string(),
196        ));
197    }
198
199    // Same read/write values check
200    if let Some(ref pt) = request.provisioned_throughput {
201        if let Some(obj) = pt.as_object() {
202            let new_rcu = obj
203                .get("ReadCapacityUnits")
204                .and_then(|v| v.as_i64())
205                .unwrap_or(0);
206            let new_wcu = obj
207                .get("WriteCapacityUnits")
208                .and_then(|v| v.as_i64())
209                .unwrap_or(0);
210
211            // Parse current provisioned throughput from metadata
212            let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
213
214            let billing_mode_unchanged = request.billing_mode.is_none()
215                || (request.billing_mode.as_deref() == Some("PROVISIONED")
216                    && current_billing_mode == "PROVISIONED");
217
218            if new_rcu == cur_rcu && new_wcu == cur_wcu && billing_mode_unchanged {
219                return Err(DynoxideError::ValidationException(format!(
220                    "The provisioned throughput for the table will not change. \
221                     The requested value equals the current value. \
222                     Current ReadCapacityUnits provisioned for the table: {}. \
223                     Requested ReadCapacityUnits: {}. \
224                     Current WriteCapacityUnits provisioned for the table: {}. \
225                     Requested WriteCapacityUnits: {}. \
226                     Refer to the Amazon DynamoDB Developer Guide for current limits \
227                     and how to request higher limits.",
228                    cur_rcu, new_rcu, cur_wcu, new_wcu
229                )));
230            }
231        }
232    }
233
234    // Parse existing GSI definitions
235    let mut current_gsis: Vec<GlobalSecondaryIndex> = meta
236        .gsi_definitions
237        .as_ref()
238        .map(|json| serde_json::from_str(json))
239        .transpose()
240        .map_err(|e| DynoxideError::InternalServerError(format!("Bad GSI JSON: {e}")))?
241        .unwrap_or_default();
242
243    // GSI Update with high capacity on non-existent index
244    if let Some(ref updates) = request.global_secondary_index_updates {
245        for update in updates {
246            if let Some(ref upd) = update.update {
247                if !current_gsis.iter().any(|g| g.index_name == upd.index_name) {
248                    // DynamoDB returns this specific message for GSI updates on
249                    // non-existent indexes (even with out-of-bounds capacity)
250                    return Err(DynoxideError::ValidationException(
251                        "This operation cannot be performed with given input values. \
252                         Please contact DynamoDB service team for more info: \
253                         Action Blocked: IndexUpdate"
254                            .to_string(),
255                    ));
256                }
257            }
258        }
259    }
260
261    // Check GSI update count limit (DynamoDB allows at most 5 per request)
262    if let Some(ref updates) = request.global_secondary_index_updates {
263        if updates.len() > 5 {
264            return Err(DynoxideError::LimitExceededException(
265                "Subscriber limit exceeded: Only 1 online index can be created or \
266                 deleted simultaneously per table"
267                    .to_string(),
268            ));
269        }
270    }
271
272    // Use provided attribute definitions or fall back to existing
273    let existing_attr_defs: Vec<AttributeDefinition> =
274        serde_json::from_str(&meta.attribute_definitions)
275            .map_err(|e| DynoxideError::InternalServerError(format!("Bad attr defs JSON: {e}")))?;
276
277    let attr_defs = request
278        .attribute_definitions
279        .as_ref()
280        .unwrap_or(&existing_attr_defs);
281
282    // Parse table key schema for backfill
283    let key_schema = helpers::parse_key_schema(&meta)?;
284
285    // Validate all GSI updates before making any changes
286    if let Some(ref updates) = request.global_secondary_index_updates {
287        for update in updates {
288            if let Some(ref create) = update.create {
289                if current_gsis
290                    .iter()
291                    .any(|g| g.index_name == create.index_name)
292                {
293                    return Err(DynoxideError::ValidationException(format!(
294                        "One or more parameter values were invalid: \
295                         Index already exists: {}",
296                        create.index_name
297                    )));
298                }
299                let gsi_def = GlobalSecondaryIndex {
300                    index_name: create.index_name.clone(),
301                    key_schema: create.key_schema.clone(),
302                    projection: create.projection.clone(),
303                    provisioned_throughput: None,
304                };
305                validation::validate_gsi(&gsi_def, attr_defs)?;
306            }
307            if let Some(ref delete) = update.delete {
308                if !current_gsis
309                    .iter()
310                    .any(|g| g.index_name == delete.index_name)
311                {
312                    return Err(DynoxideError::ResourceNotFoundException(format!(
313                        "Requested resource not found: Table: {} not found",
314                        delete.index_name
315                    )));
316                }
317            }
318        }
319    }
320
321    // Determine if this is a throughput increase or decrease.
322    // Ensure timestamps strictly increase across successive updates
323    // (the dynalite test expects LastDecreaseDateTime > LastIncreaseDateTime).
324    let now = {
325        use std::sync::atomic::{AtomicU64, Ordering};
326        static LAST_TS: AtomicU64 = AtomicU64::new(0);
327        let wall = web_time::SystemTime::now()
328            .duration_since(web_time::UNIX_EPOCH)
329            .unwrap_or_default()
330            .as_secs_f64();
331        loop {
332            let prev_bits = LAST_TS.load(Ordering::SeqCst);
333            let prev_f = f64::from_bits(prev_bits);
334            let candidate = if wall > prev_f { wall } else { prev_f + 0.001 };
335            let candidate_bits = candidate.to_bits();
336            if LAST_TS
337                .compare_exchange(
338                    prev_bits,
339                    candidate_bits,
340                    Ordering::SeqCst,
341                    Ordering::SeqCst,
342                )
343                .is_ok()
344            {
345                break candidate;
346            }
347        }
348    };
349
350    let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
351    let is_pt_update = request.provisioned_throughput.is_some();
352    let (new_rcu, new_wcu) = if let Some(ref pt) = request.provisioned_throughput {
353        let obj = pt.as_object();
354        (
355            obj.and_then(|o| o.get("ReadCapacityUnits"))
356                .and_then(|v| v.as_i64())
357                .unwrap_or(0),
358            obj.and_then(|o| o.get("WriteCapacityUnits"))
359                .and_then(|v| v.as_i64())
360                .unwrap_or(0),
361        )
362    } else {
363        (cur_rcu, cur_wcu)
364    };
365
366    let is_increase = new_rcu > cur_rcu || new_wcu > cur_wcu;
367    let is_decrease = new_rcu < cur_rcu || new_wcu < cur_wcu;
368
369    // All validation passed; perform mutations inside a single transaction.
370    helpers::with_write_transaction(storage, async {
371        if let Some(ref updates) = request.global_secondary_index_updates {
372            for update in updates {
373                if let Some(ref create) = update.create {
374                    let gsi_def = GlobalSecondaryIndex {
375                        index_name: create.index_name.clone(),
376                        key_schema: create.key_schema.clone(),
377                        projection: create.projection.clone(),
378                        provisioned_throughput: None,
379                    };
380
381                    storage
382                        .create_gsi_table(&request.table_name, &create.index_name)
383                        .await?;
384
385                    let gsi_p = gsi::gsi_to_def(&gsi_def)?;
386                    backfill_gsi(storage, &request.table_name, &key_schema, &gsi_p).await?;
387
388                    current_gsis.push(gsi_def);
389                }
390
391                if let Some(ref delete) = update.delete {
392                    storage
393                        .drop_gsi_table(&request.table_name, &delete.index_name)
394                        .await?;
395                    current_gsis.retain(|g| g.index_name != delete.index_name);
396                }
397            }
398        }
399
400        // Update metadata
401        let attr_defs_json = serde_json::to_string(attr_defs)
402            .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
403        let gsi_json = if current_gsis.is_empty() {
404            None
405        } else {
406            Some(
407                serde_json::to_string(&current_gsis)
408                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
409            )
410        };
411
412        storage
413            .update_table_metadata(&request.table_name, &attr_defs_json, gsi_json.as_deref())
414            .await?;
415
416        // Update provisioned throughput if requested
417        if is_pt_update {
418            let prev = parse_stored_throughput(&meta);
419            let mut stored = StoredProvisionedThroughput {
420                read_capacity_units: new_rcu,
421                write_capacity_units: new_wcu,
422                last_increase_date_time: prev.as_ref().and_then(|p| p.last_increase_date_time),
423                last_decrease_date_time: prev.as_ref().and_then(|p| p.last_decrease_date_time),
424                number_of_decreases_today: prev
425                    .as_ref()
426                    .and_then(|p| p.number_of_decreases_today)
427                    .or(Some(0)),
428            };
429            if is_increase {
430                stored.last_increase_date_time = Some(now);
431            }
432            if is_decrease {
433                stored.last_decrease_date_time = Some(now);
434                stored.number_of_decreases_today =
435                    Some(stored.number_of_decreases_today.unwrap_or(0) + 1);
436            }
437            let pt_json = serde_json::to_string(&stored)
438                .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
439            storage
440                .update_provisioned_throughput(&request.table_name, &pt_json)
441                .await?;
442        }
443
444        // Handle deletion protection changes
445        if let Some(enabled) = request.deletion_protection_enabled {
446            storage
447                .update_deletion_protection(&request.table_name, enabled)
448                .await?;
449        }
450
451        // Handle table class changes
452        if let Some(ref table_class) = request.table_class {
453            storage
454                .update_table_class(&request.table_name, table_class)
455                .await?;
456        }
457
458        // Handle on-demand throughput changes
459        if let Some(ref on_demand) = request.on_demand_throughput {
460            let json = serde_json::to_string(on_demand)
461                .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
462            storage
463                .update_on_demand_throughput(&request.table_name, &json)
464                .await?;
465        }
466
467        // Handle billing mode changes
468        if let Some(ref billing_mode) = request.billing_mode {
469            storage
470                .update_billing_mode(&request.table_name, billing_mode)
471                .await?;
472            if billing_mode == "PAY_PER_REQUEST" {
473                // Clear provisioned throughput to avoid stale data
474                storage
475                    .clear_provisioned_throughput(&request.table_name)
476                    .await?;
477            }
478        }
479
480        // Handle stream specification changes
481        if let Some(ref spec) = request.stream_specification {
482            if spec.stream_enabled {
483                let view_type = spec
484                    .stream_view_type
485                    .as_deref()
486                    .unwrap_or("NEW_AND_OLD_IMAGES");
487                let label = streams::generate_stream_label(storage.clock());
488                storage
489                    .enable_stream(&request.table_name, view_type, &label)
490                    .await?;
491            } else {
492                storage.disable_stream(&request.table_name).await?;
493            }
494        }
495
496        Ok(())
497    })
498    .await?;
499
500    // Build response from updated metadata
501    let updated_meta = helpers::require_table(storage, &request.table_name).await?;
502    let mut desc = build_table_description(&updated_meta, Some(0), Some(0));
503
504    // DynamoDB returns UPDATING status during throughput changes
505    if is_pt_update {
506        desc.table_status = "UPDATING".to_string();
507
508        // The immediate response shows the OLD throughput values while the
509        // table is in UPDATING status, but with updated timestamps.
510        let stored = parse_stored_throughput(&updated_meta);
511        if let Some(ref mut pt) = desc.provisioned_throughput {
512            pt.read_capacity_units = cur_rcu as u64;
513            pt.write_capacity_units = cur_wcu as u64;
514            if let Some(ref s) = stored {
515                pt.last_increase_date_time = s.last_increase_date_time;
516                pt.last_decrease_date_time = s.last_decrease_date_time;
517                pt.number_of_decreases_today = s.number_of_decreases_today.unwrap_or(0);
518            }
519        }
520    }
521
522    Ok(UpdateTableResponse {
523        table_description: desc,
524    })
525}
526
527/// Validate UpdateTable request parameters before checking table existence.
528///
529/// DynamoDB validates these parameters first and returns ValidationException
530/// rather than ResourceNotFoundException when both are invalid.
531fn validate_update_request(request: &UpdateTableRequest) -> Result<()> {
532    // Multi-field constraint errors
533    let mut errors = Vec::new();
534
535    // Validate ProvisionedThroughput fields
536    if let Some(ref pt) = request.provisioned_throughput {
537        if let Some(obj) = pt.as_object() {
538            let wcu = obj.get("WriteCapacityUnits");
539            let rcu = obj.get("ReadCapacityUnits");
540            if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
541                errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
542            } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
543                if w < 1 {
544                    errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
545                }
546            }
547            if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
548                errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
549            } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
550                if r < 1 {
551                    errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
552                }
553            }
554        }
555    }
556
557    // Validate GlobalSecondaryIndexUpdates fields
558    if let Some(ref updates) = request.global_secondary_index_updates {
559        for (i, update) in updates.iter().enumerate() {
560            if let Some(ref upd) = update.update {
561                // Validate Update.IndexName
562                if upd.index_name.len() < 3 {
563                    errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", upd.index_name, i + 1));
564                }
565                if !upd.index_name.is_empty()
566                    && !upd
567                        .index_name
568                        .chars()
569                        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
570                {
571                    errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", upd.index_name, i + 1));
572                }
573                // Validate Update.ProvisionedThroughput
574                if let Some(ref pt) = upd.provisioned_throughput {
575                    let wcu = pt.write_capacity_units;
576                    let rcu = pt.read_capacity_units;
577                    if wcu.is_none() {
578                        errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
579                    } else if let Some(w) = wcu {
580                        if w < 1 {
581                            errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
582                        }
583                    }
584                    if rcu.is_none() {
585                        errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
586                    } else if let Some(r) = rcu {
587                        if r < 1 {
588                            errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
589                        }
590                    }
591                } else {
592                    errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput' failed to satisfy constraint: Member must not be null", i + 1));
593                }
594            }
595        }
596    }
597
598    // Cap at 10 errors
599    errors.truncate(10);
600
601    if !errors.is_empty() {
602        let prefix = format!(
603            "{} validation error{} detected: ",
604            errors.len(),
605            if errors.len() == 1 { "" } else { "s" }
606        );
607        return Err(DynoxideError::ValidationException(format!(
608            "{}{}",
609            prefix,
610            errors.join("; ")
611        )));
612    }
613
614    // Single-error validations (after multi-field)
615
616    // BillingMode enum validation
617    if let Some(ref bm) = request.billing_mode {
618        if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
619            return Err(DynoxideError::ValidationException(format!(
620                "1 validation error detected: Value '{}' at 'billingMode' \
621                 failed to satisfy constraint: Member must satisfy enum value set: \
622                 [PROVISIONED, PAY_PER_REQUEST]",
623                bm
624            )));
625        }
626    }
627
628    // TableClass enum validation (mirrors CreateTable)
629    if let Some(ref tc) = request.table_class {
630        if tc != "STANDARD" && tc != "STANDARD_INFREQUENT_ACCESS" {
631            return Err(DynoxideError::ValidationException(format!(
632                "1 validation error detected: Value '{tc}' at 'tableClass' failed to satisfy \
633                 constraint: Member must satisfy enum value set: \
634                 [STANDARD, STANDARD_INFREQUENT_ACCESS]"
635            )));
636        }
637    }
638
639    // BillingMode PAY_PER_REQUEST with ProvisionedThroughput is not allowed
640    if request.billing_mode.as_deref() == Some("PAY_PER_REQUEST")
641        && request.provisioned_throughput.is_some()
642    {
643        return Err(DynoxideError::ValidationException(
644            "One or more parameter values were invalid: \
645             Neither ReadCapacityUnits nor WriteCapacityUnits can be \
646             specified when BillingMode is PAY_PER_REQUEST"
647                .to_string(),
648        ));
649    }
650
651    // ProvisionedThroughput out-of-bounds
652    if let Some(ref pt) = request.provisioned_throughput {
653        if let Some(obj) = pt.as_object() {
654            let rcu = obj
655                .get("ReadCapacityUnits")
656                .and_then(|v| v.as_i64())
657                .unwrap_or(0);
658            let wcu = obj
659                .get("WriteCapacityUnits")
660                .and_then(|v| v.as_i64())
661                .unwrap_or(0);
662            const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
663            if rcu > MAX_THROUGHPUT {
664                return Err(DynoxideError::ValidationException(format!(
665                    "Given value {} for ReadCapacityUnits is out of bounds",
666                    rcu
667                )));
668            }
669            if wcu > MAX_THROUGHPUT {
670                return Err(DynoxideError::ValidationException(format!(
671                    "Given value {} for WriteCapacityUnits is out of bounds",
672                    wcu
673                )));
674            }
675        }
676    }
677
678    // "At least one of ..." — a request must change something. A lone
679    // TableClass, OnDemandThroughput, or DeletionProtectionEnabled counts, the
680    // same as a throughput/billing/stream change. An empty
681    // GlobalSecondaryIndexUpdates array is treated as "no GSI change" rather
682    // than satisfying the requirement on its own.
683    let no_config_change = request.provisioned_throughput.is_none()
684        && request.billing_mode.is_none()
685        && request.stream_specification.is_none()
686        && request.deletion_protection_enabled.is_none()
687        && request.table_class.is_none()
688        && request.on_demand_throughput.is_none();
689    let no_gsi_change = request
690        .global_secondary_index_updates
691        .as_ref()
692        .is_none_or(|updates| updates.is_empty());
693    if no_gsi_change && no_config_change {
694        return Err(DynoxideError::ValidationException(
695            "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
696        ));
697    }
698
699    // Validate GSI update structural constraints
700    if let Some(ref updates) = request.global_secondary_index_updates {
701        // Check empty index struct (no Update, Create, or Delete)
702        for update in updates {
703            if update.update.is_none() && update.create.is_none() && update.delete.is_none() {
704                return Err(DynoxideError::ValidationException(
705                    "One or more parameter values were invalid: One of GlobalSecondaryIndexUpdate.Update, GlobalSecondaryIndexUpdate.Create, GlobalSecondaryIndexUpdate.Delete must not be null".to_string(),
706                ));
707            }
708        }
709
710        // Check repeated index names
711        let mut seen_names = std::collections::HashSet::new();
712        for update in updates {
713            let name = if let Some(ref u) = update.update {
714                Some(u.index_name.as_str())
715            } else if let Some(ref c) = update.create {
716                Some(c.index_name.as_str())
717            } else {
718                update.delete.as_ref().map(|d| d.index_name.as_str())
719            };
720            if let Some(name) = name {
721                if !seen_names.insert(name.to_string()) {
722                    return Err(DynoxideError::ValidationException(format!(
723                        "One or more parameter values were invalid: Only one global secondary index update per index is allowed simultaneously. Index: {}",
724                        name
725                    )));
726                }
727            }
728        }
729    }
730
731    Ok(())
732}
733
734/// Extended provisioned throughput stored in metadata, including timestamps.
735#[derive(Debug, Clone, Default, Serialize, Deserialize)]
736struct StoredProvisionedThroughput {
737    #[serde(rename = "ReadCapacityUnits")]
738    read_capacity_units: i64,
739    #[serde(rename = "WriteCapacityUnits")]
740    write_capacity_units: i64,
741    #[serde(
742        rename = "LastIncreaseDateTime",
743        skip_serializing_if = "Option::is_none"
744    )]
745    last_increase_date_time: Option<f64>,
746    #[serde(
747        rename = "LastDecreaseDateTime",
748        skip_serializing_if = "Option::is_none"
749    )]
750    last_decrease_date_time: Option<f64>,
751    #[serde(
752        rename = "NumberOfDecreasesToday",
753        skip_serializing_if = "Option::is_none"
754    )]
755    number_of_decreases_today: Option<u64>,
756}
757
758/// Parse current provisioned throughput from table metadata.
759fn parse_current_throughput(meta: &crate::storage::TableMetadata) -> (i64, i64) {
760    parse_stored_throughput(meta)
761        .map(|pt| (pt.read_capacity_units, pt.write_capacity_units))
762        .unwrap_or((0, 0))
763}
764
765/// Parse the full stored provisioned throughput including timestamps.
766fn parse_stored_throughput(
767    meta: &crate::storage::TableMetadata,
768) -> Option<StoredProvisionedThroughput> {
769    meta.provisioned_throughput
770        .as_ref()
771        .and_then(|pt_json| serde_json::from_str(pt_json).ok())
772}
773
774/// Backfill existing items into a newly created GSI, processing in batches.
775async fn backfill_gsi<S: StorageBackend>(
776    storage: &S,
777    table_name: &str,
778    key_schema: &helpers::KeySchema,
779    gsi_def: &gsi::GsiDef,
780) -> Result<()> {
781    const BATCH_SIZE: usize = 1000;
782    let mut last_pk: Option<String> = None;
783    let mut last_sk: Option<String> = None;
784
785    loop {
786        let items = storage
787            .scan_items(
788                table_name,
789                &crate::storage::ScanParams {
790                    limit: Some(BATCH_SIZE),
791                    exclusive_start_pk: last_pk.as_deref(),
792                    exclusive_start_sk: last_sk.as_deref(),
793                    ..Default::default()
794                },
795            )
796            .await?;
797
798        if items.is_empty() {
799            break;
800        }
801
802        let mut rows: Vec<crate::storage_backend::GsiItemRow> = Vec::new();
803        for (pk, sk, item_json) in &items {
804            let item: crate::types::Item = serde_json::from_str(item_json)
805                .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
806
807            if let Some(gsi_pk_val) = item.get(&gsi_def.pk_attr) {
808                let gsi_pk = gsi_pk_val.to_key_string().unwrap_or_default();
809                let gsi_sk = gsi_def
810                    .sk_attr
811                    .as_ref()
812                    .and_then(|sk_attr| item.get(sk_attr))
813                    .and_then(|v| v.to_key_string())
814                    .unwrap_or_default();
815
816                let projected = gsi::build_index_item(
817                    &item,
818                    gsi_def,
819                    &key_schema.partition_key,
820                    key_schema.sort_key.as_deref(),
821                );
822                let projected_json = serde_json::to_string(&projected)
823                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
824
825                rows.push(crate::storage_backend::GsiItemRow {
826                    gsi_pk,
827                    gsi_sk,
828                    table_pk: pk.clone(),
829                    table_sk: sk.clone(),
830                    item_json: projected_json,
831                });
832            }
833        }
834
835        storage
836            .insert_gsi_items(table_name, &gsi_def.index_name, &rows)
837            .await?;
838
839        let last = &items[items.len() - 1];
840        last_pk = Some(last.0.clone());
841        last_sk = Some(last.1.clone());
842
843        if items.len() < BATCH_SIZE {
844            break;
845        }
846    }
847
848    Ok(())
849}