Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
8/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
9pub type AttributeValue = Value;
10
11/// Extract the "typed" inner value for comparison purposes.
12/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
13pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14    let obj = av.as_object()?;
15    if obj.len() != 1 {
16        return None;
17    }
18    let (k, v) = obj.iter().next()?;
19    Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24    pub attribute_name: String,
25    pub key_type: String, // HASH or RANGE
26}
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30    pub attribute_name: String,
31    pub attribute_type: String, // S, N, B
32}
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36    pub read_capacity_units: i64,
37    pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42    pub index_name: String,
43    pub key_schema: Vec<KeySchemaElement>,
44    pub projection: Projection,
45    pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50    pub index_name: String,
51    pub key_schema: Vec<KeySchemaElement>,
52    pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
58    pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63    pub name: String,
64    pub arn: String,
65    pub key_schema: Vec<KeySchemaElement>,
66    pub attribute_definitions: Vec<AttributeDefinition>,
67    pub provisioned_throughput: ProvisionedThroughput,
68    pub items: Vec<HashMap<String, AttributeValue>>,
69    pub gsi: Vec<GlobalSecondaryIndex>,
70    pub lsi: Vec<LocalSecondaryIndex>,
71    pub tags: HashMap<String, String>,
72    pub created_at: DateTime<Utc>,
73    pub status: String,
74    pub item_count: i64,
75    pub size_bytes: i64,
76    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
77    pub ttl_attribute: Option<String>,
78    pub ttl_enabled: bool,
79    pub resource_policy: Option<String>,
80    /// PITR enabled
81    pub pitr_enabled: bool,
82    /// Kinesis streaming destinations: stream_arn -> status
83    pub kinesis_destinations: Vec<KinesisDestination>,
84    /// Contributor insights status
85    pub contributor_insights_status: String,
86}
87
88#[derive(Debug, Clone)]
89pub struct KinesisDestination {
90    pub stream_arn: String,
91    pub destination_status: String,
92    pub approximate_creation_date_time_precision: String,
93}
94
95#[derive(Debug, Clone)]
96pub struct BackupDescription {
97    pub backup_arn: String,
98    pub backup_name: String,
99    pub table_name: String,
100    pub table_arn: String,
101    pub backup_status: String,
102    pub backup_type: String,
103    pub backup_creation_date: DateTime<Utc>,
104    pub key_schema: Vec<KeySchemaElement>,
105    pub attribute_definitions: Vec<AttributeDefinition>,
106    pub provisioned_throughput: ProvisionedThroughput,
107    pub billing_mode: String,
108    pub item_count: i64,
109    pub size_bytes: i64,
110}
111
112#[derive(Debug, Clone)]
113pub struct GlobalTableDescription {
114    pub global_table_name: String,
115    pub global_table_arn: String,
116    pub global_table_status: String,
117    pub creation_date: DateTime<Utc>,
118    pub replication_group: Vec<ReplicaDescription>,
119}
120
121#[derive(Debug, Clone)]
122pub struct ReplicaDescription {
123    pub region_name: String,
124    pub replica_status: String,
125}
126
127#[derive(Debug, Clone)]
128pub struct ExportDescription {
129    pub export_arn: String,
130    pub export_status: String,
131    pub table_arn: String,
132    pub s3_bucket: String,
133    pub s3_prefix: Option<String>,
134    pub export_format: String,
135    pub start_time: DateTime<Utc>,
136    pub end_time: DateTime<Utc>,
137    pub export_time: DateTime<Utc>,
138    pub item_count: i64,
139    pub billed_size_bytes: i64,
140}
141
142#[derive(Debug, Clone)]
143pub struct ImportDescription {
144    pub import_arn: String,
145    pub import_status: String,
146    pub table_arn: String,
147    pub table_name: String,
148    pub s3_bucket_source: String,
149    pub input_format: String,
150    pub start_time: DateTime<Utc>,
151    pub end_time: DateTime<Utc>,
152    pub processed_item_count: i64,
153    pub processed_size_bytes: i64,
154}
155
156impl DynamoTable {
157    /// Get the hash key attribute name from the key schema.
158    pub fn hash_key_name(&self) -> &str {
159        self.key_schema
160            .iter()
161            .find(|k| k.key_type == "HASH")
162            .map(|k| k.attribute_name.as_str())
163            .unwrap_or("")
164    }
165
166    /// Get the range key attribute name from the key schema (if any).
167    pub fn range_key_name(&self) -> Option<&str> {
168        self.key_schema
169            .iter()
170            .find(|k| k.key_type == "RANGE")
171            .map(|k| k.attribute_name.as_str())
172    }
173
174    /// Find an item index by its primary key.
175    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
176        let hash_key = self.hash_key_name();
177        let range_key = self.range_key_name();
178
179        self.items.iter().position(|item| {
180            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
181                (Some(a), Some(b)) => a == b,
182                _ => false,
183            };
184            if !hash_match {
185                return false;
186            }
187            match range_key {
188                Some(rk) => match (item.get(rk), key.get(rk)) {
189                    (Some(a), Some(b)) => a == b,
190                    (None, None) => true,
191                    _ => false,
192                },
193                None => true,
194            }
195        })
196    }
197
198    /// Estimate item size in bytes (rough approximation).
199    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
200        let mut size: i64 = 0;
201        for (k, v) in item {
202            size += k.len() as i64;
203            size += Self::estimate_value_size(v);
204        }
205        size
206    }
207
208    fn estimate_value_size(v: &Value) -> i64 {
209        match v {
210            Value::Object(obj) => {
211                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
212                    s.len() as i64
213                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
214                    n.len() as i64
215                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
216                    1
217                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
218                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
219                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
220                    3 + m
221                        .iter()
222                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
223                        .sum::<i64>()
224                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
225                    ss.iter()
226                        .filter_map(|v| v.as_str())
227                        .map(|s| s.len() as i64)
228                        .sum()
229                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
230                    ns.iter()
231                        .filter_map(|v| v.as_str())
232                        .map(|s| s.len() as i64)
233                        .sum()
234                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
235                    // Base64-encoded binary
236                    (b.len() as i64 * 3) / 4
237                } else {
238                    v.to_string().len() as i64
239                }
240            }
241            _ => v.to_string().len() as i64,
242        }
243    }
244
245    /// Recalculate item_count and size_bytes from the items vec.
246    pub fn recalculate_stats(&mut self) {
247        self.item_count = self.items.len() as i64;
248        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
249    }
250}
251
252pub struct DynamoDbState {
253    pub account_id: String,
254    pub region: String,
255    pub tables: HashMap<String, DynamoTable>,
256    pub backups: HashMap<String, BackupDescription>,
257    pub global_tables: HashMap<String, GlobalTableDescription>,
258    pub exports: HashMap<String, ExportDescription>,
259    pub imports: HashMap<String, ImportDescription>,
260}
261
262impl DynamoDbState {
263    pub fn new(account_id: &str, region: &str) -> Self {
264        Self {
265            account_id: account_id.to_string(),
266            region: region.to_string(),
267            tables: HashMap::new(),
268            backups: HashMap::new(),
269            global_tables: HashMap::new(),
270            exports: HashMap::new(),
271            imports: HashMap::new(),
272        }
273    }
274
275    pub fn reset(&mut self) {
276        self.tables.clear();
277        self.backups.clear();
278        self.global_tables.clear();
279        self.exports.clear();
280        self.imports.clear();
281    }
282}
283
284pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;