Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
8/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
9pub type AttributeValue = Value;
10
11/// Extract the "typed" inner value for comparison purposes.
12/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
13pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14    let obj = av.as_object()?;
15    if obj.len() != 1 {
16        return None;
17    }
18    let (k, v) = obj.iter().next()?;
19    Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24    pub attribute_name: String,
25    pub key_type: String, // HASH or RANGE
26}
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30    pub attribute_name: String,
31    pub attribute_type: String, // S, N, B
32}
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36    pub read_capacity_units: i64,
37    pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42    pub index_name: String,
43    pub key_schema: Vec<KeySchemaElement>,
44    pub projection: Projection,
45    pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50    pub index_name: String,
51    pub key_schema: Vec<KeySchemaElement>,
52    pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
58    pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63    pub name: String,
64    pub arn: String,
65    pub table_id: String,
66    pub key_schema: Vec<KeySchemaElement>,
67    pub attribute_definitions: Vec<AttributeDefinition>,
68    pub provisioned_throughput: ProvisionedThroughput,
69    pub items: Vec<HashMap<String, AttributeValue>>,
70    pub gsi: Vec<GlobalSecondaryIndex>,
71    pub lsi: Vec<LocalSecondaryIndex>,
72    pub tags: HashMap<String, String>,
73    pub created_at: DateTime<Utc>,
74    pub status: String,
75    pub item_count: i64,
76    pub size_bytes: i64,
77    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
78    pub ttl_attribute: Option<String>,
79    pub ttl_enabled: bool,
80    pub resource_policy: Option<String>,
81    /// PITR enabled
82    pub pitr_enabled: bool,
83    /// Kinesis streaming destinations: stream_arn -> status
84    pub kinesis_destinations: Vec<KinesisDestination>,
85    /// Contributor insights status
86    pub contributor_insights_status: String,
87    /// Contributor insights: partition key access counters (key_value_string -> count)
88    pub contributor_insights_counters: HashMap<String, u64>,
89    /// DynamoDB Streams configuration
90    pub stream_enabled: bool,
91    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
92    pub stream_arn: Option<String>,
93    /// Stream records (retained for 24 hours)
94    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
95    /// Server-side encryption type: AES256 (owned) or KMS
96    pub sse_type: Option<String>,
97    /// KMS key ARN for SSE (only when sse_type is KMS)
98    pub sse_kms_key_arn: Option<String>,
99}
100
101#[derive(Debug, Clone)]
102pub struct StreamRecord {
103    pub event_id: String,
104    pub event_name: String, // INSERT, MODIFY, REMOVE
105    pub event_version: String,
106    pub event_source: String,
107    pub aws_region: String,
108    pub dynamodb: DynamoDbStreamRecord,
109    pub event_source_arn: String,
110    pub timestamp: DateTime<Utc>,
111}
112
113#[derive(Debug, Clone)]
114pub struct DynamoDbStreamRecord {
115    pub keys: HashMap<String, AttributeValue>,
116    pub new_image: Option<HashMap<String, AttributeValue>>,
117    pub old_image: Option<HashMap<String, AttributeValue>>,
118    pub sequence_number: String,
119    pub size_bytes: i64,
120    pub stream_view_type: String,
121}
122
123#[derive(Debug, Clone)]
124pub struct KinesisDestination {
125    pub stream_arn: String,
126    pub destination_status: String,
127    pub approximate_creation_date_time_precision: String,
128}
129
130#[derive(Debug, Clone)]
131pub struct BackupDescription {
132    pub backup_arn: String,
133    pub backup_name: String,
134    pub table_name: String,
135    pub table_arn: String,
136    pub backup_status: String,
137    pub backup_type: String,
138    pub backup_creation_date: DateTime<Utc>,
139    pub key_schema: Vec<KeySchemaElement>,
140    pub attribute_definitions: Vec<AttributeDefinition>,
141    pub provisioned_throughput: ProvisionedThroughput,
142    pub billing_mode: String,
143    pub item_count: i64,
144    pub size_bytes: i64,
145    /// Snapshot of the table items at backup creation time.
146    pub items: Vec<HashMap<String, AttributeValue>>,
147}
148
149#[derive(Debug, Clone)]
150pub struct GlobalTableDescription {
151    pub global_table_name: String,
152    pub global_table_arn: String,
153    pub global_table_status: String,
154    pub creation_date: DateTime<Utc>,
155    pub replication_group: Vec<ReplicaDescription>,
156}
157
158#[derive(Debug, Clone)]
159pub struct ReplicaDescription {
160    pub region_name: String,
161    pub replica_status: String,
162}
163
164#[derive(Debug, Clone)]
165pub struct ExportDescription {
166    pub export_arn: String,
167    pub export_status: String,
168    pub table_arn: String,
169    pub s3_bucket: String,
170    pub s3_prefix: Option<String>,
171    pub export_format: String,
172    pub start_time: DateTime<Utc>,
173    pub end_time: DateTime<Utc>,
174    pub export_time: DateTime<Utc>,
175    pub item_count: i64,
176    pub billed_size_bytes: i64,
177}
178
179#[derive(Debug, Clone)]
180pub struct ImportDescription {
181    pub import_arn: String,
182    pub import_status: String,
183    pub table_arn: String,
184    pub table_name: String,
185    pub s3_bucket_source: String,
186    pub input_format: String,
187    pub start_time: DateTime<Utc>,
188    pub end_time: DateTime<Utc>,
189    pub processed_item_count: i64,
190    pub processed_size_bytes: i64,
191}
192
193impl DynamoTable {
194    /// Get the hash key attribute name from the key schema.
195    pub fn hash_key_name(&self) -> &str {
196        self.key_schema
197            .iter()
198            .find(|k| k.key_type == "HASH")
199            .map(|k| k.attribute_name.as_str())
200            .unwrap_or("")
201    }
202
203    /// Get the range key attribute name from the key schema (if any).
204    pub fn range_key_name(&self) -> Option<&str> {
205        self.key_schema
206            .iter()
207            .find(|k| k.key_type == "RANGE")
208            .map(|k| k.attribute_name.as_str())
209    }
210
211    /// Find an item index by its primary key.
212    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
213        let hash_key = self.hash_key_name();
214        let range_key = self.range_key_name();
215
216        self.items.iter().position(|item| {
217            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
218                (Some(a), Some(b)) => a == b,
219                _ => false,
220            };
221            if !hash_match {
222                return false;
223            }
224            match range_key {
225                Some(rk) => match (item.get(rk), key.get(rk)) {
226                    (Some(a), Some(b)) => a == b,
227                    (None, None) => true,
228                    _ => false,
229                },
230                None => true,
231            }
232        })
233    }
234
235    /// Estimate item size in bytes (rough approximation).
236    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
237        let mut size: i64 = 0;
238        for (k, v) in item {
239            size += k.len() as i64;
240            size += Self::estimate_value_size(v);
241        }
242        size
243    }
244
245    fn estimate_value_size(v: &Value) -> i64 {
246        match v {
247            Value::Object(obj) => {
248                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
249                    s.len() as i64
250                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
251                    n.len() as i64
252                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
253                    1
254                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
255                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
256                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
257                    3 + m
258                        .iter()
259                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
260                        .sum::<i64>()
261                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
262                    ss.iter()
263                        .filter_map(|v| v.as_str())
264                        .map(|s| s.len() as i64)
265                        .sum()
266                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
267                    ns.iter()
268                        .filter_map(|v| v.as_str())
269                        .map(|s| s.len() as i64)
270                        .sum()
271                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
272                    // Base64-encoded binary
273                    (b.len() as i64 * 3) / 4
274                } else {
275                    v.to_string().len() as i64
276                }
277            }
278            _ => v.to_string().len() as i64,
279        }
280    }
281
282    /// Record a partition key access for contributor insights.
283    /// Only records if contributor insights is enabled.
284    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
285        if self.contributor_insights_status != "ENABLED" {
286            return;
287        }
288        let hash_key = self.hash_key_name().to_string();
289        if let Some(pk_value) = key.get(&hash_key) {
290            let key_str = pk_value.to_string();
291            *self
292                .contributor_insights_counters
293                .entry(key_str)
294                .or_insert(0) += 1;
295        }
296    }
297
298    /// Record a partition key access from a full item (extracts the key first).
299    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
300        if self.contributor_insights_status != "ENABLED" {
301            return;
302        }
303        let hash_key = self.hash_key_name().to_string();
304        if let Some(pk_value) = item.get(&hash_key) {
305            let key_str = pk_value.to_string();
306            *self
307                .contributor_insights_counters
308                .entry(key_str)
309                .or_insert(0) += 1;
310        }
311    }
312
313    /// Get top N contributors sorted by access count (descending).
314    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
315        let mut entries: Vec<(&str, u64)> = self
316            .contributor_insights_counters
317            .iter()
318            .map(|(k, &v)| (k.as_str(), v))
319            .collect();
320        entries.sort_by(|a, b| b.1.cmp(&a.1));
321        entries.truncate(n);
322        entries
323    }
324
325    /// Recalculate item_count and size_bytes from the items vec.
326    pub fn recalculate_stats(&mut self) {
327        self.item_count = self.items.len() as i64;
328        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
329    }
330}
331
332pub struct DynamoDbState {
333    pub account_id: String,
334    pub region: String,
335    pub tables: HashMap<String, DynamoTable>,
336    pub backups: HashMap<String, BackupDescription>,
337    pub global_tables: HashMap<String, GlobalTableDescription>,
338    pub exports: HashMap<String, ExportDescription>,
339    pub imports: HashMap<String, ImportDescription>,
340}
341
342impl DynamoDbState {
343    pub fn new(account_id: &str, region: &str) -> Self {
344        Self {
345            account_id: account_id.to_string(),
346            region: region.to_string(),
347            tables: HashMap::new(),
348            backups: HashMap::new(),
349            global_tables: HashMap::new(),
350            exports: HashMap::new(),
351            imports: HashMap::new(),
352        }
353    }
354
355    pub fn reset(&mut self) {
356        self.tables.clear();
357        self.backups.clear();
358        self.global_tables.clear();
359        self.exports.clear();
360        self.imports.clear();
361    }
362}
363
364pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;