Skip to main content

dynoxide/actions/
update_table.rs

1use crate::actions::create_table::StreamSpecification;
2use crate::actions::{TableDescription, build_table_description};
3use crate::actions::{gsi, helpers};
4use crate::errors::{DynoxideError, Result};
5use crate::storage::Storage;
6use crate::streams;
7use crate::types::{AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, Projection};
8use crate::validation;
9use rusqlite;
10use serde::{Deserialize, Serialize};
11
12/// Internal raw deserialization struct.
13#[derive(Debug, Default, Deserialize)]
14struct UpdateTableRequestRaw {
15    #[serde(rename = "TableName", default)]
16    table_name: Option<String>,
17
18    #[serde(rename = "AttributeDefinitions", default)]
19    attribute_definitions: Option<Vec<AttributeDefinition>>,
20
21    #[serde(rename = "GlobalSecondaryIndexUpdates", default)]
22    global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
23
24    #[serde(rename = "StreamSpecification", default)]
25    stream_specification: Option<StreamSpecification>,
26
27    #[serde(rename = "DeletionProtectionEnabled", default)]
28    deletion_protection_enabled: Option<bool>,
29
30    #[serde(rename = "ProvisionedThroughput", default)]
31    provisioned_throughput: Option<serde_json::Value>,
32
33    #[serde(rename = "BillingMode", default)]
34    billing_mode: Option<String>,
35}
36
/// A validated UpdateTable request.
///
/// Instances produced through the custom `Deserialize` impl below always carry
/// a `table_name` that passed the length and character-set checks performed
/// there; all other fields are passed through unchanged from the request body.
#[derive(Debug, Default)]
pub struct UpdateTableRequest {
    /// Name of the table to update.
    pub table_name: String,
    /// Attribute definitions to use for this update; when `None`, `execute`
    /// falls back to the definitions already stored for the table.
    pub attribute_definitions: Option<Vec<AttributeDefinition>>,
    /// Requested index changes (each entry is an Update, Create or Delete).
    pub global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
    /// When present, `execute` enables or disables the table's stream.
    pub stream_specification: Option<StreamSpecification>,
    /// When present, the new deletion-protection flag for the table.
    pub deletion_protection_enabled: Option<bool>,
    /// Raw JSON throughput object; `ReadCapacityUnits` and
    /// `WriteCapacityUnits` are read from it as integers.
    pub provisioned_throughput: Option<serde_json::Value>,
    /// Requested billing mode; validated against `PROVISIONED` and
    /// `PAY_PER_REQUEST` by `validate_update_request`.
    pub billing_mode: Option<String>,
}
47
48impl<'de> serde::Deserialize<'de> for UpdateTableRequest {
49    fn deserialize<D: serde::Deserializer<'de>>(
50        deserializer: D,
51    ) -> std::result::Result<Self, D::Error> {
52        let raw = UpdateTableRequestRaw::deserialize(deserializer)?;
53
54        // Phase 1: Check TableName missing
55        if raw.table_name.is_none() || raw.table_name.as_deref() == Some("") {
56            let msg = if raw.table_name.is_none() {
57                "The parameter 'TableName' is required but was not present in the request"
58            } else {
59                "TableName must be at least 3 characters long and at most 255 characters long"
60            };
61            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
62        }
63
64        let table_name = raw.table_name.unwrap_or_default();
65
66        // Phase 2: Check TableName length
67        if table_name.len() < 3 || table_name.len() > 255 {
68            return Err(serde::de::Error::custom(
69                "VALIDATION:TableName must be at least 3 characters long and at most 255 characters long",
70            ));
71        }
72
73        // Phase 3: Multi-field constraint validation
74        let mut errors = Vec::new();
75
76        if !table_name
77            .chars()
78            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
79        {
80            errors.push(format!(
81                "Value '{}' at 'tableName' failed to satisfy constraint: \
82                 Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
83                table_name
84            ));
85        }
86
87        if let Some(msg) = crate::validation::format_validation_errors(&errors) {
88            return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
89        }
90
91        Ok(UpdateTableRequest {
92            table_name,
93            attribute_definitions: raw.attribute_definitions,
94            global_secondary_index_updates: raw.global_secondary_index_updates,
95            stream_specification: raw.stream_specification,
96            deletion_protection_enabled: raw.deletion_protection_enabled,
97            provisioned_throughput: raw.provisioned_throughput,
98            billing_mode: raw.billing_mode,
99        })
100    }
101}
102
103#[derive(Debug, Default, Deserialize)]
104pub struct GlobalSecondaryIndexUpdate {
105    #[serde(rename = "Update", default)]
106    pub update: Option<UpdateGsiAction>,
107
108    #[serde(rename = "Create", default)]
109    pub create: Option<CreateGsiAction>,
110
111    #[serde(rename = "Delete", default)]
112    pub delete: Option<DeleteGsiAction>,
113}
114
115#[derive(Debug, Default, Deserialize)]
116pub struct UpdateGsiAction {
117    #[serde(rename = "IndexName")]
118    pub index_name: String,
119
120    #[serde(rename = "ProvisionedThroughput", default)]
121    pub provisioned_throughput: Option<crate::types::ProvisionedThroughput>,
122}
123
124#[derive(Debug, Default, Deserialize)]
125pub struct CreateGsiAction {
126    #[serde(rename = "IndexName")]
127    pub index_name: String,
128
129    #[serde(rename = "KeySchema")]
130    pub key_schema: Vec<KeySchemaElement>,
131
132    #[serde(rename = "Projection")]
133    pub projection: Projection,
134}
135
136#[derive(Debug, Default, Deserialize)]
137pub struct DeleteGsiAction {
138    #[serde(rename = "IndexName")]
139    pub index_name: String,
140}
141
142#[derive(Debug, Default, Serialize)]
143pub struct UpdateTableResponse {
144    #[serde(rename = "TableDescription")]
145    pub table_description: TableDescription,
146}
147
148pub fn execute(storage: &Storage, request: UpdateTableRequest) -> Result<UpdateTableResponse> {
149    // Table name validation is handled in the Deserialize impl
150
151    // Phase 1: Validate request parameters BEFORE table existence check
152    // (DynamoDB validates these first and returns ValidationException,
153    // not ResourceNotFoundException)
154    validate_update_request(&request)?;
155
156    // Phase 2: Table existence check
157    let meta = helpers::require_table(storage, &request.table_name)?;
158
159    let current_billing_mode = meta.billing_mode.as_deref().unwrap_or("PROVISIONED");
160
161    // Phase 3: Post-table-existence validations
162
163    // PAY_PER_REQUEST table + ProvisionedThroughput update is not allowed
164    if current_billing_mode == "PAY_PER_REQUEST"
165        && request.billing_mode.is_none()
166        && request.provisioned_throughput.is_some()
167    {
168        return Err(DynoxideError::ValidationException(
169            "One or more parameter values were invalid: \
170             Neither ReadCapacityUnits nor WriteCapacityUnits can be \
171             specified when BillingMode is PAY_PER_REQUEST"
172                .to_string(),
173        ));
174    }
175
176    // BillingMode PROVISIONED without ProvisionedThroughput
177    if request.billing_mode.as_deref() == Some("PROVISIONED")
178        && request.provisioned_throughput.is_none()
179    {
180        return Err(DynoxideError::ValidationException(
181            "One or more parameter values were invalid: \
182             ProvisionedThroughput must be specified when BillingMode is PROVISIONED"
183                .to_string(),
184        ));
185    }
186
187    // Same read/write values check
188    if let Some(ref pt) = request.provisioned_throughput {
189        if let Some(obj) = pt.as_object() {
190            let new_rcu = obj
191                .get("ReadCapacityUnits")
192                .and_then(|v| v.as_i64())
193                .unwrap_or(0);
194            let new_wcu = obj
195                .get("WriteCapacityUnits")
196                .and_then(|v| v.as_i64())
197                .unwrap_or(0);
198
199            // Parse current provisioned throughput from metadata
200            let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
201
202            let billing_mode_unchanged = request.billing_mode.is_none()
203                || (request.billing_mode.as_deref() == Some("PROVISIONED")
204                    && current_billing_mode == "PROVISIONED");
205
206            if new_rcu == cur_rcu && new_wcu == cur_wcu && billing_mode_unchanged {
207                return Err(DynoxideError::ValidationException(format!(
208                    "The provisioned throughput for the table will not change. \
209                     The requested value equals the current value. \
210                     Current ReadCapacityUnits provisioned for the table: {}. \
211                     Requested ReadCapacityUnits: {}. \
212                     Current WriteCapacityUnits provisioned for the table: {}. \
213                     Requested WriteCapacityUnits: {}. \
214                     Refer to the Amazon DynamoDB Developer Guide for current limits \
215                     and how to request higher limits.",
216                    cur_rcu, new_rcu, cur_wcu, new_wcu
217                )));
218            }
219        }
220    }
221
222    // Parse existing GSI definitions
223    let mut current_gsis: Vec<GlobalSecondaryIndex> = meta
224        .gsi_definitions
225        .as_ref()
226        .map(|json| serde_json::from_str(json))
227        .transpose()
228        .map_err(|e| DynoxideError::InternalServerError(format!("Bad GSI JSON: {e}")))?
229        .unwrap_or_default();
230
231    // GSI Update with high capacity on non-existent index
232    if let Some(ref updates) = request.global_secondary_index_updates {
233        for update in updates {
234            if let Some(ref upd) = update.update {
235                if !current_gsis.iter().any(|g| g.index_name == upd.index_name) {
236                    // DynamoDB returns this specific message for GSI updates on
237                    // non-existent indexes (even with out-of-bounds capacity)
238                    return Err(DynoxideError::ValidationException(
239                        "This operation cannot be performed with given input values. \
240                         Please contact DynamoDB service team for more info: \
241                         Action Blocked: IndexUpdate"
242                            .to_string(),
243                    ));
244                }
245            }
246        }
247    }
248
249    // Check GSI update count limit (DynamoDB allows at most 5 per request)
250    if let Some(ref updates) = request.global_secondary_index_updates {
251        if updates.len() > 5 {
252            return Err(DynoxideError::LimitExceededException(
253                "Subscriber limit exceeded: Only 1 online index can be created or \
254                 deleted simultaneously per table"
255                    .to_string(),
256            ));
257        }
258    }
259
260    // Use provided attribute definitions or fall back to existing
261    let existing_attr_defs: Vec<AttributeDefinition> =
262        serde_json::from_str(&meta.attribute_definitions)
263            .map_err(|e| DynoxideError::InternalServerError(format!("Bad attr defs JSON: {e}")))?;
264
265    let attr_defs = request
266        .attribute_definitions
267        .as_ref()
268        .unwrap_or(&existing_attr_defs);
269
270    // Parse table key schema for backfill
271    let key_schema = helpers::parse_key_schema(&meta)?;
272
273    // Validate all GSI updates before making any changes
274    if let Some(ref updates) = request.global_secondary_index_updates {
275        for update in updates {
276            if let Some(ref create) = update.create {
277                if current_gsis
278                    .iter()
279                    .any(|g| g.index_name == create.index_name)
280                {
281                    return Err(DynoxideError::ValidationException(format!(
282                        "One or more parameter values were invalid: \
283                         Index already exists: {}",
284                        create.index_name
285                    )));
286                }
287                let gsi_def = GlobalSecondaryIndex {
288                    index_name: create.index_name.clone(),
289                    key_schema: create.key_schema.clone(),
290                    projection: create.projection.clone(),
291                    provisioned_throughput: None,
292                };
293                validation::validate_gsi(&gsi_def, attr_defs)?;
294            }
295            if let Some(ref delete) = update.delete {
296                if !current_gsis
297                    .iter()
298                    .any(|g| g.index_name == delete.index_name)
299                {
300                    return Err(DynoxideError::ResourceNotFoundException(format!(
301                        "Requested resource not found: Table: {} not found",
302                        delete.index_name
303                    )));
304                }
305            }
306        }
307    }
308
309    // Determine if this is a throughput increase or decrease.
310    // Ensure timestamps strictly increase across successive updates
311    // (the dynalite test expects LastDecreaseDateTime > LastIncreaseDateTime).
312    let now = {
313        use std::sync::atomic::{AtomicU64, Ordering};
314        static LAST_TS: AtomicU64 = AtomicU64::new(0);
315        let wall = std::time::SystemTime::now()
316            .duration_since(std::time::UNIX_EPOCH)
317            .unwrap_or_default()
318            .as_secs_f64();
319        loop {
320            let prev_bits = LAST_TS.load(Ordering::SeqCst);
321            let prev_f = f64::from_bits(prev_bits);
322            let candidate = if wall > prev_f { wall } else { prev_f + 0.001 };
323            let candidate_bits = candidate.to_bits();
324            if LAST_TS
325                .compare_exchange(
326                    prev_bits,
327                    candidate_bits,
328                    Ordering::SeqCst,
329                    Ordering::SeqCst,
330                )
331                .is_ok()
332            {
333                break candidate;
334            }
335        }
336    };
337
338    let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
339    let is_pt_update = request.provisioned_throughput.is_some();
340    let (new_rcu, new_wcu) = if let Some(ref pt) = request.provisioned_throughput {
341        let obj = pt.as_object();
342        (
343            obj.and_then(|o| o.get("ReadCapacityUnits"))
344                .and_then(|v| v.as_i64())
345                .unwrap_or(0),
346            obj.and_then(|o| o.get("WriteCapacityUnits"))
347                .and_then(|v| v.as_i64())
348                .unwrap_or(0),
349        )
350    } else {
351        (cur_rcu, cur_wcu)
352    };
353
354    let is_increase = new_rcu > cur_rcu || new_wcu > cur_wcu;
355    let is_decrease = new_rcu < cur_rcu || new_wcu < cur_wcu;
356
357    // All validation passed — perform mutations inside a transaction
358    storage.begin_transaction()?;
359
360    let result = (|| -> Result<()> {
361        if let Some(ref updates) = request.global_secondary_index_updates {
362            for update in updates {
363                if let Some(ref create) = update.create {
364                    let gsi_def = GlobalSecondaryIndex {
365                        index_name: create.index_name.clone(),
366                        key_schema: create.key_schema.clone(),
367                        projection: create.projection.clone(),
368                        provisioned_throughput: None,
369                    };
370
371                    storage.create_gsi_table(&request.table_name, &create.index_name)?;
372
373                    let gsi_p = gsi::gsi_to_def(&gsi_def)?;
374                    backfill_gsi(storage, &request.table_name, &key_schema, &gsi_p)?;
375
376                    current_gsis.push(gsi_def);
377                }
378
379                if let Some(ref delete) = update.delete {
380                    storage.drop_gsi_table(&request.table_name, &delete.index_name)?;
381                    current_gsis.retain(|g| g.index_name != delete.index_name);
382                }
383            }
384        }
385
386        // Update metadata
387        let attr_defs_json = serde_json::to_string(attr_defs)
388            .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
389        let gsi_json = if current_gsis.is_empty() {
390            None
391        } else {
392            Some(
393                serde_json::to_string(&current_gsis)
394                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
395            )
396        };
397
398        storage.update_table_metadata(&request.table_name, &attr_defs_json, gsi_json.as_deref())?;
399
400        // Update provisioned throughput if requested
401        if is_pt_update {
402            let prev = parse_stored_throughput(&meta);
403            let mut stored = StoredProvisionedThroughput {
404                read_capacity_units: new_rcu,
405                write_capacity_units: new_wcu,
406                last_increase_date_time: prev.as_ref().and_then(|p| p.last_increase_date_time),
407                last_decrease_date_time: prev.as_ref().and_then(|p| p.last_decrease_date_time),
408                number_of_decreases_today: prev
409                    .as_ref()
410                    .and_then(|p| p.number_of_decreases_today)
411                    .or(Some(0)),
412            };
413            if is_increase {
414                stored.last_increase_date_time = Some(now);
415            }
416            if is_decrease {
417                stored.last_decrease_date_time = Some(now);
418                stored.number_of_decreases_today =
419                    Some(stored.number_of_decreases_today.unwrap_or(0) + 1);
420            }
421            let pt_json = serde_json::to_string(&stored)
422                .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
423            storage.update_provisioned_throughput(&request.table_name, &pt_json)?;
424        }
425
426        // Handle deletion protection changes
427        if let Some(enabled) = request.deletion_protection_enabled {
428            storage.update_deletion_protection(&request.table_name, enabled)?;
429        }
430
431        // Handle billing mode changes
432        if let Some(ref billing_mode) = request.billing_mode {
433            storage.update_billing_mode(&request.table_name, billing_mode)?;
434            if billing_mode == "PAY_PER_REQUEST" {
435                // Clear provisioned throughput to avoid stale data
436                storage.clear_provisioned_throughput(&request.table_name)?;
437            }
438        }
439
440        // Handle stream specification changes
441        if let Some(ref spec) = request.stream_specification {
442            if spec.stream_enabled {
443                let view_type = spec
444                    .stream_view_type
445                    .as_deref()
446                    .unwrap_or("NEW_AND_OLD_IMAGES");
447                let label = streams::generate_stream_label();
448                storage.enable_stream(&request.table_name, view_type, &label)?;
449            } else {
450                storage.disable_stream(&request.table_name)?;
451            }
452        }
453
454        Ok(())
455    })();
456
457    match result {
458        Ok(()) => storage.commit()?,
459        Err(e) => {
460            let _ = storage.rollback();
461            return Err(e);
462        }
463    }
464
465    // Build response from updated metadata
466    let updated_meta = helpers::require_table(storage, &request.table_name)?;
467    let mut desc = build_table_description(&updated_meta, Some(0), Some(0));
468
469    // DynamoDB returns UPDATING status during throughput changes
470    if is_pt_update {
471        desc.table_status = "UPDATING".to_string();
472
473        // The immediate response shows the OLD throughput values while the
474        // table is in UPDATING status, but with updated timestamps.
475        let stored = parse_stored_throughput(&updated_meta);
476        if let Some(ref mut pt) = desc.provisioned_throughput {
477            pt.read_capacity_units = cur_rcu as u64;
478            pt.write_capacity_units = cur_wcu as u64;
479            if let Some(ref s) = stored {
480                pt.last_increase_date_time = s.last_increase_date_time;
481                pt.last_decrease_date_time = s.last_decrease_date_time;
482                pt.number_of_decreases_today = s.number_of_decreases_today.unwrap_or(0);
483            }
484        }
485    }
486
487    Ok(UpdateTableResponse {
488        table_description: desc,
489    })
490}
491
492/// Validate UpdateTable request parameters before checking table existence.
493///
494/// DynamoDB validates these parameters first and returns ValidationException
495/// rather than ResourceNotFoundException when both are invalid.
496fn validate_update_request(request: &UpdateTableRequest) -> Result<()> {
497    // Multi-field constraint errors
498    let mut errors = Vec::new();
499
500    // Validate ProvisionedThroughput fields
501    if let Some(ref pt) = request.provisioned_throughput {
502        if let Some(obj) = pt.as_object() {
503            let wcu = obj.get("WriteCapacityUnits");
504            let rcu = obj.get("ReadCapacityUnits");
505            if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
506                errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
507            } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
508                if w < 1 {
509                    errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
510                }
511            }
512            if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
513                errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
514            } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
515                if r < 1 {
516                    errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
517                }
518            }
519        }
520    }
521
522    // Validate GlobalSecondaryIndexUpdates fields
523    if let Some(ref updates) = request.global_secondary_index_updates {
524        for (i, update) in updates.iter().enumerate() {
525            if let Some(ref upd) = update.update {
526                // Validate Update.IndexName
527                if upd.index_name.len() < 3 {
528                    errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", upd.index_name, i + 1));
529                }
530                if !upd.index_name.is_empty()
531                    && !upd
532                        .index_name
533                        .chars()
534                        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
535                {
536                    errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", upd.index_name, i + 1));
537                }
538                // Validate Update.ProvisionedThroughput
539                if let Some(ref pt) = upd.provisioned_throughput {
540                    let wcu = pt.write_capacity_units;
541                    let rcu = pt.read_capacity_units;
542                    if wcu.is_none() {
543                        errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
544                    } else if let Some(w) = wcu {
545                        if w < 1 {
546                            errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
547                        }
548                    }
549                    if rcu.is_none() {
550                        errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
551                    } else if let Some(r) = rcu {
552                        if r < 1 {
553                            errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
554                        }
555                    }
556                } else {
557                    errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput' failed to satisfy constraint: Member must not be null", i + 1));
558                }
559            }
560        }
561    }
562
563    // Cap at 10 errors
564    errors.truncate(10);
565
566    if !errors.is_empty() {
567        let prefix = format!(
568            "{} validation error{} detected: ",
569            errors.len(),
570            if errors.len() == 1 { "" } else { "s" }
571        );
572        return Err(DynoxideError::ValidationException(format!(
573            "{}{}",
574            prefix,
575            errors.join("; ")
576        )));
577    }
578
579    // Single-error validations (after multi-field)
580
581    // BillingMode enum validation
582    if let Some(ref bm) = request.billing_mode {
583        if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
584            return Err(DynoxideError::ValidationException(format!(
585                "1 validation error detected: Value '{}' at 'billingMode' \
586                 failed to satisfy constraint: Member must satisfy enum value set: \
587                 [PROVISIONED, PAY_PER_REQUEST]",
588                bm
589            )));
590        }
591    }
592
593    // BillingMode PAY_PER_REQUEST with ProvisionedThroughput is not allowed
594    if request.billing_mode.as_deref() == Some("PAY_PER_REQUEST")
595        && request.provisioned_throughput.is_some()
596    {
597        return Err(DynoxideError::ValidationException(
598            "One or more parameter values were invalid: \
599             Neither ReadCapacityUnits nor WriteCapacityUnits can be \
600             specified when BillingMode is PAY_PER_REQUEST"
601                .to_string(),
602        ));
603    }
604
605    // ProvisionedThroughput out-of-bounds
606    if let Some(ref pt) = request.provisioned_throughput {
607        if let Some(obj) = pt.as_object() {
608            let rcu = obj
609                .get("ReadCapacityUnits")
610                .and_then(|v| v.as_i64())
611                .unwrap_or(0);
612            let wcu = obj
613                .get("WriteCapacityUnits")
614                .and_then(|v| v.as_i64())
615                .unwrap_or(0);
616            const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
617            if rcu > MAX_THROUGHPUT {
618                return Err(DynoxideError::ValidationException(format!(
619                    "Given value {} for ReadCapacityUnits is out of bounds",
620                    rcu
621                )));
622            }
623            if wcu > MAX_THROUGHPUT {
624                return Err(DynoxideError::ValidationException(format!(
625                    "Given value {} for WriteCapacityUnits is out of bounds",
626                    wcu
627                )));
628            }
629        }
630    }
631
632    // Empty GlobalSecondaryIndexUpdates
633    if let Some(ref updates) = request.global_secondary_index_updates {
634        if updates.is_empty() {
635            // "At least one of ..." - but only if nothing else is specified
636            if request.provisioned_throughput.is_none()
637                && request.billing_mode.is_none()
638                && request.stream_specification.is_none()
639            {
640                return Err(DynoxideError::ValidationException(
641                    "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
642                ));
643            }
644        }
645    } else if request.provisioned_throughput.is_none()
646        && request.billing_mode.is_none()
647        && request.stream_specification.is_none()
648        && request.deletion_protection_enabled.is_none()
649    {
650        return Err(DynoxideError::ValidationException(
651            "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
652        ));
653    }
654
655    // Validate GSI update structural constraints
656    if let Some(ref updates) = request.global_secondary_index_updates {
657        // Check empty index struct (no Update, Create, or Delete)
658        for update in updates {
659            if update.update.is_none() && update.create.is_none() && update.delete.is_none() {
660                return Err(DynoxideError::ValidationException(
661                    "One or more parameter values were invalid: One of GlobalSecondaryIndexUpdate.Update, GlobalSecondaryIndexUpdate.Create, GlobalSecondaryIndexUpdate.Delete must not be null".to_string(),
662                ));
663            }
664        }
665
666        // Check repeated index names
667        let mut seen_names = std::collections::HashSet::new();
668        for update in updates {
669            let name = if let Some(ref u) = update.update {
670                Some(u.index_name.as_str())
671            } else if let Some(ref c) = update.create {
672                Some(c.index_name.as_str())
673            } else {
674                update.delete.as_ref().map(|d| d.index_name.as_str())
675            };
676            if let Some(name) = name {
677                if !seen_names.insert(name.to_string()) {
678                    return Err(DynoxideError::ValidationException(format!(
679                        "One or more parameter values were invalid: Only one global secondary index update per index is allowed simultaneously. Index: {}",
680                        name
681                    )));
682                }
683            }
684        }
685    }
686
687    Ok(())
688}
689
690/// Extended provisioned throughput stored in metadata, including timestamps.
691#[derive(Debug, Clone, Default, Serialize, Deserialize)]
692struct StoredProvisionedThroughput {
693    #[serde(rename = "ReadCapacityUnits")]
694    read_capacity_units: i64,
695    #[serde(rename = "WriteCapacityUnits")]
696    write_capacity_units: i64,
697    #[serde(
698        rename = "LastIncreaseDateTime",
699        skip_serializing_if = "Option::is_none"
700    )]
701    last_increase_date_time: Option<f64>,
702    #[serde(
703        rename = "LastDecreaseDateTime",
704        skip_serializing_if = "Option::is_none"
705    )]
706    last_decrease_date_time: Option<f64>,
707    #[serde(
708        rename = "NumberOfDecreasesToday",
709        skip_serializing_if = "Option::is_none"
710    )]
711    number_of_decreases_today: Option<u64>,
712}
713
714/// Parse current provisioned throughput from table metadata.
715fn parse_current_throughput(meta: &crate::storage::TableMetadata) -> (i64, i64) {
716    parse_stored_throughput(meta)
717        .map(|pt| (pt.read_capacity_units, pt.write_capacity_units))
718        .unwrap_or((0, 0))
719}
720
721/// Parse the full stored provisioned throughput including timestamps.
722fn parse_stored_throughput(
723    meta: &crate::storage::TableMetadata,
724) -> Option<StoredProvisionedThroughput> {
725    meta.provisioned_throughput
726        .as_ref()
727        .and_then(|pt_json| serde_json::from_str(pt_json).ok())
728}
729
730/// Backfill existing items into a newly created GSI, processing in batches.
731fn backfill_gsi(
732    storage: &Storage,
733    table_name: &str,
734    key_schema: &helpers::KeySchema,
735    gsi_def: &gsi::GsiDef,
736) -> Result<()> {
737    const BATCH_SIZE: usize = 1000;
738    let mut last_pk: Option<String> = None;
739    let mut last_sk: Option<String> = None;
740
741    let gsi_table_name = format!("{}::gsi::{}", table_name, gsi_def.index_name);
742    let insert_sql = format!(
743        "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) \
744         VALUES (?1, ?2, ?3, ?4, ?5)",
745        crate::storage::escape_table_name(&gsi_table_name)
746    );
747    let mut stmt = storage
748        .conn()
749        .prepare_cached(&insert_sql)
750        .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
751
752    loop {
753        let items = storage.scan_items(
754            table_name,
755            &crate::storage::ScanParams {
756                limit: Some(BATCH_SIZE),
757                exclusive_start_pk: last_pk.as_deref(),
758                exclusive_start_sk: last_sk.as_deref(),
759                ..Default::default()
760            },
761        )?;
762
763        if items.is_empty() {
764            break;
765        }
766
767        for (pk, sk, item_json) in &items {
768            let item: crate::types::Item = serde_json::from_str(item_json)
769                .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
770
771            if let Some(gsi_pk_val) = item.get(&gsi_def.pk_attr) {
772                let gsi_pk = gsi_pk_val.to_key_string().unwrap_or_default();
773                let gsi_sk = gsi_def
774                    .sk_attr
775                    .as_ref()
776                    .and_then(|sk_attr| item.get(sk_attr))
777                    .and_then(|v| v.to_key_string())
778                    .unwrap_or_default();
779
780                let projected = gsi::build_index_item(
781                    &item,
782                    gsi_def,
783                    &key_schema.partition_key,
784                    key_schema.sort_key.as_deref(),
785                );
786                let projected_json = serde_json::to_string(&projected)
787                    .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
788
789                stmt.execute(rusqlite::params![gsi_pk, gsi_sk, pk, sk, projected_json])
790                    .map_err(DynoxideError::from)?;
791            }
792        }
793
794        let last = &items[items.len() - 1];
795        last_pk = Some(last.0.clone());
796        last_sk = Some(last.1.clone());
797
798        if items.len() < BATCH_SIZE {
799            break;
800        }
801    }
802
803    Ok(())
804}