Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9    Arc::new(RwLock::new(Vec::new()))
10}
11
/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
///
/// This is a plain alias for [`serde_json::Value`]: the type itself does not
/// enforce the single-tag object shape. Use [`attribute_type_and_value`] to
/// split a well-formed value into its tag and payload.
pub type AttributeValue = Value;
15
16/// Extract the "typed" inner value for comparison purposes.
17/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
18pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
19    let obj = av.as_object()?;
20    if obj.len() != 1 {
21        return None;
22    }
23    let (k, v) = obj.iter().next()?;
24    Some((k.as_str(), v))
25}
26
/// One entry of a key schema: an attribute name together with the role it
/// plays in the key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the attribute that acts as a key.
    pub attribute_name: String,
    /// Key role. `DynamoTable::hash_key_name` / `range_key_name` look for
    /// the literal strings `"HASH"` and `"RANGE"`.
    pub key_type: String, // HASH or RANGE
}
32
/// Declares the type tag of a named attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Type tag of the attribute, stored as a string.
    pub attribute_type: String, // S, N, B
}
38
/// Provisioned read/write capacity settings for a table or index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Provisioned read capacity units.
    pub read_capacity_units: i64,
    /// Provisioned write capacity units.
    pub write_capacity_units: i64,
}
44
/// On-demand capacity caps for PAY_PER_REQUEST tables and GSIs. Real AWS
/// accepts both fields independently; `-1` (the AWS sentinel for "no cap")
/// is the default and is what `DescribeTable` returns when the caller never
/// set a value — the Terraform provider asserts on that exact value.
///
/// Note: this struct does not derive `Default`, so the `-1` default must be
/// supplied by whoever constructs it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Cap on read request units; `-1` means "no cap".
    pub max_read_request_units: i64,
    /// Cap on write request units; `-1` means "no cap".
    pub max_write_request_units: i64,
}
54
/// Definition of a global secondary index (GSI) attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index (its own HASH / RANGE elements).
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes are projected into the index.
    pub projection: Projection,
    /// Provisioned capacity for the index, if any was specified.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand capacity caps for the index, if any were specified.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
63
/// Definition of a local secondary index (LSI) attached to a table.
///
/// Unlike [`GlobalSecondaryIndex`], it carries no throughput settings of
/// its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes are projected into the index.
    pub projection: Projection,
}
70
/// Describes which attributes a secondary index projects.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection mode, stored as a string.
    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
    /// Extra non-key attribute names.
    /// NOTE(review): presumably only meaningful for `INCLUDE` — confirm
    /// against the code that evaluates projections.
    pub non_key_attributes: Vec<String>,
}
76
/// In-memory representation of one DynamoDB table: its schema, settings,
/// and the items it currently holds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier string.
    pub table_id: String,
    /// Primary key schema; `hash_key_name` / `range_key_name` read it.
    pub key_schema: Vec<KeySchemaElement>,
    /// Declared attribute types.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity settings (always present, regardless of
    /// `billing_mode`).
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map of attribute name to attribute value.
    /// `find_item_index` locates items by scanning this vector.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags (key -> value).
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string (the unit tests in this file use `"ACTIVE"`).
    pub status: String,
    /// Cached item count; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Cached estimated size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode, stored as a string.
    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
    /// Name of the TTL attribute, if one has been configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if one is attached.
    pub resource_policy: Option<String>,
    /// PITR enabled
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations; each entry carries its own stream
    /// ARN and destination status.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor insights status. Access counters are only updated while
    /// this equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Contributor insights: partition key access counters (key_value_string -> count)
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// DynamoDB Streams configuration
    pub stream_enabled: bool,
    /// View type of the stream, if configured.
    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
    /// ARN of the stream, if one exists.
    pub stream_arn: Option<String>,
    /// Stream records (retained for 24 hours). Not persisted: stream
    /// records are ephemeral and would be garbage anyway across restarts.
    /// Skipped by serde in both directions; deserialization fills it with
    /// an empty buffer via `empty_stream_records`.
    #[serde(skip, default = "empty_stream_records")]
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type: AES256 (owned) or KMS
    pub sse_type: Option<String>,
    /// KMS key ARN for SSE (only when sse_type is KMS)
    pub sse_kms_key_arn: Option<String>,
    /// Deletion protection: when true, DeleteTable is rejected with
    /// `ResourceInUseException`. Defaults to false. Returned on every
    /// `DescribeTable` and toggleable via `UpdateTable`.
    pub deletion_protection_enabled: bool,
    /// Table-level on-demand throughput caps. Only meaningful for
    /// PAY_PER_REQUEST tables, but real AWS echoes the field on every
    /// DescribeTable once set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
126
/// One change record in a table's DynamoDB stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Identifier of this event.
    pub event_id: String,
    /// Kind of change the record describes.
    pub event_name: String, // INSERT, MODIFY, REMOVE
    /// Version string of the event format.
    pub event_version: String,
    /// Source of the event, as a string.
    pub event_source: String,
    /// Region the event is attributed to.
    pub aws_region: String,
    /// The DynamoDB-specific payload (keys, images, sequence number).
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the source the event came from.
    pub event_source_arn: String,
    /// Timestamp attached to the record (UTC).
    pub timestamp: DateTime<Utc>,
}
138
/// DynamoDB-specific payload of a [`StreamRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, when present.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, when present.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number of the record, stored as a string.
    pub sequence_number: String,
    /// Size of the record in bytes.
    pub size_bytes: i64,
    /// Stream view type the record was produced under.
    pub stream_view_type: String,
}
148
/// One Kinesis streaming destination configured on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination Kinesis stream.
    pub stream_arn: String,
    /// Status of the destination, stored as a string.
    pub destination_status: String,
    /// Precision setting for approximate creation timestamps, stored as a
    /// string.
    pub approximate_creation_date_time_precision: String,
}
155
/// A table backup: metadata plus a snapshot of the table's schema,
/// settings, and items.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the table that was backed up.
    pub table_name: String,
    /// ARN of the table that was backed up.
    pub table_arn: String,
    /// Backup status, stored as a string.
    pub backup_status: String,
    /// Backup type, stored as a string.
    pub backup_type: String,
    /// When the backup was created (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema of the source table.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions of the source table.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned throughput of the source table.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode of the source table.
    pub billing_mode: String,
    /// Item count recorded for the backup.
    pub item_count: i64,
    /// Size in bytes recorded for the backup.
    pub size_bytes: i64,
    /// Snapshot of the table items at backup creation time.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Real DDB persists GSI/LSI/tags/TTL/SSE/Stream into the backup
    /// payload so RestoreTableFromBackup brings the full table back
    /// up. Older snapshots may not have these fields, so all default
    /// to empty/false via serde.
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes (empty when absent from the snapshot).
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags (empty when absent from the snapshot).
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// TTL attribute name (`None` when absent from the snapshot).
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    /// Whether TTL was enabled (`false` when absent from the snapshot).
    #[serde(default)]
    pub ttl_enabled: bool,
    /// SSE type (`None` when absent from the snapshot).
    #[serde(default)]
    pub sse_type: Option<String>,
    /// SSE KMS key ARN (`None` when absent from the snapshot).
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    /// Whether streams were enabled (`false` when absent from the snapshot).
    #[serde(default)]
    pub stream_enabled: bool,
    /// Stream view type (`None` when absent from the snapshot).
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
196
/// Metadata for a global table and the replicas that belong to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status of the global table, stored as a string.
    pub global_table_status: String,
    /// When the global table was created (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas making up the global table.
    pub replication_group: Vec<ReplicaDescription>,
}
205
/// One replica of a global table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region the replica lives in.
    pub region_name: String,
    /// Status of the replica, stored as a string.
    pub replica_status: String,
}
211
/// Metadata describing a table export to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Status of the export, stored as a string.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Optional key prefix inside the destination bucket.
    pub s3_prefix: Option<String>,
    /// Export format, stored as a string.
    pub export_format: String,
    /// When the export started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the export finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Export time recorded for the export (UTC).
    pub export_time: DateTime<Utc>,
    /// Number of items exported.
    pub item_count: i64,
    /// Billed size of the export in bytes.
    pub billed_size_bytes: i64,
}
226
/// Metadata describing a table import from S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Status of the import, stored as a string.
    pub import_status: String,
    /// ARN of the table the import targets.
    pub table_arn: String,
    /// Name of the table the import targets.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format, stored as a string.
    pub input_format: String,
    /// When the import started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the import finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
240
241impl DynamoTable {
242    /// Get the hash key attribute name from the key schema.
243    pub fn hash_key_name(&self) -> &str {
244        self.key_schema
245            .iter()
246            .find(|k| k.key_type == "HASH")
247            .map(|k| k.attribute_name.as_str())
248            .unwrap_or("")
249    }
250
251    /// Get the range key attribute name from the key schema (if any).
252    pub fn range_key_name(&self) -> Option<&str> {
253        self.key_schema
254            .iter()
255            .find(|k| k.key_type == "RANGE")
256            .map(|k| k.attribute_name.as_str())
257    }
258
259    /// Find an item index by its primary key.
260    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
261        let hash_key = self.hash_key_name();
262        let range_key = self.range_key_name();
263
264        self.items.iter().position(|item| {
265            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
266                (Some(a), Some(b)) => a == b,
267                _ => false,
268            };
269            if !hash_match {
270                return false;
271            }
272            match range_key {
273                Some(rk) => match (item.get(rk), key.get(rk)) {
274                    (Some(a), Some(b)) => a == b,
275                    (None, None) => true,
276                    _ => false,
277                },
278                None => true,
279            }
280        })
281    }
282
283    /// Estimate item size in bytes (rough approximation).
284    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
285        let mut size: i64 = 0;
286        for (k, v) in item {
287            size += k.len() as i64;
288            size += Self::estimate_value_size(v);
289        }
290        size
291    }
292
293    fn estimate_value_size(v: &Value) -> i64 {
294        match v {
295            Value::Object(obj) => {
296                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
297                    s.len() as i64
298                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
299                    n.len() as i64
300                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
301                    1
302                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
303                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
304                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
305                    3 + m
306                        .iter()
307                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
308                        .sum::<i64>()
309                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
310                    ss.iter()
311                        .filter_map(|v| v.as_str())
312                        .map(|s| s.len() as i64)
313                        .sum()
314                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
315                    ns.iter()
316                        .filter_map(|v| v.as_str())
317                        .map(|s| s.len() as i64)
318                        .sum()
319                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
320                    // Base64-encoded binary
321                    (b.len() as i64 * 3) / 4
322                } else {
323                    v.to_string().len() as i64
324                }
325            }
326            _ => v.to_string().len() as i64,
327        }
328    }
329
330    /// Record a partition key access for contributor insights.
331    /// Only records if contributor insights is enabled.
332    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
333        if self.contributor_insights_status != "ENABLED" {
334            return;
335        }
336        let hash_key = self.hash_key_name().to_string();
337        if let Some(pk_value) = key.get(&hash_key) {
338            let key_str = pk_value.to_string();
339            *self
340                .contributor_insights_counters
341                .entry(key_str)
342                .or_insert(0) += 1;
343        }
344    }
345
346    /// Record a partition key access from a full item (extracts the key first).
347    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
348        if self.contributor_insights_status != "ENABLED" {
349            return;
350        }
351        let hash_key = self.hash_key_name().to_string();
352        if let Some(pk_value) = item.get(&hash_key) {
353            let key_str = pk_value.to_string();
354            *self
355                .contributor_insights_counters
356                .entry(key_str)
357                .or_insert(0) += 1;
358        }
359    }
360
361    /// Get top N contributors sorted by access count (descending).
362    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
363        let mut entries: Vec<(&str, u64)> = self
364            .contributor_insights_counters
365            .iter()
366            .map(|(k, &v)| (k.as_str(), v))
367            .collect();
368        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
369        entries.truncate(n);
370        entries
371    }
372
373    /// Recalculate item_count and size_bytes from the items vec.
374    pub fn recalculate_stats(&mut self) {
375        self.item_count = self.items.len() as i64;
376        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
377    }
378}
379
/// All DynamoDB state held for one account/region pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Account this state belongs to.
    pub account_id: String,
    /// Region this state belongs to.
    pub region: String,
    /// Tables, in a key-ordered map.
    /// NOTE(review): presumably keyed by table name — confirm against callers.
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups, in a key-ordered map.
    /// NOTE(review): key is presumably the backup ARN — confirm.
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables, in a key-ordered map.
    /// NOTE(review): key is presumably the global table name — confirm.
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports, in a key-ordered map.
    /// NOTE(review): key is presumably the export ARN — confirm.
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports, in a key-ordered map.
    /// NOTE(review): key is presumably the import ARN — confirm.
    pub imports: BTreeMap<String, ImportDescription>,
}
390
/// On-disk snapshot envelope. The payload is the full [`DynamoDbState`];
/// `schema_version` lets us evolve the format without accidentally loading
/// an incompatible dump on upgrade.
///
/// Both payload fields are optional and default to `None` when missing
/// from the serialized form, so either layout can be deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Version of the snapshot format this envelope was written with.
    pub schema_version: u32,
    /// v2+: multi-account state.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    /// v1 compat: single-account state.
    #[serde(default)]
    pub state: Option<DynamoDbState>,
}
404
/// Current snapshot schema version.
/// NOTE(review): presumably the value written to
/// `DynamoDbSnapshot::schema_version` when saving — confirm against the
/// persistence code.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
406
407impl DynamoDbState {
408    pub fn new(account_id: &str, region: &str) -> Self {
409        Self {
410            account_id: account_id.to_string(),
411            region: region.to_string(),
412            tables: BTreeMap::new(),
413            backups: BTreeMap::new(),
414            global_tables: BTreeMap::new(),
415            exports: BTreeMap::new(),
416            imports: BTreeMap::new(),
417        }
418    }
419
420    pub fn reset(&mut self) {
421        self.tables.clear();
422        self.backups.clear();
423        self.global_tables.clear();
424        self.exports.clear();
425        self.imports.clear();
426    }
427}
428
429impl fakecloud_core::multi_account::AccountState for DynamoDbState {
430    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
431        Self::new(account_id, region)
432    }
433}
434
/// Shared handle to the multi-account DynamoDB state: a reference-counted
/// `RwLock` around the `MultiAccountState` container of [`DynamoDbState`].
pub type SharedDynamoDbState =
    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
437
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// A fresh state is empty, and `reset` empties a populated state while
    /// leaving the account id and region untouched.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        assert_eq!(state.tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Builds a minimal, empty, ACTIVE table whose only key is a HASH key
    /// named `hash`.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn range_key_name_reflects_schema() {
        let mut t = table_with_hash_key("pk");
        assert_eq!(t.range_key_name(), None);
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        assert_eq!(t.range_key_name(), Some("sk"));
    }

    #[test]
    fn find_item_index_matches_on_hash_key() {
        let mut t = table_with_hash_key("pk");
        let mut first = HashMap::new();
        first.insert("pk".to_string(), json!({"S": "a"}));
        let mut second = HashMap::new();
        second.insert("pk".to_string(), json!({"S": "b"}));
        t.items.push(first);
        t.items.push(second);

        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "b"}));
        assert_eq!(t.find_item_index(&key), Some(1));

        key.insert("pk".to_string(), json!({"S": "missing"}));
        assert_eq!(t.find_item_index(&key), None);

        // A lookup key without the hash attribute never matches.
        assert_eq!(t.find_item_index(&HashMap::new()), None);
    }

    #[test]
    fn find_item_index_requires_matching_range_key() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        let mut first = HashMap::new();
        first.insert("pk".to_string(), json!({"S": "a"}));
        first.insert("sk".to_string(), json!({"N": "1"}));
        let mut second = HashMap::new();
        second.insert("pk".to_string(), json!({"S": "a"}));
        second.insert("sk".to_string(), json!({"N": "2"}));
        t.items.push(first);
        t.items.push(second);

        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        // Range key missing from the lookup key but present on the items.
        assert_eq!(t.find_item_index(&key), None);

        key.insert("sk".to_string(), json!({"N": "2"}));
        assert_eq!(t.find_item_index(&key), Some(1));
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}