1use crate::actions::create_table::StreamSpecification;
2use crate::actions::{TableDescription, build_table_description};
3use crate::actions::{gsi, helpers};
4use crate::errors::{DynoxideError, Result};
5use crate::storage_backend::StorageBackend;
6use crate::streams;
7use crate::types::{AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, Projection};
8use crate::validation;
9use serde::{Deserialize, Serialize};
10
11#[derive(Debug, Default, Deserialize)]
13struct UpdateTableRequestRaw {
14 #[serde(rename = "TableName", default)]
15 table_name: Option<String>,
16
17 #[serde(rename = "AttributeDefinitions", default)]
18 attribute_definitions: Option<Vec<AttributeDefinition>>,
19
20 #[serde(rename = "GlobalSecondaryIndexUpdates", default)]
21 global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
22
23 #[serde(rename = "StreamSpecification", default)]
24 stream_specification: Option<StreamSpecification>,
25
26 #[serde(rename = "DeletionProtectionEnabled", default)]
27 deletion_protection_enabled: Option<bool>,
28
29 #[serde(rename = "ProvisionedThroughput", default)]
30 provisioned_throughput: Option<serde_json::Value>,
31
32 #[serde(rename = "BillingMode", default)]
33 billing_mode: Option<String>,
34
35 #[serde(rename = "TableClass", default)]
36 table_class: Option<String>,
37
38 #[serde(rename = "OnDemandThroughput", default)]
39 on_demand_throughput: Option<crate::types::OnDemandThroughput>,
40}
41
42#[derive(Debug, Default)]
43pub struct UpdateTableRequest {
44 pub table_name: String,
45 pub attribute_definitions: Option<Vec<AttributeDefinition>>,
46 pub global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
47 pub stream_specification: Option<StreamSpecification>,
48 pub deletion_protection_enabled: Option<bool>,
49 pub provisioned_throughput: Option<serde_json::Value>,
50 pub billing_mode: Option<String>,
51 pub table_class: Option<String>,
52 pub on_demand_throughput: Option<crate::types::OnDemandThroughput>,
53}
54
55impl<'de> serde::Deserialize<'de> for UpdateTableRequest {
56 fn deserialize<D: serde::Deserializer<'de>>(
57 deserializer: D,
58 ) -> std::result::Result<Self, D::Error> {
59 let raw = UpdateTableRequestRaw::deserialize(deserializer)?;
60
61 if raw.table_name.is_none() || raw.table_name.as_deref() == Some("") {
63 let msg = if raw.table_name.is_none() {
64 "The parameter 'TableName' is required but was not present in the request"
65 } else {
66 "TableName must be at least 3 characters long and at most 255 characters long"
67 };
68 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
69 }
70
71 let table_name = raw.table_name.unwrap_or_default();
72
73 if table_name.len() < 3 || table_name.len() > 255 {
75 return Err(serde::de::Error::custom(
76 "VALIDATION:TableName must be at least 3 characters long and at most 255 characters long",
77 ));
78 }
79
80 let mut errors = Vec::new();
82
83 if !table_name
84 .chars()
85 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
86 {
87 errors.push(format!(
88 "Value '{}' at 'tableName' failed to satisfy constraint: \
89 Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
90 table_name
91 ));
92 }
93
94 if let Some(msg) = crate::validation::format_validation_errors(&errors) {
95 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
96 }
97
98 Ok(UpdateTableRequest {
99 table_name,
100 attribute_definitions: raw.attribute_definitions,
101 global_secondary_index_updates: raw.global_secondary_index_updates,
102 stream_specification: raw.stream_specification,
103 deletion_protection_enabled: raw.deletion_protection_enabled,
104 provisioned_throughput: raw.provisioned_throughput,
105 billing_mode: raw.billing_mode,
106 table_class: raw.table_class,
107 on_demand_throughput: raw.on_demand_throughput,
108 })
109 }
110}
111
112#[derive(Debug, Default, Deserialize)]
113pub struct GlobalSecondaryIndexUpdate {
114 #[serde(rename = "Update", default)]
115 pub update: Option<UpdateGsiAction>,
116
117 #[serde(rename = "Create", default)]
118 pub create: Option<CreateGsiAction>,
119
120 #[serde(rename = "Delete", default)]
121 pub delete: Option<DeleteGsiAction>,
122}
123
124#[derive(Debug, Default, Deserialize)]
125pub struct UpdateGsiAction {
126 #[serde(rename = "IndexName")]
127 pub index_name: String,
128
129 #[serde(rename = "ProvisionedThroughput", default)]
130 pub provisioned_throughput: Option<crate::types::ProvisionedThroughput>,
131}
132
133#[derive(Debug, Default, Deserialize)]
134pub struct CreateGsiAction {
135 #[serde(rename = "IndexName")]
136 pub index_name: String,
137
138 #[serde(rename = "KeySchema")]
139 pub key_schema: Vec<KeySchemaElement>,
140
141 #[serde(rename = "Projection")]
142 pub projection: Projection,
143}
144
145#[derive(Debug, Default, Deserialize)]
146pub struct DeleteGsiAction {
147 #[serde(rename = "IndexName")]
148 pub index_name: String,
149}
150
151#[derive(Debug, Default, Serialize)]
152pub struct UpdateTableResponse {
153 #[serde(rename = "TableDescription")]
154 pub table_description: TableDescription,
155}
156
157pub async fn execute<S: StorageBackend>(
158 storage: &S,
159 request: UpdateTableRequest,
160) -> Result<UpdateTableResponse> {
161 validate_update_request(&request)?;
167
168 let meta = helpers::require_table(storage, &request.table_name).await?;
170
171 let current_billing_mode = meta.billing_mode.as_deref().unwrap_or("PROVISIONED");
172
173 if current_billing_mode == "PAY_PER_REQUEST"
177 && request.billing_mode.is_none()
178 && request.provisioned_throughput.is_some()
179 {
180 return Err(DynoxideError::ValidationException(
181 "One or more parameter values were invalid: \
182 Neither ReadCapacityUnits nor WriteCapacityUnits can be \
183 specified when BillingMode is PAY_PER_REQUEST"
184 .to_string(),
185 ));
186 }
187
188 if request.billing_mode.as_deref() == Some("PROVISIONED")
190 && request.provisioned_throughput.is_none()
191 {
192 return Err(DynoxideError::ValidationException(
193 "One or more parameter values were invalid: \
194 ProvisionedThroughput must be specified when BillingMode is PROVISIONED"
195 .to_string(),
196 ));
197 }
198
199 if let Some(ref pt) = request.provisioned_throughput {
201 if let Some(obj) = pt.as_object() {
202 let new_rcu = obj
203 .get("ReadCapacityUnits")
204 .and_then(|v| v.as_i64())
205 .unwrap_or(0);
206 let new_wcu = obj
207 .get("WriteCapacityUnits")
208 .and_then(|v| v.as_i64())
209 .unwrap_or(0);
210
211 let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
213
214 let billing_mode_unchanged = request.billing_mode.is_none()
215 || (request.billing_mode.as_deref() == Some("PROVISIONED")
216 && current_billing_mode == "PROVISIONED");
217
218 if new_rcu == cur_rcu && new_wcu == cur_wcu && billing_mode_unchanged {
219 return Err(DynoxideError::ValidationException(format!(
220 "The provisioned throughput for the table will not change. \
221 The requested value equals the current value. \
222 Current ReadCapacityUnits provisioned for the table: {}. \
223 Requested ReadCapacityUnits: {}. \
224 Current WriteCapacityUnits provisioned for the table: {}. \
225 Requested WriteCapacityUnits: {}. \
226 Refer to the Amazon DynamoDB Developer Guide for current limits \
227 and how to request higher limits.",
228 cur_rcu, new_rcu, cur_wcu, new_wcu
229 )));
230 }
231 }
232 }
233
234 let mut current_gsis: Vec<GlobalSecondaryIndex> = meta
236 .gsi_definitions
237 .as_ref()
238 .map(|json| serde_json::from_str(json))
239 .transpose()
240 .map_err(|e| DynoxideError::InternalServerError(format!("Bad GSI JSON: {e}")))?
241 .unwrap_or_default();
242
243 if let Some(ref updates) = request.global_secondary_index_updates {
245 for update in updates {
246 if let Some(ref upd) = update.update {
247 if !current_gsis.iter().any(|g| g.index_name == upd.index_name) {
248 return Err(DynoxideError::ValidationException(
251 "This operation cannot be performed with given input values. \
252 Please contact DynamoDB service team for more info: \
253 Action Blocked: IndexUpdate"
254 .to_string(),
255 ));
256 }
257 }
258 }
259 }
260
261 if let Some(ref updates) = request.global_secondary_index_updates {
263 if updates.len() > 5 {
264 return Err(DynoxideError::LimitExceededException(
265 "Subscriber limit exceeded: Only 1 online index can be created or \
266 deleted simultaneously per table"
267 .to_string(),
268 ));
269 }
270 }
271
272 let existing_attr_defs: Vec<AttributeDefinition> =
274 serde_json::from_str(&meta.attribute_definitions)
275 .map_err(|e| DynoxideError::InternalServerError(format!("Bad attr defs JSON: {e}")))?;
276
277 let attr_defs = request
278 .attribute_definitions
279 .as_ref()
280 .unwrap_or(&existing_attr_defs);
281
282 let key_schema = helpers::parse_key_schema(&meta)?;
284
285 if let Some(ref updates) = request.global_secondary_index_updates {
287 for update in updates {
288 if let Some(ref create) = update.create {
289 if current_gsis
290 .iter()
291 .any(|g| g.index_name == create.index_name)
292 {
293 return Err(DynoxideError::ValidationException(format!(
294 "One or more parameter values were invalid: \
295 Index already exists: {}",
296 create.index_name
297 )));
298 }
299 let gsi_def = GlobalSecondaryIndex {
300 index_name: create.index_name.clone(),
301 key_schema: create.key_schema.clone(),
302 projection: create.projection.clone(),
303 provisioned_throughput: None,
304 };
305 validation::validate_gsi(&gsi_def, attr_defs)?;
306 }
307 if let Some(ref delete) = update.delete {
308 if !current_gsis
309 .iter()
310 .any(|g| g.index_name == delete.index_name)
311 {
312 return Err(DynoxideError::ResourceNotFoundException(format!(
313 "Requested resource not found: Table: {} not found",
314 delete.index_name
315 )));
316 }
317 }
318 }
319 }
320
321 let now = {
325 use std::sync::atomic::{AtomicU64, Ordering};
326 static LAST_TS: AtomicU64 = AtomicU64::new(0);
327 let wall = web_time::SystemTime::now()
328 .duration_since(web_time::UNIX_EPOCH)
329 .unwrap_or_default()
330 .as_secs_f64();
331 loop {
332 let prev_bits = LAST_TS.load(Ordering::SeqCst);
333 let prev_f = f64::from_bits(prev_bits);
334 let candidate = if wall > prev_f { wall } else { prev_f + 0.001 };
335 let candidate_bits = candidate.to_bits();
336 if LAST_TS
337 .compare_exchange(
338 prev_bits,
339 candidate_bits,
340 Ordering::SeqCst,
341 Ordering::SeqCst,
342 )
343 .is_ok()
344 {
345 break candidate;
346 }
347 }
348 };
349
350 let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
351 let is_pt_update = request.provisioned_throughput.is_some();
352 let (new_rcu, new_wcu) = if let Some(ref pt) = request.provisioned_throughput {
353 let obj = pt.as_object();
354 (
355 obj.and_then(|o| o.get("ReadCapacityUnits"))
356 .and_then(|v| v.as_i64())
357 .unwrap_or(0),
358 obj.and_then(|o| o.get("WriteCapacityUnits"))
359 .and_then(|v| v.as_i64())
360 .unwrap_or(0),
361 )
362 } else {
363 (cur_rcu, cur_wcu)
364 };
365
366 let is_increase = new_rcu > cur_rcu || new_wcu > cur_wcu;
367 let is_decrease = new_rcu < cur_rcu || new_wcu < cur_wcu;
368
369 helpers::with_write_transaction(storage, async {
371 if let Some(ref updates) = request.global_secondary_index_updates {
372 for update in updates {
373 if let Some(ref create) = update.create {
374 let gsi_def = GlobalSecondaryIndex {
375 index_name: create.index_name.clone(),
376 key_schema: create.key_schema.clone(),
377 projection: create.projection.clone(),
378 provisioned_throughput: None,
379 };
380
381 storage
382 .create_gsi_table(&request.table_name, &create.index_name)
383 .await?;
384
385 let gsi_p = gsi::gsi_to_def(&gsi_def)?;
386 backfill_gsi(storage, &request.table_name, &key_schema, &gsi_p).await?;
387
388 current_gsis.push(gsi_def);
389 }
390
391 if let Some(ref delete) = update.delete {
392 storage
393 .drop_gsi_table(&request.table_name, &delete.index_name)
394 .await?;
395 current_gsis.retain(|g| g.index_name != delete.index_name);
396 }
397 }
398 }
399
400 let attr_defs_json = serde_json::to_string(attr_defs)
402 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
403 let gsi_json = if current_gsis.is_empty() {
404 None
405 } else {
406 Some(
407 serde_json::to_string(¤t_gsis)
408 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
409 )
410 };
411
412 storage
413 .update_table_metadata(&request.table_name, &attr_defs_json, gsi_json.as_deref())
414 .await?;
415
416 if is_pt_update {
418 let prev = parse_stored_throughput(&meta);
419 let mut stored = StoredProvisionedThroughput {
420 read_capacity_units: new_rcu,
421 write_capacity_units: new_wcu,
422 last_increase_date_time: prev.as_ref().and_then(|p| p.last_increase_date_time),
423 last_decrease_date_time: prev.as_ref().and_then(|p| p.last_decrease_date_time),
424 number_of_decreases_today: prev
425 .as_ref()
426 .and_then(|p| p.number_of_decreases_today)
427 .or(Some(0)),
428 };
429 if is_increase {
430 stored.last_increase_date_time = Some(now);
431 }
432 if is_decrease {
433 stored.last_decrease_date_time = Some(now);
434 stored.number_of_decreases_today =
435 Some(stored.number_of_decreases_today.unwrap_or(0) + 1);
436 }
437 let pt_json = serde_json::to_string(&stored)
438 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
439 storage
440 .update_provisioned_throughput(&request.table_name, &pt_json)
441 .await?;
442 }
443
444 if let Some(enabled) = request.deletion_protection_enabled {
446 storage
447 .update_deletion_protection(&request.table_name, enabled)
448 .await?;
449 }
450
451 if let Some(ref table_class) = request.table_class {
453 storage
454 .update_table_class(&request.table_name, table_class)
455 .await?;
456 }
457
458 if let Some(ref on_demand) = request.on_demand_throughput {
460 let json = serde_json::to_string(on_demand)
461 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
462 storage
463 .update_on_demand_throughput(&request.table_name, &json)
464 .await?;
465 }
466
467 if let Some(ref billing_mode) = request.billing_mode {
469 storage
470 .update_billing_mode(&request.table_name, billing_mode)
471 .await?;
472 if billing_mode == "PAY_PER_REQUEST" {
473 storage
475 .clear_provisioned_throughput(&request.table_name)
476 .await?;
477 }
478 }
479
480 if let Some(ref spec) = request.stream_specification {
482 if spec.stream_enabled {
483 let view_type = spec
484 .stream_view_type
485 .as_deref()
486 .unwrap_or("NEW_AND_OLD_IMAGES");
487 let label = streams::generate_stream_label(storage.clock());
488 storage
489 .enable_stream(&request.table_name, view_type, &label)
490 .await?;
491 } else {
492 storage.disable_stream(&request.table_name).await?;
493 }
494 }
495
496 Ok(())
497 })
498 .await?;
499
500 let updated_meta = helpers::require_table(storage, &request.table_name).await?;
502 let mut desc = build_table_description(&updated_meta, Some(0), Some(0));
503
504 if is_pt_update {
506 desc.table_status = "UPDATING".to_string();
507
508 let stored = parse_stored_throughput(&updated_meta);
511 if let Some(ref mut pt) = desc.provisioned_throughput {
512 pt.read_capacity_units = cur_rcu as u64;
513 pt.write_capacity_units = cur_wcu as u64;
514 if let Some(ref s) = stored {
515 pt.last_increase_date_time = s.last_increase_date_time;
516 pt.last_decrease_date_time = s.last_decrease_date_time;
517 pt.number_of_decreases_today = s.number_of_decreases_today.unwrap_or(0);
518 }
519 }
520 }
521
522 Ok(UpdateTableResponse {
523 table_description: desc,
524 })
525}
526
527fn validate_update_request(request: &UpdateTableRequest) -> Result<()> {
532 let mut errors = Vec::new();
534
535 if let Some(ref pt) = request.provisioned_throughput {
537 if let Some(obj) = pt.as_object() {
538 let wcu = obj.get("WriteCapacityUnits");
539 let rcu = obj.get("ReadCapacityUnits");
540 if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
541 errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
542 } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
543 if w < 1 {
544 errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
545 }
546 }
547 if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
548 errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
549 } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
550 if r < 1 {
551 errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
552 }
553 }
554 }
555 }
556
557 if let Some(ref updates) = request.global_secondary_index_updates {
559 for (i, update) in updates.iter().enumerate() {
560 if let Some(ref upd) = update.update {
561 if upd.index_name.len() < 3 {
563 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", upd.index_name, i + 1));
564 }
565 if !upd.index_name.is_empty()
566 && !upd
567 .index_name
568 .chars()
569 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
570 {
571 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", upd.index_name, i + 1));
572 }
573 if let Some(ref pt) = upd.provisioned_throughput {
575 let wcu = pt.write_capacity_units;
576 let rcu = pt.read_capacity_units;
577 if wcu.is_none() {
578 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
579 } else if let Some(w) = wcu {
580 if w < 1 {
581 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
582 }
583 }
584 if rcu.is_none() {
585 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
586 } else if let Some(r) = rcu {
587 if r < 1 {
588 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
589 }
590 }
591 } else {
592 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput' failed to satisfy constraint: Member must not be null", i + 1));
593 }
594 }
595 }
596 }
597
598 errors.truncate(10);
600
601 if !errors.is_empty() {
602 let prefix = format!(
603 "{} validation error{} detected: ",
604 errors.len(),
605 if errors.len() == 1 { "" } else { "s" }
606 );
607 return Err(DynoxideError::ValidationException(format!(
608 "{}{}",
609 prefix,
610 errors.join("; ")
611 )));
612 }
613
614 if let Some(ref bm) = request.billing_mode {
618 if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
619 return Err(DynoxideError::ValidationException(format!(
620 "1 validation error detected: Value '{}' at 'billingMode' \
621 failed to satisfy constraint: Member must satisfy enum value set: \
622 [PROVISIONED, PAY_PER_REQUEST]",
623 bm
624 )));
625 }
626 }
627
628 if let Some(ref tc) = request.table_class {
630 if tc != "STANDARD" && tc != "STANDARD_INFREQUENT_ACCESS" {
631 return Err(DynoxideError::ValidationException(format!(
632 "1 validation error detected: Value '{tc}' at 'tableClass' failed to satisfy \
633 constraint: Member must satisfy enum value set: \
634 [STANDARD, STANDARD_INFREQUENT_ACCESS]"
635 )));
636 }
637 }
638
639 if request.billing_mode.as_deref() == Some("PAY_PER_REQUEST")
641 && request.provisioned_throughput.is_some()
642 {
643 return Err(DynoxideError::ValidationException(
644 "One or more parameter values were invalid: \
645 Neither ReadCapacityUnits nor WriteCapacityUnits can be \
646 specified when BillingMode is PAY_PER_REQUEST"
647 .to_string(),
648 ));
649 }
650
651 if let Some(ref pt) = request.provisioned_throughput {
653 if let Some(obj) = pt.as_object() {
654 let rcu = obj
655 .get("ReadCapacityUnits")
656 .and_then(|v| v.as_i64())
657 .unwrap_or(0);
658 let wcu = obj
659 .get("WriteCapacityUnits")
660 .and_then(|v| v.as_i64())
661 .unwrap_or(0);
662 const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
663 if rcu > MAX_THROUGHPUT {
664 return Err(DynoxideError::ValidationException(format!(
665 "Given value {} for ReadCapacityUnits is out of bounds",
666 rcu
667 )));
668 }
669 if wcu > MAX_THROUGHPUT {
670 return Err(DynoxideError::ValidationException(format!(
671 "Given value {} for WriteCapacityUnits is out of bounds",
672 wcu
673 )));
674 }
675 }
676 }
677
678 let no_config_change = request.provisioned_throughput.is_none()
684 && request.billing_mode.is_none()
685 && request.stream_specification.is_none()
686 && request.deletion_protection_enabled.is_none()
687 && request.table_class.is_none()
688 && request.on_demand_throughput.is_none();
689 let no_gsi_change = request
690 .global_secondary_index_updates
691 .as_ref()
692 .is_none_or(|updates| updates.is_empty());
693 if no_gsi_change && no_config_change {
694 return Err(DynoxideError::ValidationException(
695 "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
696 ));
697 }
698
699 if let Some(ref updates) = request.global_secondary_index_updates {
701 for update in updates {
703 if update.update.is_none() && update.create.is_none() && update.delete.is_none() {
704 return Err(DynoxideError::ValidationException(
705 "One or more parameter values were invalid: One of GlobalSecondaryIndexUpdate.Update, GlobalSecondaryIndexUpdate.Create, GlobalSecondaryIndexUpdate.Delete must not be null".to_string(),
706 ));
707 }
708 }
709
710 let mut seen_names = std::collections::HashSet::new();
712 for update in updates {
713 let name = if let Some(ref u) = update.update {
714 Some(u.index_name.as_str())
715 } else if let Some(ref c) = update.create {
716 Some(c.index_name.as_str())
717 } else {
718 update.delete.as_ref().map(|d| d.index_name.as_str())
719 };
720 if let Some(name) = name {
721 if !seen_names.insert(name.to_string()) {
722 return Err(DynoxideError::ValidationException(format!(
723 "One or more parameter values were invalid: Only one global secondary index update per index is allowed simultaneously. Index: {}",
724 name
725 )));
726 }
727 }
728 }
729 }
730
731 Ok(())
732}
733
734#[derive(Debug, Clone, Default, Serialize, Deserialize)]
736struct StoredProvisionedThroughput {
737 #[serde(rename = "ReadCapacityUnits")]
738 read_capacity_units: i64,
739 #[serde(rename = "WriteCapacityUnits")]
740 write_capacity_units: i64,
741 #[serde(
742 rename = "LastIncreaseDateTime",
743 skip_serializing_if = "Option::is_none"
744 )]
745 last_increase_date_time: Option<f64>,
746 #[serde(
747 rename = "LastDecreaseDateTime",
748 skip_serializing_if = "Option::is_none"
749 )]
750 last_decrease_date_time: Option<f64>,
751 #[serde(
752 rename = "NumberOfDecreasesToday",
753 skip_serializing_if = "Option::is_none"
754 )]
755 number_of_decreases_today: Option<u64>,
756}
757
758fn parse_current_throughput(meta: &crate::storage::TableMetadata) -> (i64, i64) {
760 parse_stored_throughput(meta)
761 .map(|pt| (pt.read_capacity_units, pt.write_capacity_units))
762 .unwrap_or((0, 0))
763}
764
765fn parse_stored_throughput(
767 meta: &crate::storage::TableMetadata,
768) -> Option<StoredProvisionedThroughput> {
769 meta.provisioned_throughput
770 .as_ref()
771 .and_then(|pt_json| serde_json::from_str(pt_json).ok())
772}
773
774async fn backfill_gsi<S: StorageBackend>(
776 storage: &S,
777 table_name: &str,
778 key_schema: &helpers::KeySchema,
779 gsi_def: &gsi::GsiDef,
780) -> Result<()> {
781 const BATCH_SIZE: usize = 1000;
782 let mut last_pk: Option<String> = None;
783 let mut last_sk: Option<String> = None;
784
785 loop {
786 let items = storage
787 .scan_items(
788 table_name,
789 &crate::storage::ScanParams {
790 limit: Some(BATCH_SIZE),
791 exclusive_start_pk: last_pk.as_deref(),
792 exclusive_start_sk: last_sk.as_deref(),
793 ..Default::default()
794 },
795 )
796 .await?;
797
798 if items.is_empty() {
799 break;
800 }
801
802 let mut rows: Vec<crate::storage_backend::GsiItemRow> = Vec::new();
803 for (pk, sk, item_json) in &items {
804 let item: crate::types::Item = serde_json::from_str(item_json)
805 .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
806
807 if let Some((gsi_pk, gsi_sk)) = gsi_def.index_key_strings(&item) {
809 let projected = gsi::build_index_item(
810 &item,
811 gsi_def,
812 &key_schema.partition_key,
813 key_schema.sort_key.as_deref(),
814 );
815 let projected_json = serde_json::to_string(&projected)
816 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
817
818 rows.push(crate::storage_backend::GsiItemRow {
819 gsi_pk,
820 gsi_sk,
821 table_pk: pk.clone(),
822 table_sk: sk.clone(),
823 item_json: projected_json,
824 });
825 }
826 }
827
828 storage
829 .insert_gsi_items(table_name, &gsi_def.index_name, &rows)
830 .await?;
831
832 let last = &items[items.len() - 1];
833 last_pk = Some(last.0.clone());
834 last_sk = Some(last.1.clone());
835
836 if items.len() < BATCH_SIZE {
837 break;
838 }
839 }
840
841 Ok(())
842}