Skip to main content

dynoxide/actions/
mod.rs

1pub mod batch_execute_statement;
2pub mod batch_get_item;
3pub mod batch_write_item;
4pub mod create_table;
5pub mod delete_item;
6pub mod delete_table;
7pub mod describe_stream;
8pub mod describe_table;
9pub mod describe_time_to_live;
10pub mod execute_statement;
11pub mod execute_transaction;
12pub mod get_item;
13pub mod get_records;
14pub mod get_shard_iterator;
15pub(crate) mod gsi;
16pub(crate) mod helpers;
17pub mod import_items;
18pub mod list_streams;
19pub mod list_tables;
20pub mod list_tags_of_resource;
21pub(crate) mod lsi;
22pub mod put_item;
23pub mod query;
24pub mod scan;
25pub mod tag_resource;
26pub mod transact_get_items;
27pub mod transact_write_items;
28pub mod untag_resource;
29pub mod update_item;
30pub mod update_table;
31pub mod update_time_to_live;
32
33use crate::types::{
34    AttributeDefinition, GlobalSecondaryIndex, KeySchemaElement, LocalSecondaryIndex, Projection,
35};
36use serde::{Deserialize, Serialize};
37
38/// Full table description returned by DescribeTable and CreateTable.
39#[derive(Debug, Clone, Default, Serialize, Deserialize)]
40pub struct TableDescription {
41    #[serde(rename = "TableName")]
42    pub table_name: String,
43
44    #[serde(rename = "TableId", skip_serializing_if = "Option::is_none")]
45    pub table_id: Option<String>,
46
47    #[serde(rename = "TableArn")]
48    pub table_arn: String,
49
50    #[serde(rename = "TableStatus")]
51    pub table_status: String,
52
53    #[serde(rename = "KeySchema")]
54    pub key_schema: Vec<KeySchemaElement>,
55
56    #[serde(rename = "AttributeDefinitions")]
57    pub attribute_definitions: Vec<AttributeDefinition>,
58
59    #[serde(rename = "CreationDateTime", skip_serializing_if = "Option::is_none")]
60    pub creation_date_time: Option<f64>,
61
62    #[serde(rename = "ItemCount", skip_serializing_if = "Option::is_none")]
63    pub item_count: Option<i64>,
64
65    #[serde(rename = "TableSizeBytes", skip_serializing_if = "Option::is_none")]
66    pub table_size_bytes: Option<i64>,
67
68    #[serde(
69        rename = "ProvisionedThroughput",
70        skip_serializing_if = "Option::is_none"
71    )]
72    pub provisioned_throughput: Option<TableProvisionedThroughputDescription>,
73
74    #[serde(rename = "BillingModeSummary", skip_serializing_if = "Option::is_none")]
75    pub billing_mode_summary: Option<BillingModeSummary>,
76
77    #[serde(
78        rename = "TableThroughputModeSummary",
79        skip_serializing_if = "Option::is_none"
80    )]
81    pub table_throughput_mode_summary: Option<TableThroughputModeSummary>,
82
83    #[serde(
84        rename = "GlobalSecondaryIndexes",
85        skip_serializing_if = "Option::is_none"
86    )]
87    pub global_secondary_indexes: Option<Vec<GlobalSecondaryIndexDescription>>,
88
89    #[serde(
90        rename = "LocalSecondaryIndexes",
91        skip_serializing_if = "Option::is_none"
92    )]
93    pub local_secondary_indexes: Option<Vec<LocalSecondaryIndexDescription>>,
94
95    #[serde(
96        rename = "StreamSpecification",
97        skip_serializing_if = "Option::is_none"
98    )]
99    pub stream_specification: Option<StreamSpecificationDescription>,
100
101    #[serde(rename = "LatestStreamArn", skip_serializing_if = "Option::is_none")]
102    pub latest_stream_arn: Option<String>,
103
104    #[serde(rename = "LatestStreamLabel", skip_serializing_if = "Option::is_none")]
105    pub latest_stream_label: Option<String>,
106
107    #[serde(rename = "SSEDescription", skip_serializing_if = "Option::is_none")]
108    pub sse_description: Option<SseDescription>,
109
110    #[serde(rename = "TableClassSummary", skip_serializing_if = "Option::is_none")]
111    pub table_class_summary: Option<TableClassSummary>,
112
113    #[serde(
114        rename = "DeletionProtectionEnabled",
115        skip_serializing_if = "Option::is_none"
116    )]
117    pub deletion_protection_enabled: Option<bool>,
118
119    #[serde(rename = "OnDemandThroughput", skip_serializing_if = "Option::is_none")]
120    pub on_demand_throughput: Option<crate::types::OnDemandThroughput>,
121}
122
123/// SSE description returned in TableDescription.
124#[derive(Debug, Clone, Default, Serialize, Deserialize)]
125pub struct SseDescription {
126    #[serde(rename = "Status")]
127    pub status: String,
128    #[serde(rename = "SSEType", skip_serializing_if = "Option::is_none")]
129    pub sse_type: Option<String>,
130    #[serde(rename = "KMSMasterKeyArn", skip_serializing_if = "Option::is_none")]
131    pub kms_master_key_arn: Option<String>,
132}
133
134/// Stream specification description returned in TableDescription.
135#[derive(Debug, Clone, Default, Serialize, Deserialize)]
136pub struct StreamSpecificationDescription {
137    #[serde(rename = "StreamEnabled")]
138    pub stream_enabled: bool,
139    #[serde(rename = "StreamViewType", skip_serializing_if = "Option::is_none")]
140    pub stream_view_type: Option<String>,
141}
142
143/// Table class summary returned in TableDescription.
144#[derive(Debug, Clone, Default, Serialize, Deserialize)]
145pub struct TableClassSummary {
146    #[serde(rename = "TableClass")]
147    pub table_class: String,
148}
149
150/// Provisioned throughput description (with additional metadata fields).
151#[derive(Debug, Clone, Default, Serialize, Deserialize)]
152pub struct TableProvisionedThroughputDescription {
153    #[serde(rename = "ReadCapacityUnits")]
154    pub read_capacity_units: u64,
155    #[serde(rename = "WriteCapacityUnits")]
156    pub write_capacity_units: u64,
157    #[serde(rename = "NumberOfDecreasesToday")]
158    pub number_of_decreases_today: u64,
159    #[serde(
160        rename = "LastIncreaseDateTime",
161        skip_serializing_if = "Option::is_none"
162    )]
163    pub last_increase_date_time: Option<f64>,
164    #[serde(
165        rename = "LastDecreaseDateTime",
166        skip_serializing_if = "Option::is_none"
167    )]
168    pub last_decrease_date_time: Option<f64>,
169}
170
171/// Billing mode summary.
172#[derive(Debug, Clone, Default, Serialize, Deserialize)]
173pub struct BillingModeSummary {
174    #[serde(rename = "BillingMode")]
175    pub billing_mode: String,
176    #[serde(
177        rename = "LastUpdateToPayPerRequestDateTime",
178        skip_serializing_if = "Option::is_none"
179    )]
180    pub last_update_to_pay_per_request_date_time: Option<f64>,
181}
182
183/// Table throughput mode summary.
184#[derive(Debug, Clone, Default, Serialize, Deserialize)]
185pub struct TableThroughputModeSummary {
186    #[serde(rename = "TableThroughputMode")]
187    pub table_throughput_mode: String,
188    #[serde(
189        rename = "LastUpdateToPayPerRequestDateTime",
190        skip_serializing_if = "Option::is_none"
191    )]
192    pub last_update_to_pay_per_request_date_time: Option<f64>,
193}
194
195/// GSI description (returned in TableDescription).
196#[derive(Debug, Clone, Default, Serialize, Deserialize)]
197pub struct GlobalSecondaryIndexDescription {
198    #[serde(rename = "IndexName")]
199    pub index_name: String,
200    #[serde(rename = "IndexArn")]
201    pub index_arn: String,
202    #[serde(rename = "KeySchema")]
203    pub key_schema: Vec<KeySchemaElement>,
204    #[serde(rename = "Projection")]
205    pub projection: Projection,
206    #[serde(rename = "IndexStatus")]
207    pub index_status: String,
208    #[serde(
209        rename = "ProvisionedThroughput",
210        skip_serializing_if = "Option::is_none"
211    )]
212    pub provisioned_throughput: Option<TableProvisionedThroughputDescription>,
213    #[serde(rename = "ItemCount", skip_serializing_if = "Option::is_none")]
214    pub item_count: Option<i64>,
215    #[serde(rename = "IndexSizeBytes", skip_serializing_if = "Option::is_none")]
216    pub index_size_bytes: Option<i64>,
217}
218
219/// LSI description (returned in TableDescription).
220#[derive(Debug, Clone, Default, Serialize, Deserialize)]
221pub struct LocalSecondaryIndexDescription {
222    #[serde(rename = "IndexName")]
223    pub index_name: String,
224    #[serde(rename = "IndexArn")]
225    pub index_arn: String,
226    #[serde(rename = "KeySchema")]
227    pub key_schema: Vec<KeySchemaElement>,
228    #[serde(rename = "Projection")]
229    pub projection: Projection,
230    #[serde(rename = "ItemCount", skip_serializing_if = "Option::is_none")]
231    pub item_count: Option<i64>,
232    #[serde(rename = "IndexSizeBytes", skip_serializing_if = "Option::is_none")]
233    pub index_size_bytes: Option<i64>,
234}
235
236/// Deterministic fallback `TableId` for a table that has no persisted id.
237///
238/// The primary scheme persists a random v4 UUID assigned at create time (see
239/// `Storage::insert_table_metadata` and the v8 migration's backfill), matching
240/// AWS: stable across reads, and a new id for a dropped-and-recreated table.
241/// This fallback only applies to a row whose stored `table_id` is `None` — for
242/// example one written by an older binary into a newer database. It derives a
243/// stable UUID (v5) from the table name and creation timestamp so such a row
244/// still reports a consistent id across reads, rather than the per-call random
245/// value that #55 fixed.
246fn table_id_for(table_name: &str, created_at: i64) -> String {
247    uuid::Uuid::new_v5(
248        &uuid::Uuid::NAMESPACE_OID,
249        format!("dynoxide:table:{table_name}:{created_at}").as_bytes(),
250    )
251    .to_string()
252}
253
254/// Helper: Build a TableDescription from stored metadata.
255pub(crate) fn build_table_description(
256    meta: &crate::storage::TableMetadata,
257    item_count: Option<i64>,
258    table_size_bytes: Option<i64>,
259) -> TableDescription {
260    use crate::streams;
261
262    let key_schema: Vec<KeySchemaElement> =
263        serde_json::from_str(&meta.key_schema).unwrap_or_default();
264    let attribute_definitions: Vec<AttributeDefinition> =
265        serde_json::from_str(&meta.attribute_definitions).unwrap_or_default();
266
267    let gsi_definitions: Option<Vec<GlobalSecondaryIndex>> = meta
268        .gsi_definitions
269        .as_ref()
270        .and_then(|s| serde_json::from_str(s).ok());
271
272    let table_name = &meta.table_name;
273
274    let global_secondary_indexes = gsi_definitions.map(|gsis| {
275        gsis.into_iter()
276            .map(|gsi| {
277                let idx_arn = streams::index_arn(table_name, &gsi.index_name);
278                GlobalSecondaryIndexDescription {
279                    index_name: gsi.index_name,
280                    index_arn: idx_arn,
281                    key_schema: gsi.key_schema,
282                    projection: gsi.projection,
283                    index_status: "ACTIVE".to_string(),
284                    provisioned_throughput: Some(if let Some(pt) = gsi.provisioned_throughput {
285                        TableProvisionedThroughputDescription {
286                            read_capacity_units: pt.read_capacity_units.unwrap_or(0) as u64,
287                            write_capacity_units: pt.write_capacity_units.unwrap_or(0) as u64,
288                            number_of_decreases_today: 0,
289                            last_increase_date_time: None,
290                            last_decrease_date_time: None,
291                        }
292                    } else {
293                        // PAY_PER_REQUEST or no PT specified
294                        TableProvisionedThroughputDescription {
295                            read_capacity_units: 0,
296                            write_capacity_units: 0,
297                            number_of_decreases_today: 0,
298                            last_increase_date_time: None,
299                            last_decrease_date_time: None,
300                        }
301                    }),
302                    item_count: Some(0),
303                    index_size_bytes: Some(0),
304                }
305            })
306            .collect()
307    });
308
309    let lsi_definitions: Option<Vec<LocalSecondaryIndex>> = meta
310        .lsi_definitions
311        .as_ref()
312        .and_then(|s| serde_json::from_str(s).ok());
313
314    let local_secondary_indexes = lsi_definitions.map(|lsis| {
315        lsis.into_iter()
316            .map(|lsi| {
317                let idx_arn = streams::index_arn(table_name, &lsi.index_name);
318                LocalSecondaryIndexDescription {
319                    index_name: lsi.index_name,
320                    index_arn: idx_arn,
321                    key_schema: lsi.key_schema,
322                    projection: lsi.projection,
323                    item_count: Some(0),
324                    index_size_bytes: Some(0),
325                }
326            })
327            .collect()
328    });
329
330    let billing_mode = meta.billing_mode.clone();
331
332    let provisioned_throughput = if let Some(pt_json) = &meta.provisioned_throughput {
333        // Try parsing extended format (with timestamps) first, fall back to basic
334        serde_json::from_str::<serde_json::Value>(pt_json)
335            .ok()
336            .map(|v| {
337                let rcu = v
338                    .get("ReadCapacityUnits")
339                    .and_then(|v| v.as_i64())
340                    .unwrap_or(0) as u64;
341                let wcu = v
342                    .get("WriteCapacityUnits")
343                    .and_then(|v| v.as_i64())
344                    .unwrap_or(0) as u64;
345                let last_inc = v.get("LastIncreaseDateTime").and_then(|v| v.as_f64());
346                let last_dec = v.get("LastDecreaseDateTime").and_then(|v| v.as_f64());
347                let num_dec = v
348                    .get("NumberOfDecreasesToday")
349                    .and_then(|v| v.as_u64())
350                    .unwrap_or(0);
351                TableProvisionedThroughputDescription {
352                    read_capacity_units: rcu,
353                    write_capacity_units: wcu,
354                    number_of_decreases_today: num_dec,
355                    last_increase_date_time: last_inc,
356                    last_decrease_date_time: last_dec,
357                }
358            })
359    } else if billing_mode.as_deref() == Some("PAY_PER_REQUEST") {
360        // PAY_PER_REQUEST tables have zero provisioned throughput
361        Some(TableProvisionedThroughputDescription {
362            read_capacity_units: 0,
363            write_capacity_units: 0,
364            number_of_decreases_today: 0,
365            last_increase_date_time: None,
366            last_decrease_date_time: None,
367        })
368    } else {
369        None
370    };
371
372    let stream_specification = if meta.stream_enabled {
373        Some(StreamSpecificationDescription {
374            stream_enabled: true,
375            stream_view_type: meta.stream_view_type.clone(),
376        })
377    } else {
378        None
379    };
380
381    let latest_stream_arn = if meta.stream_enabled {
382        meta.stream_label
383            .as_ref()
384            .map(|label| streams::stream_arn(table_name, label))
385    } else {
386        None
387    };
388
389    let latest_stream_label = if meta.stream_enabled {
390        meta.stream_label.clone()
391    } else {
392        None
393    };
394
395    // Build SSE description from stored specification. When encryption is
396    // enabled, AWS reports SSEType=KMS and a KMS key ARN even if the request
397    // only set Enabled=true, so default those here rather than dropping them.
398    let sse_description = meta.sse_specification.as_ref().and_then(|json| {
399        serde_json::from_str::<crate::types::SseSpecification>(json)
400            .ok()
401            .map(|spec| {
402                let enabled = spec.enabled.unwrap_or(false);
403                SseDescription {
404                    status: if enabled { "ENABLED" } else { "DISABLED" }.to_string(),
405                    sse_type: if enabled {
406                        spec.sse_type.or_else(|| Some("KMS".to_string()))
407                    } else {
408                        spec.sse_type
409                    },
410                    kms_master_key_arn: if enabled {
411                        // New tables persist a synthesised key id at create time;
412                        // fall back to a table-derived key for any older row that
413                        // enabled SSE without one, so the ARN stays stable across
414                        // repeated DescribeTable calls rather than changing each read.
415                        Some(
416                            spec.kms_master_key_id
417                                .map(|id| streams::kms_key_arn(&id))
418                                .unwrap_or_else(|| streams::kms_key_arn(table_name)),
419                        )
420                    } else {
421                        None
422                    },
423                }
424            })
425    });
426
427    let table_class_summary = meta.table_class.as_ref().map(|tc| TableClassSummary {
428        table_class: tc.clone(),
429    });
430
431    let on_demand_throughput = meta
432        .on_demand_throughput
433        .as_ref()
434        .and_then(|json| serde_json::from_str::<crate::types::OnDemandThroughput>(json).ok());
435
436    let deletion_protection_enabled = Some(meta.deletion_protection_enabled);
437
438    TableDescription {
439        table_name: meta.table_name.clone(),
440        table_id: Some(
441            meta.table_id
442                .clone()
443                .unwrap_or_else(|| table_id_for(&meta.table_name, meta.created_at)),
444        ),
445        table_arn: streams::table_arn(table_name),
446        table_status: meta.table_status.clone(),
447        key_schema,
448        attribute_definitions,
449        creation_date_time: Some(meta.created_at as f64),
450        item_count,
451        table_size_bytes,
452        provisioned_throughput,
453        billing_mode_summary: match billing_mode.as_deref() {
454            Some("PAY_PER_REQUEST") => Some(BillingModeSummary {
455                billing_mode: "PAY_PER_REQUEST".to_string(),
456                last_update_to_pay_per_request_date_time: None,
457            }),
458            _ => None,
459        },
460        table_throughput_mode_summary: match billing_mode.as_deref() {
461            Some("PAY_PER_REQUEST") => Some(TableThroughputModeSummary {
462                table_throughput_mode: "PAY_PER_REQUEST".to_string(),
463                last_update_to_pay_per_request_date_time: None,
464            }),
465            _ => None,
466        },
467        global_secondary_indexes,
468        local_secondary_indexes,
469        stream_specification,
470        latest_stream_arn,
471        latest_stream_label,
472        sse_description,
473        table_class_summary,
474        deletion_protection_enabled,
475        on_demand_throughput,
476    }
477}