Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9    Arc::new(RwLock::new(Vec::new()))
10}
11
12/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
13/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
14pub type AttributeValue = Value;
15
16/// Extract the "typed" inner value for comparison purposes.
17/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
18pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
19    let obj = av.as_object()?;
20    if obj.len() != 1 {
21        return None;
22    }
23    let (k, v) = obj.iter().next()?;
24    Some((k.as_str(), v))
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct KeySchemaElement {
29    pub attribute_name: String,
30    pub key_type: String, // HASH or RANGE
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct AttributeDefinition {
35    pub attribute_name: String,
36    pub attribute_type: String, // S, N, B
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ProvisionedThroughput {
41    pub read_capacity_units: i64,
42    pub write_capacity_units: i64,
43}
44
45/// On-demand capacity caps for PAY_PER_REQUEST tables and GSIs. Real AWS
46/// accepts both fields independently; `-1` (the AWS sentinel for "no cap")
47/// is the default and is what `DescribeTable` returns when the caller never
48/// set a value — the Terraform provider asserts on that exact value.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct OnDemandThroughput {
51    pub max_read_request_units: i64,
52    pub max_write_request_units: i64,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct GlobalSecondaryIndex {
57    pub index_name: String,
58    pub key_schema: Vec<KeySchemaElement>,
59    pub projection: Projection,
60    pub provisioned_throughput: Option<ProvisionedThroughput>,
61    pub on_demand_throughput: Option<OnDemandThroughput>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct LocalSecondaryIndex {
66    pub index_name: String,
67    pub key_schema: Vec<KeySchemaElement>,
68    pub projection: Projection,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct Projection {
73    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
74    pub non_key_attributes: Vec<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct DynamoTable {
79    pub name: String,
80    pub arn: String,
81    pub table_id: String,
82    pub key_schema: Vec<KeySchemaElement>,
83    pub attribute_definitions: Vec<AttributeDefinition>,
84    pub provisioned_throughput: ProvisionedThroughput,
85    pub items: Vec<HashMap<String, AttributeValue>>,
86    pub gsi: Vec<GlobalSecondaryIndex>,
87    pub lsi: Vec<LocalSecondaryIndex>,
88    pub tags: HashMap<String, String>,
89    pub created_at: DateTime<Utc>,
90    pub status: String,
91    pub item_count: i64,
92    pub size_bytes: i64,
93    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
94    pub ttl_attribute: Option<String>,
95    pub ttl_enabled: bool,
96    pub resource_policy: Option<String>,
97    /// PITR enabled
98    pub pitr_enabled: bool,
99    /// Kinesis streaming destinations: stream_arn -> status
100    pub kinesis_destinations: Vec<KinesisDestination>,
101    /// Contributor insights status
102    pub contributor_insights_status: String,
103    /// Contributor insights: partition key access counters (key_value_string -> count)
104    pub contributor_insights_counters: HashMap<String, u64>,
105    /// DynamoDB Streams configuration
106    pub stream_enabled: bool,
107    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
108    pub stream_arn: Option<String>,
109    /// Stream records (retained for 24 hours). Not persisted: stream
110    /// records are ephemeral and would be garbage anyway across restarts.
111    #[serde(skip, default = "empty_stream_records")]
112    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
113    /// Server-side encryption type: AES256 (owned) or KMS
114    pub sse_type: Option<String>,
115    /// KMS key ARN for SSE (only when sse_type is KMS)
116    pub sse_kms_key_arn: Option<String>,
117    /// Deletion protection: when true, DeleteTable is rejected with
118    /// `ResourceInUseException`. Defaults to false. Returned on every
119    /// `DescribeTable` and toggleable via `UpdateTable`.
120    pub deletion_protection_enabled: bool,
121    /// Table-level on-demand throughput caps. Only meaningful for
122    /// PAY_PER_REQUEST tables, but real AWS echoes the field on every
123    /// DescribeTable once set.
124    pub on_demand_throughput: Option<OnDemandThroughput>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct StreamRecord {
129    pub event_id: String,
130    pub event_name: String, // INSERT, MODIFY, REMOVE
131    pub event_version: String,
132    pub event_source: String,
133    pub aws_region: String,
134    pub dynamodb: DynamoDbStreamRecord,
135    pub event_source_arn: String,
136    pub timestamp: DateTime<Utc>,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct DynamoDbStreamRecord {
141    pub keys: HashMap<String, AttributeValue>,
142    pub new_image: Option<HashMap<String, AttributeValue>>,
143    pub old_image: Option<HashMap<String, AttributeValue>>,
144    pub sequence_number: String,
145    pub size_bytes: i64,
146    pub stream_view_type: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct KinesisDestination {
151    pub stream_arn: String,
152    pub destination_status: String,
153    pub approximate_creation_date_time_precision: String,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct BackupDescription {
158    pub backup_arn: String,
159    pub backup_name: String,
160    pub table_name: String,
161    pub table_arn: String,
162    pub backup_status: String,
163    pub backup_type: String,
164    pub backup_creation_date: DateTime<Utc>,
165    pub key_schema: Vec<KeySchemaElement>,
166    pub attribute_definitions: Vec<AttributeDefinition>,
167    pub provisioned_throughput: ProvisionedThroughput,
168    pub billing_mode: String,
169    pub item_count: i64,
170    pub size_bytes: i64,
171    /// Snapshot of the table items at backup creation time.
172    pub items: Vec<HashMap<String, AttributeValue>>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GlobalTableDescription {
177    pub global_table_name: String,
178    pub global_table_arn: String,
179    pub global_table_status: String,
180    pub creation_date: DateTime<Utc>,
181    pub replication_group: Vec<ReplicaDescription>,
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct ReplicaDescription {
186    pub region_name: String,
187    pub replica_status: String,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct ExportDescription {
192    pub export_arn: String,
193    pub export_status: String,
194    pub table_arn: String,
195    pub s3_bucket: String,
196    pub s3_prefix: Option<String>,
197    pub export_format: String,
198    pub start_time: DateTime<Utc>,
199    pub end_time: DateTime<Utc>,
200    pub export_time: DateTime<Utc>,
201    pub item_count: i64,
202    pub billed_size_bytes: i64,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct ImportDescription {
207    pub import_arn: String,
208    pub import_status: String,
209    pub table_arn: String,
210    pub table_name: String,
211    pub s3_bucket_source: String,
212    pub input_format: String,
213    pub start_time: DateTime<Utc>,
214    pub end_time: DateTime<Utc>,
215    pub processed_item_count: i64,
216    pub processed_size_bytes: i64,
217}
218
219impl DynamoTable {
220    /// Get the hash key attribute name from the key schema.
221    pub fn hash_key_name(&self) -> &str {
222        self.key_schema
223            .iter()
224            .find(|k| k.key_type == "HASH")
225            .map(|k| k.attribute_name.as_str())
226            .unwrap_or("")
227    }
228
229    /// Get the range key attribute name from the key schema (if any).
230    pub fn range_key_name(&self) -> Option<&str> {
231        self.key_schema
232            .iter()
233            .find(|k| k.key_type == "RANGE")
234            .map(|k| k.attribute_name.as_str())
235    }
236
237    /// Find an item index by its primary key.
238    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
239        let hash_key = self.hash_key_name();
240        let range_key = self.range_key_name();
241
242        self.items.iter().position(|item| {
243            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
244                (Some(a), Some(b)) => a == b,
245                _ => false,
246            };
247            if !hash_match {
248                return false;
249            }
250            match range_key {
251                Some(rk) => match (item.get(rk), key.get(rk)) {
252                    (Some(a), Some(b)) => a == b,
253                    (None, None) => true,
254                    _ => false,
255                },
256                None => true,
257            }
258        })
259    }
260
261    /// Estimate item size in bytes (rough approximation).
262    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
263        let mut size: i64 = 0;
264        for (k, v) in item {
265            size += k.len() as i64;
266            size += Self::estimate_value_size(v);
267        }
268        size
269    }
270
271    fn estimate_value_size(v: &Value) -> i64 {
272        match v {
273            Value::Object(obj) => {
274                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
275                    s.len() as i64
276                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
277                    n.len() as i64
278                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
279                    1
280                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
281                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
282                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
283                    3 + m
284                        .iter()
285                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
286                        .sum::<i64>()
287                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
288                    ss.iter()
289                        .filter_map(|v| v.as_str())
290                        .map(|s| s.len() as i64)
291                        .sum()
292                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
293                    ns.iter()
294                        .filter_map(|v| v.as_str())
295                        .map(|s| s.len() as i64)
296                        .sum()
297                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
298                    // Base64-encoded binary
299                    (b.len() as i64 * 3) / 4
300                } else {
301                    v.to_string().len() as i64
302                }
303            }
304            _ => v.to_string().len() as i64,
305        }
306    }
307
308    /// Record a partition key access for contributor insights.
309    /// Only records if contributor insights is enabled.
310    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
311        if self.contributor_insights_status != "ENABLED" {
312            return;
313        }
314        let hash_key = self.hash_key_name().to_string();
315        if let Some(pk_value) = key.get(&hash_key) {
316            let key_str = pk_value.to_string();
317            *self
318                .contributor_insights_counters
319                .entry(key_str)
320                .or_insert(0) += 1;
321        }
322    }
323
324    /// Record a partition key access from a full item (extracts the key first).
325    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
326        if self.contributor_insights_status != "ENABLED" {
327            return;
328        }
329        let hash_key = self.hash_key_name().to_string();
330        if let Some(pk_value) = item.get(&hash_key) {
331            let key_str = pk_value.to_string();
332            *self
333                .contributor_insights_counters
334                .entry(key_str)
335                .or_insert(0) += 1;
336        }
337    }
338
339    /// Get top N contributors sorted by access count (descending).
340    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
341        let mut entries: Vec<(&str, u64)> = self
342            .contributor_insights_counters
343            .iter()
344            .map(|(k, &v)| (k.as_str(), v))
345            .collect();
346        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
347        entries.truncate(n);
348        entries
349    }
350
351    /// Recalculate item_count and size_bytes from the items vec.
352    pub fn recalculate_stats(&mut self) {
353        self.item_count = self.items.len() as i64;
354        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
355    }
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct DynamoDbState {
360    pub account_id: String,
361    pub region: String,
362    pub tables: HashMap<String, DynamoTable>,
363    pub backups: HashMap<String, BackupDescription>,
364    pub global_tables: HashMap<String, GlobalTableDescription>,
365    pub exports: HashMap<String, ExportDescription>,
366    pub imports: HashMap<String, ImportDescription>,
367}
368
369/// On-disk snapshot envelope. The payload is the full [`DynamoDbState`];
370/// `schema_version` lets us evolve the format without accidentally loading
371/// an incompatible dump on upgrade.
372#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct DynamoDbSnapshot {
374    pub schema_version: u32,
375    /// v2+: multi-account state.
376    #[serde(default)]
377    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
378    /// v1 compat: single-account state.
379    #[serde(default)]
380    pub state: Option<DynamoDbState>,
381}
382
/// Schema version constant for [`DynamoDbSnapshot`]; version 2 and later
/// carry multi-account state, version 1 carried a single account.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
384
385impl DynamoDbState {
386    pub fn new(account_id: &str, region: &str) -> Self {
387        Self {
388            account_id: account_id.to_string(),
389            region: region.to_string(),
390            tables: HashMap::new(),
391            backups: HashMap::new(),
392            global_tables: HashMap::new(),
393            exports: HashMap::new(),
394            imports: HashMap::new(),
395        }
396    }
397
398    pub fn reset(&mut self) {
399        self.tables.clear();
400        self.backups.clear();
401        self.global_tables.clear();
402        self.exports.clear();
403        self.imports.clear();
404    }
405}
406
407impl fakecloud_core::multi_account::AccountState for DynamoDbState {
408    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
409        Self::new(account_id, region)
410    }
411}
412
413pub type SharedDynamoDbState =
414    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
415
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a minimal ACTIVE, PROVISIONED table whose key schema is a single
    /// HASH key named `hash`, with no items and every optional feature off.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: HashMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: HashMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
        }
    }

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());

        // Populate some collections so that `reset` has something to clear.
        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        state.global_tables.insert(
            "g".to_string(),
            GlobalTableDescription {
                global_table_name: "g".to_string(),
                global_table_arn: "arn:aws:dynamodb::123:global-table/g".to_string(),
                global_table_status: "ACTIVE".to_string(),
                creation_date: Utc::now(),
                replication_group: Vec::new(),
            },
        );
        assert_eq!(state.tables.len(), 1);
        assert_eq!(state.global_tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.global_tables.is_empty());
        // Identity fields survive a reset.
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn range_key_name_reflects_schema() {
        let mut t = table_with_hash_key("pk");
        assert_eq!(t.range_key_name(), None);
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        assert_eq!(t.range_key_name(), Some("sk"));
    }

    #[test]
    fn find_item_index_matches_hash_and_range() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        for &(pk, sk) in &[("a", "1"), ("a", "2"), ("b", "1")] {
            let mut item = HashMap::new();
            item.insert("pk".to_string(), json!({"S": pk}));
            item.insert("sk".to_string(), json!({"S": sk}));
            t.items.push(item);
        }

        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        key.insert("sk".to_string(), json!({"S": "2"}));
        assert_eq!(t.find_item_index(&key), Some(1));

        key.insert("sk".to_string(), json!({"S": "9"}));
        assert_eq!(t.find_item_index(&key), None);

        // A key without the hash attribute never matches.
        key.remove("pk");
        assert_eq!(t.find_item_index(&key), None);
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}