1use crate::actions::create_table::StreamSpecification;
2use crate::actions::{TableDescription, build_table_description};
3use crate::actions::{gsi, helpers};
4use crate::errors::{DynoxideError, Result};
5use crate::storage::Storage;
6use crate::streams;
7use crate::types::{AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, Projection};
8use crate::validation;
9use rusqlite;
10use serde::{Deserialize, Serialize};
11
12#[derive(Debug, Default, Deserialize)]
14struct UpdateTableRequestRaw {
15 #[serde(rename = "TableName", default)]
16 table_name: Option<String>,
17
18 #[serde(rename = "AttributeDefinitions", default)]
19 attribute_definitions: Option<Vec<AttributeDefinition>>,
20
21 #[serde(rename = "GlobalSecondaryIndexUpdates", default)]
22 global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
23
24 #[serde(rename = "StreamSpecification", default)]
25 stream_specification: Option<StreamSpecification>,
26
27 #[serde(rename = "DeletionProtectionEnabled", default)]
28 deletion_protection_enabled: Option<bool>,
29
30 #[serde(rename = "ProvisionedThroughput", default)]
31 provisioned_throughput: Option<serde_json::Value>,
32
33 #[serde(rename = "BillingMode", default)]
34 billing_mode: Option<String>,
35}
36
37#[derive(Debug, Default)]
38pub struct UpdateTableRequest {
39 pub table_name: String,
40 pub attribute_definitions: Option<Vec<AttributeDefinition>>,
41 pub global_secondary_index_updates: Option<Vec<GlobalSecondaryIndexUpdate>>,
42 pub stream_specification: Option<StreamSpecification>,
43 pub deletion_protection_enabled: Option<bool>,
44 pub provisioned_throughput: Option<serde_json::Value>,
45 pub billing_mode: Option<String>,
46}
47
48impl<'de> serde::Deserialize<'de> for UpdateTableRequest {
49 fn deserialize<D: serde::Deserializer<'de>>(
50 deserializer: D,
51 ) -> std::result::Result<Self, D::Error> {
52 let raw = UpdateTableRequestRaw::deserialize(deserializer)?;
53
54 if raw.table_name.is_none() || raw.table_name.as_deref() == Some("") {
56 let msg = if raw.table_name.is_none() {
57 "The parameter 'TableName' is required but was not present in the request"
58 } else {
59 "TableName must be at least 3 characters long and at most 255 characters long"
60 };
61 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
62 }
63
64 let table_name = raw.table_name.unwrap_or_default();
65
66 if table_name.len() < 3 || table_name.len() > 255 {
68 return Err(serde::de::Error::custom(
69 "VALIDATION:TableName must be at least 3 characters long and at most 255 characters long",
70 ));
71 }
72
73 let mut errors = Vec::new();
75
76 if !table_name
77 .chars()
78 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
79 {
80 errors.push(format!(
81 "Value '{}' at 'tableName' failed to satisfy constraint: \
82 Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+",
83 table_name
84 ));
85 }
86
87 if let Some(msg) = crate::validation::format_validation_errors(&errors) {
88 return Err(serde::de::Error::custom(format!("VALIDATION:{}", msg)));
89 }
90
91 Ok(UpdateTableRequest {
92 table_name,
93 attribute_definitions: raw.attribute_definitions,
94 global_secondary_index_updates: raw.global_secondary_index_updates,
95 stream_specification: raw.stream_specification,
96 deletion_protection_enabled: raw.deletion_protection_enabled,
97 provisioned_throughput: raw.provisioned_throughput,
98 billing_mode: raw.billing_mode,
99 })
100 }
101}
102
103#[derive(Debug, Default, Deserialize)]
104pub struct GlobalSecondaryIndexUpdate {
105 #[serde(rename = "Update", default)]
106 pub update: Option<UpdateGsiAction>,
107
108 #[serde(rename = "Create", default)]
109 pub create: Option<CreateGsiAction>,
110
111 #[serde(rename = "Delete", default)]
112 pub delete: Option<DeleteGsiAction>,
113}
114
115#[derive(Debug, Default, Deserialize)]
116pub struct UpdateGsiAction {
117 #[serde(rename = "IndexName")]
118 pub index_name: String,
119
120 #[serde(rename = "ProvisionedThroughput", default)]
121 pub provisioned_throughput: Option<crate::types::ProvisionedThroughput>,
122}
123
124#[derive(Debug, Default, Deserialize)]
125pub struct CreateGsiAction {
126 #[serde(rename = "IndexName")]
127 pub index_name: String,
128
129 #[serde(rename = "KeySchema")]
130 pub key_schema: Vec<KeySchemaElement>,
131
132 #[serde(rename = "Projection")]
133 pub projection: Projection,
134}
135
136#[derive(Debug, Default, Deserialize)]
137pub struct DeleteGsiAction {
138 #[serde(rename = "IndexName")]
139 pub index_name: String,
140}
141
142#[derive(Debug, Default, Serialize)]
143pub struct UpdateTableResponse {
144 #[serde(rename = "TableDescription")]
145 pub table_description: TableDescription,
146}
147
148pub fn execute(storage: &Storage, request: UpdateTableRequest) -> Result<UpdateTableResponse> {
149 validate_update_request(&request)?;
155
156 let meta = helpers::require_table(storage, &request.table_name)?;
158
159 let current_billing_mode = meta.billing_mode.as_deref().unwrap_or("PROVISIONED");
160
161 if current_billing_mode == "PAY_PER_REQUEST"
165 && request.billing_mode.is_none()
166 && request.provisioned_throughput.is_some()
167 {
168 return Err(DynoxideError::ValidationException(
169 "One or more parameter values were invalid: \
170 Neither ReadCapacityUnits nor WriteCapacityUnits can be \
171 specified when BillingMode is PAY_PER_REQUEST"
172 .to_string(),
173 ));
174 }
175
176 if request.billing_mode.as_deref() == Some("PROVISIONED")
178 && request.provisioned_throughput.is_none()
179 {
180 return Err(DynoxideError::ValidationException(
181 "One or more parameter values were invalid: \
182 ProvisionedThroughput must be specified when BillingMode is PROVISIONED"
183 .to_string(),
184 ));
185 }
186
187 if let Some(ref pt) = request.provisioned_throughput {
189 if let Some(obj) = pt.as_object() {
190 let new_rcu = obj
191 .get("ReadCapacityUnits")
192 .and_then(|v| v.as_i64())
193 .unwrap_or(0);
194 let new_wcu = obj
195 .get("WriteCapacityUnits")
196 .and_then(|v| v.as_i64())
197 .unwrap_or(0);
198
199 let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
201
202 let billing_mode_unchanged = request.billing_mode.is_none()
203 || (request.billing_mode.as_deref() == Some("PROVISIONED")
204 && current_billing_mode == "PROVISIONED");
205
206 if new_rcu == cur_rcu && new_wcu == cur_wcu && billing_mode_unchanged {
207 return Err(DynoxideError::ValidationException(format!(
208 "The provisioned throughput for the table will not change. \
209 The requested value equals the current value. \
210 Current ReadCapacityUnits provisioned for the table: {}. \
211 Requested ReadCapacityUnits: {}. \
212 Current WriteCapacityUnits provisioned for the table: {}. \
213 Requested WriteCapacityUnits: {}. \
214 Refer to the Amazon DynamoDB Developer Guide for current limits \
215 and how to request higher limits.",
216 cur_rcu, new_rcu, cur_wcu, new_wcu
217 )));
218 }
219 }
220 }
221
222 let mut current_gsis: Vec<GlobalSecondaryIndex> = meta
224 .gsi_definitions
225 .as_ref()
226 .map(|json| serde_json::from_str(json))
227 .transpose()
228 .map_err(|e| DynoxideError::InternalServerError(format!("Bad GSI JSON: {e}")))?
229 .unwrap_or_default();
230
231 if let Some(ref updates) = request.global_secondary_index_updates {
233 for update in updates {
234 if let Some(ref upd) = update.update {
235 if !current_gsis.iter().any(|g| g.index_name == upd.index_name) {
236 return Err(DynoxideError::ValidationException(
239 "This operation cannot be performed with given input values. \
240 Please contact DynamoDB service team for more info: \
241 Action Blocked: IndexUpdate"
242 .to_string(),
243 ));
244 }
245 }
246 }
247 }
248
249 if let Some(ref updates) = request.global_secondary_index_updates {
251 if updates.len() > 5 {
252 return Err(DynoxideError::LimitExceededException(
253 "Subscriber limit exceeded: Only 1 online index can be created or \
254 deleted simultaneously per table"
255 .to_string(),
256 ));
257 }
258 }
259
260 let existing_attr_defs: Vec<AttributeDefinition> =
262 serde_json::from_str(&meta.attribute_definitions)
263 .map_err(|e| DynoxideError::InternalServerError(format!("Bad attr defs JSON: {e}")))?;
264
265 let attr_defs = request
266 .attribute_definitions
267 .as_ref()
268 .unwrap_or(&existing_attr_defs);
269
270 let key_schema = helpers::parse_key_schema(&meta)?;
272
273 if let Some(ref updates) = request.global_secondary_index_updates {
275 for update in updates {
276 if let Some(ref create) = update.create {
277 if current_gsis
278 .iter()
279 .any(|g| g.index_name == create.index_name)
280 {
281 return Err(DynoxideError::ValidationException(format!(
282 "One or more parameter values were invalid: \
283 Index already exists: {}",
284 create.index_name
285 )));
286 }
287 let gsi_def = GlobalSecondaryIndex {
288 index_name: create.index_name.clone(),
289 key_schema: create.key_schema.clone(),
290 projection: create.projection.clone(),
291 provisioned_throughput: None,
292 };
293 validation::validate_gsi(&gsi_def, attr_defs)?;
294 }
295 if let Some(ref delete) = update.delete {
296 if !current_gsis
297 .iter()
298 .any(|g| g.index_name == delete.index_name)
299 {
300 return Err(DynoxideError::ResourceNotFoundException(format!(
301 "Requested resource not found: Table: {} not found",
302 delete.index_name
303 )));
304 }
305 }
306 }
307 }
308
309 let now = {
313 use std::sync::atomic::{AtomicU64, Ordering};
314 static LAST_TS: AtomicU64 = AtomicU64::new(0);
315 let wall = std::time::SystemTime::now()
316 .duration_since(std::time::UNIX_EPOCH)
317 .unwrap_or_default()
318 .as_secs_f64();
319 loop {
320 let prev_bits = LAST_TS.load(Ordering::SeqCst);
321 let prev_f = f64::from_bits(prev_bits);
322 let candidate = if wall > prev_f { wall } else { prev_f + 0.001 };
323 let candidate_bits = candidate.to_bits();
324 if LAST_TS
325 .compare_exchange(
326 prev_bits,
327 candidate_bits,
328 Ordering::SeqCst,
329 Ordering::SeqCst,
330 )
331 .is_ok()
332 {
333 break candidate;
334 }
335 }
336 };
337
338 let (cur_rcu, cur_wcu) = parse_current_throughput(&meta);
339 let is_pt_update = request.provisioned_throughput.is_some();
340 let (new_rcu, new_wcu) = if let Some(ref pt) = request.provisioned_throughput {
341 let obj = pt.as_object();
342 (
343 obj.and_then(|o| o.get("ReadCapacityUnits"))
344 .and_then(|v| v.as_i64())
345 .unwrap_or(0),
346 obj.and_then(|o| o.get("WriteCapacityUnits"))
347 .and_then(|v| v.as_i64())
348 .unwrap_or(0),
349 )
350 } else {
351 (cur_rcu, cur_wcu)
352 };
353
354 let is_increase = new_rcu > cur_rcu || new_wcu > cur_wcu;
355 let is_decrease = new_rcu < cur_rcu || new_wcu < cur_wcu;
356
357 storage.begin_transaction()?;
359
360 let result = (|| -> Result<()> {
361 if let Some(ref updates) = request.global_secondary_index_updates {
362 for update in updates {
363 if let Some(ref create) = update.create {
364 let gsi_def = GlobalSecondaryIndex {
365 index_name: create.index_name.clone(),
366 key_schema: create.key_schema.clone(),
367 projection: create.projection.clone(),
368 provisioned_throughput: None,
369 };
370
371 storage.create_gsi_table(&request.table_name, &create.index_name)?;
372
373 let gsi_p = gsi::gsi_to_def(&gsi_def)?;
374 backfill_gsi(storage, &request.table_name, &key_schema, &gsi_p)?;
375
376 current_gsis.push(gsi_def);
377 }
378
379 if let Some(ref delete) = update.delete {
380 storage.drop_gsi_table(&request.table_name, &delete.index_name)?;
381 current_gsis.retain(|g| g.index_name != delete.index_name);
382 }
383 }
384 }
385
386 let attr_defs_json = serde_json::to_string(attr_defs)
388 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
389 let gsi_json = if current_gsis.is_empty() {
390 None
391 } else {
392 Some(
393 serde_json::to_string(¤t_gsis)
394 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?,
395 )
396 };
397
398 storage.update_table_metadata(&request.table_name, &attr_defs_json, gsi_json.as_deref())?;
399
400 if is_pt_update {
402 let prev = parse_stored_throughput(&meta);
403 let mut stored = StoredProvisionedThroughput {
404 read_capacity_units: new_rcu,
405 write_capacity_units: new_wcu,
406 last_increase_date_time: prev.as_ref().and_then(|p| p.last_increase_date_time),
407 last_decrease_date_time: prev.as_ref().and_then(|p| p.last_decrease_date_time),
408 number_of_decreases_today: prev
409 .as_ref()
410 .and_then(|p| p.number_of_decreases_today)
411 .or(Some(0)),
412 };
413 if is_increase {
414 stored.last_increase_date_time = Some(now);
415 }
416 if is_decrease {
417 stored.last_decrease_date_time = Some(now);
418 stored.number_of_decreases_today =
419 Some(stored.number_of_decreases_today.unwrap_or(0) + 1);
420 }
421 let pt_json = serde_json::to_string(&stored)
422 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
423 storage.update_provisioned_throughput(&request.table_name, &pt_json)?;
424 }
425
426 if let Some(enabled) = request.deletion_protection_enabled {
428 storage.update_deletion_protection(&request.table_name, enabled)?;
429 }
430
431 if let Some(ref billing_mode) = request.billing_mode {
433 storage.update_billing_mode(&request.table_name, billing_mode)?;
434 if billing_mode == "PAY_PER_REQUEST" {
435 storage.clear_provisioned_throughput(&request.table_name)?;
437 }
438 }
439
440 if let Some(ref spec) = request.stream_specification {
442 if spec.stream_enabled {
443 let view_type = spec
444 .stream_view_type
445 .as_deref()
446 .unwrap_or("NEW_AND_OLD_IMAGES");
447 let label = streams::generate_stream_label();
448 storage.enable_stream(&request.table_name, view_type, &label)?;
449 } else {
450 storage.disable_stream(&request.table_name)?;
451 }
452 }
453
454 Ok(())
455 })();
456
457 match result {
458 Ok(()) => storage.commit()?,
459 Err(e) => {
460 let _ = storage.rollback();
461 return Err(e);
462 }
463 }
464
465 let updated_meta = helpers::require_table(storage, &request.table_name)?;
467 let mut desc = build_table_description(&updated_meta, Some(0), Some(0));
468
469 if is_pt_update {
471 desc.table_status = "UPDATING".to_string();
472
473 let stored = parse_stored_throughput(&updated_meta);
476 if let Some(ref mut pt) = desc.provisioned_throughput {
477 pt.read_capacity_units = cur_rcu as u64;
478 pt.write_capacity_units = cur_wcu as u64;
479 if let Some(ref s) = stored {
480 pt.last_increase_date_time = s.last_increase_date_time;
481 pt.last_decrease_date_time = s.last_decrease_date_time;
482 pt.number_of_decreases_today = s.number_of_decreases_today.unwrap_or(0);
483 }
484 }
485 }
486
487 Ok(UpdateTableResponse {
488 table_description: desc,
489 })
490}
491
492fn validate_update_request(request: &UpdateTableRequest) -> Result<()> {
497 let mut errors = Vec::new();
499
500 if let Some(ref pt) = request.provisioned_throughput {
502 if let Some(obj) = pt.as_object() {
503 let wcu = obj.get("WriteCapacityUnits");
504 let rcu = obj.get("ReadCapacityUnits");
505 if wcu.is_none() || wcu == Some(&serde_json::Value::Null) {
506 errors.push("Value null at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
507 } else if let Some(w) = wcu.and_then(|v| v.as_i64()) {
508 if w < 1 {
509 errors.push(format!("Value '{}' at 'provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w));
510 }
511 }
512 if rcu.is_none() || rcu == Some(&serde_json::Value::Null) {
513 errors.push("Value null at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null".to_string());
514 } else if let Some(r) = rcu.and_then(|v| v.as_i64()) {
515 if r < 1 {
516 errors.push(format!("Value '{}' at 'provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r));
517 }
518 }
519 }
520 }
521
522 if let Some(ref updates) = request.global_secondary_index_updates {
524 for (i, update) in updates.iter().enumerate() {
525 if let Some(ref upd) = update.update {
526 if upd.index_name.len() < 3 {
528 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must have length greater than or equal to 3", upd.index_name, i + 1));
529 }
530 if !upd.index_name.is_empty()
531 && !upd
532 .index_name
533 .chars()
534 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.')
535 {
536 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.indexName' failed to satisfy constraint: Member must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", upd.index_name, i + 1));
537 }
538 if let Some(ref pt) = upd.provisioned_throughput {
540 let wcu = pt.write_capacity_units;
541 let rcu = pt.read_capacity_units;
542 if wcu.is_none() {
543 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
544 } else if let Some(w) = wcu {
545 if w < 1 {
546 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.writeCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", w, i + 1));
547 }
548 }
549 if rcu.is_none() {
550 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must not be null", i + 1));
551 } else if let Some(r) = rcu {
552 if r < 1 {
553 errors.push(format!("Value '{}' at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput.readCapacityUnits' failed to satisfy constraint: Member must have value greater than or equal to 1", r, i + 1));
554 }
555 }
556 } else {
557 errors.push(format!("Value null at 'globalSecondaryIndexUpdates.{}.member.update.provisionedThroughput' failed to satisfy constraint: Member must not be null", i + 1));
558 }
559 }
560 }
561 }
562
563 errors.truncate(10);
565
566 if !errors.is_empty() {
567 let prefix = format!(
568 "{} validation error{} detected: ",
569 errors.len(),
570 if errors.len() == 1 { "" } else { "s" }
571 );
572 return Err(DynoxideError::ValidationException(format!(
573 "{}{}",
574 prefix,
575 errors.join("; ")
576 )));
577 }
578
579 if let Some(ref bm) = request.billing_mode {
583 if bm != "PROVISIONED" && bm != "PAY_PER_REQUEST" {
584 return Err(DynoxideError::ValidationException(format!(
585 "1 validation error detected: Value '{}' at 'billingMode' \
586 failed to satisfy constraint: Member must satisfy enum value set: \
587 [PROVISIONED, PAY_PER_REQUEST]",
588 bm
589 )));
590 }
591 }
592
593 if request.billing_mode.as_deref() == Some("PAY_PER_REQUEST")
595 && request.provisioned_throughput.is_some()
596 {
597 return Err(DynoxideError::ValidationException(
598 "One or more parameter values were invalid: \
599 Neither ReadCapacityUnits nor WriteCapacityUnits can be \
600 specified when BillingMode is PAY_PER_REQUEST"
601 .to_string(),
602 ));
603 }
604
605 if let Some(ref pt) = request.provisioned_throughput {
607 if let Some(obj) = pt.as_object() {
608 let rcu = obj
609 .get("ReadCapacityUnits")
610 .and_then(|v| v.as_i64())
611 .unwrap_or(0);
612 let wcu = obj
613 .get("WriteCapacityUnits")
614 .and_then(|v| v.as_i64())
615 .unwrap_or(0);
616 const MAX_THROUGHPUT: i64 = 1_000_000_000_000;
617 if rcu > MAX_THROUGHPUT {
618 return Err(DynoxideError::ValidationException(format!(
619 "Given value {} for ReadCapacityUnits is out of bounds",
620 rcu
621 )));
622 }
623 if wcu > MAX_THROUGHPUT {
624 return Err(DynoxideError::ValidationException(format!(
625 "Given value {} for WriteCapacityUnits is out of bounds",
626 wcu
627 )));
628 }
629 }
630 }
631
632 if let Some(ref updates) = request.global_secondary_index_updates {
634 if updates.is_empty() {
635 if request.provisioned_throughput.is_none()
637 && request.billing_mode.is_none()
638 && request.stream_specification.is_none()
639 {
640 return Err(DynoxideError::ValidationException(
641 "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
642 ));
643 }
644 }
645 } else if request.provisioned_throughput.is_none()
646 && request.billing_mode.is_none()
647 && request.stream_specification.is_none()
648 && request.deletion_protection_enabled.is_none()
649 {
650 return Err(DynoxideError::ValidationException(
651 "At least one of ProvisionedThroughput, BillingMode, UpdateStreamEnabled, GlobalSecondaryIndexUpdates or SSESpecification or ReplicaUpdates is required".to_string(),
652 ));
653 }
654
655 if let Some(ref updates) = request.global_secondary_index_updates {
657 for update in updates {
659 if update.update.is_none() && update.create.is_none() && update.delete.is_none() {
660 return Err(DynoxideError::ValidationException(
661 "One or more parameter values were invalid: One of GlobalSecondaryIndexUpdate.Update, GlobalSecondaryIndexUpdate.Create, GlobalSecondaryIndexUpdate.Delete must not be null".to_string(),
662 ));
663 }
664 }
665
666 let mut seen_names = std::collections::HashSet::new();
668 for update in updates {
669 let name = if let Some(ref u) = update.update {
670 Some(u.index_name.as_str())
671 } else if let Some(ref c) = update.create {
672 Some(c.index_name.as_str())
673 } else {
674 update.delete.as_ref().map(|d| d.index_name.as_str())
675 };
676 if let Some(name) = name {
677 if !seen_names.insert(name.to_string()) {
678 return Err(DynoxideError::ValidationException(format!(
679 "One or more parameter values were invalid: Only one global secondary index update per index is allowed simultaneously. Index: {}",
680 name
681 )));
682 }
683 }
684 }
685 }
686
687 Ok(())
688}
689
690#[derive(Debug, Clone, Default, Serialize, Deserialize)]
692struct StoredProvisionedThroughput {
693 #[serde(rename = "ReadCapacityUnits")]
694 read_capacity_units: i64,
695 #[serde(rename = "WriteCapacityUnits")]
696 write_capacity_units: i64,
697 #[serde(
698 rename = "LastIncreaseDateTime",
699 skip_serializing_if = "Option::is_none"
700 )]
701 last_increase_date_time: Option<f64>,
702 #[serde(
703 rename = "LastDecreaseDateTime",
704 skip_serializing_if = "Option::is_none"
705 )]
706 last_decrease_date_time: Option<f64>,
707 #[serde(
708 rename = "NumberOfDecreasesToday",
709 skip_serializing_if = "Option::is_none"
710 )]
711 number_of_decreases_today: Option<u64>,
712}
713
714fn parse_current_throughput(meta: &crate::storage::TableMetadata) -> (i64, i64) {
716 parse_stored_throughput(meta)
717 .map(|pt| (pt.read_capacity_units, pt.write_capacity_units))
718 .unwrap_or((0, 0))
719}
720
721fn parse_stored_throughput(
723 meta: &crate::storage::TableMetadata,
724) -> Option<StoredProvisionedThroughput> {
725 meta.provisioned_throughput
726 .as_ref()
727 .and_then(|pt_json| serde_json::from_str(pt_json).ok())
728}
729
730fn backfill_gsi(
732 storage: &Storage,
733 table_name: &str,
734 key_schema: &helpers::KeySchema,
735 gsi_def: &gsi::GsiDef,
736) -> Result<()> {
737 const BATCH_SIZE: usize = 1000;
738 let mut last_pk: Option<String> = None;
739 let mut last_sk: Option<String> = None;
740
741 let gsi_table_name = format!("{}::gsi::{}", table_name, gsi_def.index_name);
742 let insert_sql = format!(
743 "INSERT OR REPLACE INTO \"{}\" (gsi_pk, gsi_sk, table_pk, table_sk, item_json) \
744 VALUES (?1, ?2, ?3, ?4, ?5)",
745 crate::storage::escape_table_name(&gsi_table_name)
746 );
747 let mut stmt = storage
748 .conn()
749 .prepare_cached(&insert_sql)
750 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
751
752 loop {
753 let items = storage.scan_items(
754 table_name,
755 &crate::storage::ScanParams {
756 limit: Some(BATCH_SIZE),
757 exclusive_start_pk: last_pk.as_deref(),
758 exclusive_start_sk: last_sk.as_deref(),
759 ..Default::default()
760 },
761 )?;
762
763 if items.is_empty() {
764 break;
765 }
766
767 for (pk, sk, item_json) in &items {
768 let item: crate::types::Item = serde_json::from_str(item_json)
769 .map_err(|e| DynoxideError::InternalServerError(format!("Bad item JSON: {e}")))?;
770
771 if let Some(gsi_pk_val) = item.get(&gsi_def.pk_attr) {
772 let gsi_pk = gsi_pk_val.to_key_string().unwrap_or_default();
773 let gsi_sk = gsi_def
774 .sk_attr
775 .as_ref()
776 .and_then(|sk_attr| item.get(sk_attr))
777 .and_then(|v| v.to_key_string())
778 .unwrap_or_default();
779
780 let projected = gsi::build_index_item(
781 &item,
782 gsi_def,
783 &key_schema.partition_key,
784 key_schema.sort_key.as_deref(),
785 );
786 let projected_json = serde_json::to_string(&projected)
787 .map_err(|e| DynoxideError::InternalServerError(e.to_string()))?;
788
789 stmt.execute(rusqlite::params![gsi_pk, gsi_sk, pk, sk, projected_json])
790 .map_err(DynoxideError::from)?;
791 }
792 }
793
794 let last = &items[items.len() - 1];
795 last_pk = Some(last.0.clone());
796 last_sk = Some(last.1.clone());
797
798 if items.len() < BATCH_SIZE {
799 break;
800 }
801 }
802
803 Ok(())
804}