Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
///
/// This is a plain alias for `serde_json::Value`, so the tagged shape is a
/// convention only: the type itself does not enforce it.
pub type AttributeValue = Value;
10
11/// Extract the "typed" inner value for comparison purposes.
12/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
13pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14    let obj = av.as_object()?;
15    if obj.len() != 1 {
16        return None;
17    }
18    let (k, v) = obj.iter().next()?;
19    Some((k.as_str(), v))
20}
21
/// One element of a key schema: an attribute name paired with its key role.
#[derive(Debug, Clone)]
pub struct KeySchemaElement {
    /// Name of the attribute this key element refers to.
    pub attribute_name: String,
    /// Key role, stored as a plain string (not validated by this type).
    pub key_type: String, // HASH or RANGE
}
27
/// Declares the type of a named attribute.
#[derive(Debug, Clone)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Type code for the attribute, stored as a plain string.
    pub attribute_type: String, // S, N, B
}
33
/// Read and write capacity settings, expressed in capacity units.
#[derive(Debug, Clone)]
pub struct ProvisionedThroughput {
    /// Configured read capacity units.
    pub read_capacity_units: i64,
    /// Configured write capacity units.
    pub write_capacity_units: i64,
}
39
/// Definition of a global secondary index.
#[derive(Debug, Clone)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
    /// Throughput settings for the index; `None` when none are recorded.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
}
47
/// Definition of a local secondary index.
///
/// Unlike [`GlobalSecondaryIndex`], this carries no throughput field.
#[derive(Debug, Clone)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
}
54
/// Attribute projection settings for a secondary index.
#[derive(Debug, Clone)]
pub struct Projection {
    /// Projection mode, stored as a plain string.
    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
    /// Extra non-key attribute names.
    /// NOTE(review): presumably only meaningful when `projection_type` is
    /// `INCLUDE` — confirm against the code that reads this field.
    pub non_key_attributes: Vec<String>,
}
60
/// In-memory representation of a single table: its schema, its items, and
/// the per-table feature settings tracked alongside them.
#[derive(Debug, Clone)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier.
    pub table_id: String,
    /// Primary key schema. `hash_key_name` and `range_key_name` look up the
    /// `HASH` and `RANGE` elements in this list.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute type declarations.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Table-level throughput settings.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to attribute value.
    /// `find_item_index` scans this vector linearly.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags as key/value pairs.
    pub tags: HashMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status, stored as a plain string.
    pub status: String,
    /// Number of items; refreshed from `items` by `recalculate_stats`.
    pub item_count: i64,
    /// Estimated total item size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode, stored as a plain string.
    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
    /// Name of the TTL attribute, if one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if one is attached.
    pub resource_policy: Option<String>,
    /// PITR enabled
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations; each entry carries its own stream ARN
    /// and destination status.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor insights status. Key accesses are only counted while this
    /// equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Contributor insights: partition key access counters (key_value_string -> count)
    pub contributor_insights_counters: HashMap<String, u64>,
    /// DynamoDB Streams configuration
    pub stream_enabled: bool,
    /// View type of the stream, if one is set.
    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
    /// ARN of the stream, if one is set.
    pub stream_arn: Option<String>,
    /// Stream records (retained for 24 hours)
    ///
    /// Held behind `Arc<RwLock<..>>`, so cloning the table shares this buffer
    /// with the clone rather than copying it.
    /// NOTE(review): no expiry logic is visible in this file — confirm where
    /// the 24-hour retention is enforced.
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type: AES256 (owned) or KMS
    pub sse_type: Option<String>,
    /// KMS key ARN for SSE (only when sse_type is KMS)
    pub sse_kms_key_arn: Option<String>,
    /// Deletion protection: when true, DeleteTable is rejected with
    /// `ResourceInUseException`. Defaults to false. Returned on every
    /// `DescribeTable` and toggleable via `UpdateTable`.
    pub deletion_protection_enabled: bool,
}
104
/// A single change record held in a table's stream buffer.
#[derive(Debug, Clone)]
pub struct StreamRecord {
    /// Identifier of the event.
    pub event_id: String,
    /// Kind of change, stored as a plain string.
    pub event_name: String, // INSERT, MODIFY, REMOVE
    /// Version string of the event format.
    pub event_version: String,
    /// Source label of the event.
    pub event_source: String,
    /// Region label attached to the event.
    pub aws_region: String,
    /// Change payload: keys, item images, and related metadata.
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the event source.
    pub event_source_arn: String,
    /// Timestamp attached to the record (UTC).
    pub timestamp: DateTime<Utc>,
}
116
/// Payload of a [`StreamRecord`]: the affected keys plus optional item images.
#[derive(Debug, Clone)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, when present.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, when present.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number, stored as a string.
    pub sequence_number: String,
    /// Size of the record in bytes.
    pub size_bytes: i64,
    /// View type recorded for this entry, stored as a plain string.
    pub stream_view_type: String,
}
126
/// A Kinesis streaming destination attached to a table.
#[derive(Debug, Clone)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Status of the destination, stored as a plain string.
    pub destination_status: String,
    /// Precision setting for the approximate creation timestamp, stored as a
    /// plain string.
    pub approximate_creation_date_time_precision: String,
}
133
/// A stored backup: descriptive metadata plus a copy of the table's schema
/// and items.
#[derive(Debug, Clone)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the backed-up table.
    pub table_name: String,
    /// ARN of the backed-up table.
    pub table_arn: String,
    /// Backup status, stored as a plain string.
    pub backup_status: String,
    /// Backup type, stored as a plain string.
    pub backup_type: String,
    /// Backup creation timestamp (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema recorded with the backup.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions recorded with the backup.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Throughput settings recorded with the backup.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode recorded with the backup.
    pub billing_mode: String,
    /// Item count recorded with the backup.
    pub item_count: i64,
    /// Size in bytes recorded with the backup.
    pub size_bytes: i64,
    /// Snapshot of the table items at backup creation time.
    pub items: Vec<HashMap<String, AttributeValue>>,
}
152
/// Description of a global table and its replicas.
#[derive(Debug, Clone)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status of the global table, stored as a plain string.
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
}
161
/// One replica within a global table's replication group.
#[derive(Debug, Clone)]
pub struct ReplicaDescription {
    /// Region name of the replica.
    pub region_name: String,
    /// Status of the replica, stored as a plain string.
    pub replica_status: String,
}
167
/// Record of a table export to S3.
#[derive(Debug, Clone)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Export status, stored as a plain string.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Key prefix within the bucket, if one was given.
    pub s3_prefix: Option<String>,
    /// Export format, stored as a plain string.
    pub export_format: String,
    /// Start timestamp of the export (UTC).
    pub start_time: DateTime<Utc>,
    /// End timestamp of the export (UTC).
    pub end_time: DateTime<Utc>,
    /// Export timestamp (UTC).
    /// NOTE(review): presumably the point in time the exported data reflects,
    /// as distinct from `start_time` — confirm against the export handler.
    pub export_time: DateTime<Utc>,
    /// Number of items exported.
    pub item_count: i64,
    /// Billed size of the export in bytes.
    pub billed_size_bytes: i64,
}
182
/// Record of a table import from S3.
#[derive(Debug, Clone)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Import status, stored as a plain string.
    pub import_status: String,
    /// ARN of the table the import targets.
    pub table_arn: String,
    /// Name of the table the import targets.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format, stored as a plain string.
    pub input_format: String,
    /// Start timestamp of the import (UTC).
    pub start_time: DateTime<Utc>,
    /// End timestamp of the import (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
196
197impl DynamoTable {
198    /// Get the hash key attribute name from the key schema.
199    pub fn hash_key_name(&self) -> &str {
200        self.key_schema
201            .iter()
202            .find(|k| k.key_type == "HASH")
203            .map(|k| k.attribute_name.as_str())
204            .unwrap_or("")
205    }
206
207    /// Get the range key attribute name from the key schema (if any).
208    pub fn range_key_name(&self) -> Option<&str> {
209        self.key_schema
210            .iter()
211            .find(|k| k.key_type == "RANGE")
212            .map(|k| k.attribute_name.as_str())
213    }
214
215    /// Find an item index by its primary key.
216    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
217        let hash_key = self.hash_key_name();
218        let range_key = self.range_key_name();
219
220        self.items.iter().position(|item| {
221            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
222                (Some(a), Some(b)) => a == b,
223                _ => false,
224            };
225            if !hash_match {
226                return false;
227            }
228            match range_key {
229                Some(rk) => match (item.get(rk), key.get(rk)) {
230                    (Some(a), Some(b)) => a == b,
231                    (None, None) => true,
232                    _ => false,
233                },
234                None => true,
235            }
236        })
237    }
238
239    /// Estimate item size in bytes (rough approximation).
240    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
241        let mut size: i64 = 0;
242        for (k, v) in item {
243            size += k.len() as i64;
244            size += Self::estimate_value_size(v);
245        }
246        size
247    }
248
249    fn estimate_value_size(v: &Value) -> i64 {
250        match v {
251            Value::Object(obj) => {
252                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
253                    s.len() as i64
254                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
255                    n.len() as i64
256                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
257                    1
258                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
259                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
260                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
261                    3 + m
262                        .iter()
263                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
264                        .sum::<i64>()
265                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
266                    ss.iter()
267                        .filter_map(|v| v.as_str())
268                        .map(|s| s.len() as i64)
269                        .sum()
270                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
271                    ns.iter()
272                        .filter_map(|v| v.as_str())
273                        .map(|s| s.len() as i64)
274                        .sum()
275                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
276                    // Base64-encoded binary
277                    (b.len() as i64 * 3) / 4
278                } else {
279                    v.to_string().len() as i64
280                }
281            }
282            _ => v.to_string().len() as i64,
283        }
284    }
285
286    /// Record a partition key access for contributor insights.
287    /// Only records if contributor insights is enabled.
288    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
289        if self.contributor_insights_status != "ENABLED" {
290            return;
291        }
292        let hash_key = self.hash_key_name().to_string();
293        if let Some(pk_value) = key.get(&hash_key) {
294            let key_str = pk_value.to_string();
295            *self
296                .contributor_insights_counters
297                .entry(key_str)
298                .or_insert(0) += 1;
299        }
300    }
301
302    /// Record a partition key access from a full item (extracts the key first).
303    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
304        if self.contributor_insights_status != "ENABLED" {
305            return;
306        }
307        let hash_key = self.hash_key_name().to_string();
308        if let Some(pk_value) = item.get(&hash_key) {
309            let key_str = pk_value.to_string();
310            *self
311                .contributor_insights_counters
312                .entry(key_str)
313                .or_insert(0) += 1;
314        }
315    }
316
317    /// Get top N contributors sorted by access count (descending).
318    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
319        let mut entries: Vec<(&str, u64)> = self
320            .contributor_insights_counters
321            .iter()
322            .map(|(k, &v)| (k.as_str(), v))
323            .collect();
324        entries.sort_by(|a, b| b.1.cmp(&a.1));
325        entries.truncate(n);
326        entries
327    }
328
329    /// Recalculate item_count and size_bytes from the items vec.
330    pub fn recalculate_stats(&mut self) {
331        self.item_count = self.items.len() as i64;
332        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
333    }
334}
335
/// Top-level in-memory state: account/region identity plus every stored
/// resource collection.
pub struct DynamoDbState {
    /// Account identifier this state was created for.
    pub account_id: String,
    /// Region this state was created for.
    pub region: String,
    /// Tables, keyed by string.
    /// NOTE(review): presumably keyed by table name — confirm against callers.
    pub tables: HashMap<String, DynamoTable>,
    /// Backups, keyed by string (presumably the backup ARN — confirm).
    pub backups: HashMap<String, BackupDescription>,
    /// Global tables, keyed by string (presumably the global table name — confirm).
    pub global_tables: HashMap<String, GlobalTableDescription>,
    /// Exports, keyed by string (presumably the export ARN — confirm).
    pub exports: HashMap<String, ExportDescription>,
    /// Imports, keyed by string (presumably the import ARN — confirm).
    pub imports: HashMap<String, ImportDescription>,
}
345
346impl DynamoDbState {
347    pub fn new(account_id: &str, region: &str) -> Self {
348        Self {
349            account_id: account_id.to_string(),
350            region: region.to_string(),
351            tables: HashMap::new(),
352            backups: HashMap::new(),
353            global_tables: HashMap::new(),
354            exports: HashMap::new(),
355            imports: HashMap::new(),
356        }
357    }
358
359    pub fn reset(&mut self) {
360        self.tables.clear();
361        self.backups.clear();
362        self.global_tables.clear();
363        self.exports.clear();
364        self.imports.clear();
365    }
366}
367
/// Shared handle to a [`DynamoDbState`]: a reference-counted pointer around a
/// `parking_lot` read/write lock guarding the state.
pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;