Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
8/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
9pub type AttributeValue = Value;
10
11/// Extract the "typed" inner value for comparison purposes.
12/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
13pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14    let obj = av.as_object()?;
15    if obj.len() != 1 {
16        return None;
17    }
18    let (k, v) = obj.iter().next()?;
19    Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24    pub attribute_name: String,
25    pub key_type: String, // HASH or RANGE
26}
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30    pub attribute_name: String,
31    pub attribute_type: String, // S, N, B
32}
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36    pub read_capacity_units: i64,
37    pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42    pub index_name: String,
43    pub key_schema: Vec<KeySchemaElement>,
44    pub projection: Projection,
45    pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50    pub index_name: String,
51    pub key_schema: Vec<KeySchemaElement>,
52    pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
58    pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63    pub name: String,
64    pub arn: String,
65    pub key_schema: Vec<KeySchemaElement>,
66    pub attribute_definitions: Vec<AttributeDefinition>,
67    pub provisioned_throughput: ProvisionedThroughput,
68    pub items: Vec<HashMap<String, AttributeValue>>,
69    pub gsi: Vec<GlobalSecondaryIndex>,
70    pub lsi: Vec<LocalSecondaryIndex>,
71    pub tags: HashMap<String, String>,
72    pub created_at: DateTime<Utc>,
73    pub status: String,
74    pub item_count: i64,
75    pub size_bytes: i64,
76    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
77    pub ttl_attribute: Option<String>,
78    pub ttl_enabled: bool,
79    pub resource_policy: Option<String>,
80    /// PITR enabled
81    pub pitr_enabled: bool,
82    /// Kinesis streaming destinations: stream_arn -> status
83    pub kinesis_destinations: Vec<KinesisDestination>,
84    /// Contributor insights status
85    pub contributor_insights_status: String,
86    /// Contributor insights: partition key access counters (key_value_string -> count)
87    pub contributor_insights_counters: HashMap<String, u64>,
88    /// DynamoDB Streams configuration
89    pub stream_enabled: bool,
90    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
91    pub stream_arn: Option<String>,
92    /// Stream records (retained for 24 hours)
93    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
94    /// Server-side encryption type: AES256 (owned) or KMS
95    pub sse_type: Option<String>,
96    /// KMS key ARN for SSE (only when sse_type is KMS)
97    pub sse_kms_key_arn: Option<String>,
98}
99
100#[derive(Debug, Clone)]
101pub struct StreamRecord {
102    pub event_id: String,
103    pub event_name: String, // INSERT, MODIFY, REMOVE
104    pub event_version: String,
105    pub event_source: String,
106    pub aws_region: String,
107    pub dynamodb: DynamoDbStreamRecord,
108    pub event_source_arn: String,
109    pub timestamp: DateTime<Utc>,
110}
111
112#[derive(Debug, Clone)]
113pub struct DynamoDbStreamRecord {
114    pub keys: HashMap<String, AttributeValue>,
115    pub new_image: Option<HashMap<String, AttributeValue>>,
116    pub old_image: Option<HashMap<String, AttributeValue>>,
117    pub sequence_number: String,
118    pub size_bytes: i64,
119    pub stream_view_type: String,
120}
121
122#[derive(Debug, Clone)]
123pub struct KinesisDestination {
124    pub stream_arn: String,
125    pub destination_status: String,
126    pub approximate_creation_date_time_precision: String,
127}
128
129#[derive(Debug, Clone)]
130pub struct BackupDescription {
131    pub backup_arn: String,
132    pub backup_name: String,
133    pub table_name: String,
134    pub table_arn: String,
135    pub backup_status: String,
136    pub backup_type: String,
137    pub backup_creation_date: DateTime<Utc>,
138    pub key_schema: Vec<KeySchemaElement>,
139    pub attribute_definitions: Vec<AttributeDefinition>,
140    pub provisioned_throughput: ProvisionedThroughput,
141    pub billing_mode: String,
142    pub item_count: i64,
143    pub size_bytes: i64,
144    /// Snapshot of the table items at backup creation time.
145    pub items: Vec<HashMap<String, AttributeValue>>,
146}
147
148#[derive(Debug, Clone)]
149pub struct GlobalTableDescription {
150    pub global_table_name: String,
151    pub global_table_arn: String,
152    pub global_table_status: String,
153    pub creation_date: DateTime<Utc>,
154    pub replication_group: Vec<ReplicaDescription>,
155}
156
157#[derive(Debug, Clone)]
158pub struct ReplicaDescription {
159    pub region_name: String,
160    pub replica_status: String,
161}
162
163#[derive(Debug, Clone)]
164pub struct ExportDescription {
165    pub export_arn: String,
166    pub export_status: String,
167    pub table_arn: String,
168    pub s3_bucket: String,
169    pub s3_prefix: Option<String>,
170    pub export_format: String,
171    pub start_time: DateTime<Utc>,
172    pub end_time: DateTime<Utc>,
173    pub export_time: DateTime<Utc>,
174    pub item_count: i64,
175    pub billed_size_bytes: i64,
176}
177
178#[derive(Debug, Clone)]
179pub struct ImportDescription {
180    pub import_arn: String,
181    pub import_status: String,
182    pub table_arn: String,
183    pub table_name: String,
184    pub s3_bucket_source: String,
185    pub input_format: String,
186    pub start_time: DateTime<Utc>,
187    pub end_time: DateTime<Utc>,
188    pub processed_item_count: i64,
189    pub processed_size_bytes: i64,
190}
191
192impl DynamoTable {
193    /// Get the hash key attribute name from the key schema.
194    pub fn hash_key_name(&self) -> &str {
195        self.key_schema
196            .iter()
197            .find(|k| k.key_type == "HASH")
198            .map(|k| k.attribute_name.as_str())
199            .unwrap_or("")
200    }
201
202    /// Get the range key attribute name from the key schema (if any).
203    pub fn range_key_name(&self) -> Option<&str> {
204        self.key_schema
205            .iter()
206            .find(|k| k.key_type == "RANGE")
207            .map(|k| k.attribute_name.as_str())
208    }
209
210    /// Find an item index by its primary key.
211    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
212        let hash_key = self.hash_key_name();
213        let range_key = self.range_key_name();
214
215        self.items.iter().position(|item| {
216            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
217                (Some(a), Some(b)) => a == b,
218                _ => false,
219            };
220            if !hash_match {
221                return false;
222            }
223            match range_key {
224                Some(rk) => match (item.get(rk), key.get(rk)) {
225                    (Some(a), Some(b)) => a == b,
226                    (None, None) => true,
227                    _ => false,
228                },
229                None => true,
230            }
231        })
232    }
233
234    /// Estimate item size in bytes (rough approximation).
235    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
236        let mut size: i64 = 0;
237        for (k, v) in item {
238            size += k.len() as i64;
239            size += Self::estimate_value_size(v);
240        }
241        size
242    }
243
244    fn estimate_value_size(v: &Value) -> i64 {
245        match v {
246            Value::Object(obj) => {
247                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
248                    s.len() as i64
249                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
250                    n.len() as i64
251                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
252                    1
253                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
254                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
255                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
256                    3 + m
257                        .iter()
258                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
259                        .sum::<i64>()
260                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
261                    ss.iter()
262                        .filter_map(|v| v.as_str())
263                        .map(|s| s.len() as i64)
264                        .sum()
265                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
266                    ns.iter()
267                        .filter_map(|v| v.as_str())
268                        .map(|s| s.len() as i64)
269                        .sum()
270                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
271                    // Base64-encoded binary
272                    (b.len() as i64 * 3) / 4
273                } else {
274                    v.to_string().len() as i64
275                }
276            }
277            _ => v.to_string().len() as i64,
278        }
279    }
280
281    /// Record a partition key access for contributor insights.
282    /// Only records if contributor insights is enabled.
283    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
284        if self.contributor_insights_status != "ENABLED" {
285            return;
286        }
287        let hash_key = self.hash_key_name().to_string();
288        if let Some(pk_value) = key.get(&hash_key) {
289            let key_str = pk_value.to_string();
290            *self
291                .contributor_insights_counters
292                .entry(key_str)
293                .or_insert(0) += 1;
294        }
295    }
296
297    /// Record a partition key access from a full item (extracts the key first).
298    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
299        if self.contributor_insights_status != "ENABLED" {
300            return;
301        }
302        let hash_key = self.hash_key_name().to_string();
303        if let Some(pk_value) = item.get(&hash_key) {
304            let key_str = pk_value.to_string();
305            *self
306                .contributor_insights_counters
307                .entry(key_str)
308                .or_insert(0) += 1;
309        }
310    }
311
312    /// Get top N contributors sorted by access count (descending).
313    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
314        let mut entries: Vec<(&str, u64)> = self
315            .contributor_insights_counters
316            .iter()
317            .map(|(k, &v)| (k.as_str(), v))
318            .collect();
319        entries.sort_by(|a, b| b.1.cmp(&a.1));
320        entries.truncate(n);
321        entries
322    }
323
324    /// Recalculate item_count and size_bytes from the items vec.
325    pub fn recalculate_stats(&mut self) {
326        self.item_count = self.items.len() as i64;
327        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
328    }
329}
330
331pub struct DynamoDbState {
332    pub account_id: String,
333    pub region: String,
334    pub tables: HashMap<String, DynamoTable>,
335    pub backups: HashMap<String, BackupDescription>,
336    pub global_tables: HashMap<String, GlobalTableDescription>,
337    pub exports: HashMap<String, ExportDescription>,
338    pub imports: HashMap<String, ImportDescription>,
339}
340
341impl DynamoDbState {
342    pub fn new(account_id: &str, region: &str) -> Self {
343        Self {
344            account_id: account_id.to_string(),
345            region: region.to_string(),
346            tables: HashMap::new(),
347            backups: HashMap::new(),
348            global_tables: HashMap::new(),
349            exports: HashMap::new(),
350            imports: HashMap::new(),
351        }
352    }
353
354    pub fn reset(&mut self) {
355        self.tables.clear();
356        self.backups.clear();
357        self.global_tables.clear();
358        self.exports.clear();
359        self.imports.clear();
360    }
361}
362
363pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;