Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
8/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
9pub type AttributeValue = Value;
10
11/// Extract the "typed" inner value for comparison purposes.
12/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
13pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14    let obj = av.as_object()?;
15    if obj.len() != 1 {
16        return None;
17    }
18    let (k, v) = obj.iter().next()?;
19    Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24    pub attribute_name: String,
25    pub key_type: String, // HASH or RANGE
26}
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30    pub attribute_name: String,
31    pub attribute_type: String, // S, N, B
32}
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36    pub read_capacity_units: i64,
37    pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42    pub index_name: String,
43    pub key_schema: Vec<KeySchemaElement>,
44    pub projection: Projection,
45    pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50    pub index_name: String,
51    pub key_schema: Vec<KeySchemaElement>,
52    pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
58    pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63    pub name: String,
64    pub arn: String,
65    pub key_schema: Vec<KeySchemaElement>,
66    pub attribute_definitions: Vec<AttributeDefinition>,
67    pub provisioned_throughput: ProvisionedThroughput,
68    pub items: Vec<HashMap<String, AttributeValue>>,
69    pub gsi: Vec<GlobalSecondaryIndex>,
70    pub lsi: Vec<LocalSecondaryIndex>,
71    pub tags: HashMap<String, String>,
72    pub created_at: DateTime<Utc>,
73    pub status: String,
74    pub item_count: i64,
75    pub size_bytes: i64,
76    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
77    pub ttl_attribute: Option<String>,
78    pub ttl_enabled: bool,
79    pub resource_policy: Option<String>,
80    /// PITR enabled
81    pub pitr_enabled: bool,
82    /// Kinesis streaming destinations: stream_arn -> status
83    pub kinesis_destinations: Vec<KinesisDestination>,
84    /// Contributor insights status
85    pub contributor_insights_status: String,
86    /// Contributor insights: partition key access counters (key_value_string -> count)
87    pub contributor_insights_counters: HashMap<String, u64>,
88}
89
90#[derive(Debug, Clone)]
91pub struct KinesisDestination {
92    pub stream_arn: String,
93    pub destination_status: String,
94    pub approximate_creation_date_time_precision: String,
95}
96
97#[derive(Debug, Clone)]
98pub struct BackupDescription {
99    pub backup_arn: String,
100    pub backup_name: String,
101    pub table_name: String,
102    pub table_arn: String,
103    pub backup_status: String,
104    pub backup_type: String,
105    pub backup_creation_date: DateTime<Utc>,
106    pub key_schema: Vec<KeySchemaElement>,
107    pub attribute_definitions: Vec<AttributeDefinition>,
108    pub provisioned_throughput: ProvisionedThroughput,
109    pub billing_mode: String,
110    pub item_count: i64,
111    pub size_bytes: i64,
112    /// Snapshot of the table items at backup creation time.
113    pub items: Vec<HashMap<String, AttributeValue>>,
114}
115
116#[derive(Debug, Clone)]
117pub struct GlobalTableDescription {
118    pub global_table_name: String,
119    pub global_table_arn: String,
120    pub global_table_status: String,
121    pub creation_date: DateTime<Utc>,
122    pub replication_group: Vec<ReplicaDescription>,
123}
124
125#[derive(Debug, Clone)]
126pub struct ReplicaDescription {
127    pub region_name: String,
128    pub replica_status: String,
129}
130
131#[derive(Debug, Clone)]
132pub struct ExportDescription {
133    pub export_arn: String,
134    pub export_status: String,
135    pub table_arn: String,
136    pub s3_bucket: String,
137    pub s3_prefix: Option<String>,
138    pub export_format: String,
139    pub start_time: DateTime<Utc>,
140    pub end_time: DateTime<Utc>,
141    pub export_time: DateTime<Utc>,
142    pub item_count: i64,
143    pub billed_size_bytes: i64,
144}
145
146#[derive(Debug, Clone)]
147pub struct ImportDescription {
148    pub import_arn: String,
149    pub import_status: String,
150    pub table_arn: String,
151    pub table_name: String,
152    pub s3_bucket_source: String,
153    pub input_format: String,
154    pub start_time: DateTime<Utc>,
155    pub end_time: DateTime<Utc>,
156    pub processed_item_count: i64,
157    pub processed_size_bytes: i64,
158}
159
160impl DynamoTable {
161    /// Get the hash key attribute name from the key schema.
162    pub fn hash_key_name(&self) -> &str {
163        self.key_schema
164            .iter()
165            .find(|k| k.key_type == "HASH")
166            .map(|k| k.attribute_name.as_str())
167            .unwrap_or("")
168    }
169
170    /// Get the range key attribute name from the key schema (if any).
171    pub fn range_key_name(&self) -> Option<&str> {
172        self.key_schema
173            .iter()
174            .find(|k| k.key_type == "RANGE")
175            .map(|k| k.attribute_name.as_str())
176    }
177
178    /// Find an item index by its primary key.
179    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
180        let hash_key = self.hash_key_name();
181        let range_key = self.range_key_name();
182
183        self.items.iter().position(|item| {
184            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
185                (Some(a), Some(b)) => a == b,
186                _ => false,
187            };
188            if !hash_match {
189                return false;
190            }
191            match range_key {
192                Some(rk) => match (item.get(rk), key.get(rk)) {
193                    (Some(a), Some(b)) => a == b,
194                    (None, None) => true,
195                    _ => false,
196                },
197                None => true,
198            }
199        })
200    }
201
202    /// Estimate item size in bytes (rough approximation).
203    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
204        let mut size: i64 = 0;
205        for (k, v) in item {
206            size += k.len() as i64;
207            size += Self::estimate_value_size(v);
208        }
209        size
210    }
211
212    fn estimate_value_size(v: &Value) -> i64 {
213        match v {
214            Value::Object(obj) => {
215                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
216                    s.len() as i64
217                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
218                    n.len() as i64
219                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
220                    1
221                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
222                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
223                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
224                    3 + m
225                        .iter()
226                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
227                        .sum::<i64>()
228                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
229                    ss.iter()
230                        .filter_map(|v| v.as_str())
231                        .map(|s| s.len() as i64)
232                        .sum()
233                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
234                    ns.iter()
235                        .filter_map(|v| v.as_str())
236                        .map(|s| s.len() as i64)
237                        .sum()
238                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
239                    // Base64-encoded binary
240                    (b.len() as i64 * 3) / 4
241                } else {
242                    v.to_string().len() as i64
243                }
244            }
245            _ => v.to_string().len() as i64,
246        }
247    }
248
249    /// Record a partition key access for contributor insights.
250    /// Only records if contributor insights is enabled.
251    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
252        if self.contributor_insights_status != "ENABLED" {
253            return;
254        }
255        let hash_key = self.hash_key_name().to_string();
256        if let Some(pk_value) = key.get(&hash_key) {
257            let key_str = pk_value.to_string();
258            *self
259                .contributor_insights_counters
260                .entry(key_str)
261                .or_insert(0) += 1;
262        }
263    }
264
265    /// Record a partition key access from a full item (extracts the key first).
266    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
267        if self.contributor_insights_status != "ENABLED" {
268            return;
269        }
270        let hash_key = self.hash_key_name().to_string();
271        if let Some(pk_value) = item.get(&hash_key) {
272            let key_str = pk_value.to_string();
273            *self
274                .contributor_insights_counters
275                .entry(key_str)
276                .or_insert(0) += 1;
277        }
278    }
279
280    /// Get top N contributors sorted by access count (descending).
281    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
282        let mut entries: Vec<(&str, u64)> = self
283            .contributor_insights_counters
284            .iter()
285            .map(|(k, &v)| (k.as_str(), v))
286            .collect();
287        entries.sort_by(|a, b| b.1.cmp(&a.1));
288        entries.truncate(n);
289        entries
290    }
291
292    /// Recalculate item_count and size_bytes from the items vec.
293    pub fn recalculate_stats(&mut self) {
294        self.item_count = self.items.len() as i64;
295        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
296    }
297}
298
299pub struct DynamoDbState {
300    pub account_id: String,
301    pub region: String,
302    pub tables: HashMap<String, DynamoTable>,
303    pub backups: HashMap<String, BackupDescription>,
304    pub global_tables: HashMap<String, GlobalTableDescription>,
305    pub exports: HashMap<String, ExportDescription>,
306    pub imports: HashMap<String, ImportDescription>,
307}
308
309impl DynamoDbState {
310    pub fn new(account_id: &str, region: &str) -> Self {
311        Self {
312            account_id: account_id.to_string(),
313            region: region.to_string(),
314            tables: HashMap::new(),
315            backups: HashMap::new(),
316            global_tables: HashMap::new(),
317            exports: HashMap::new(),
318            imports: HashMap::new(),
319        }
320    }
321
322    pub fn reset(&mut self) {
323        self.tables.clear();
324        self.backups.clear();
325        self.global_tables.clear();
326        self.exports.clear();
327        self.imports.clear();
328    }
329}
330
331pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;