1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9 Arc::new(RwLock::new(Vec::new()))
10}
11
/// A single attribute value, kept as raw JSON.
///
/// [`attribute_type_and_value`] interprets it as a one-entry object that maps
/// a type tag (for example `"S"`) to the payload.
pub type AttributeValue = Value;
15
16pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
19 let obj = av.as_object()?;
20 if obj.len() != 1 {
21 return None;
22 }
23 let (k, v) = obj.iter().next()?;
24 Some((k.as_str(), v))
25}
26
/// One entry of a key schema: an attribute name plus the role it plays.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the attribute that forms part of the key.
    pub attribute_name: String,
    /// Key role. `DynamoTable` looks for the values `"HASH"` and `"RANGE"`.
    pub key_type: String,
}
32
/// Declares the type of a named attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Type of the attribute, stored as a string.
    /// NOTE(review): presumably a scalar type tag such as `"S"`, `"N"` or
    /// `"B"` — confirm against the code that builds these.
    pub attribute_type: String,
}
38
/// Read and write capacity-unit settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Configured read capacity units.
    pub read_capacity_units: i64,
    /// Configured write capacity units.
    pub write_capacity_units: i64,
}
44
/// Request-unit ceilings for on-demand throughput.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Maximum read request units.
    pub max_read_request_units: i64,
    /// Maximum write request units.
    pub max_write_request_units: i64,
}
54
/// Definition of a global secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
    /// Provisioned capacity for the index, if any was specified.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand throughput limits for the index, if any were specified.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
63
/// Definition of a local secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
}
70
/// Attribute projection settings of a secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection mode, stored as a string.
    /// NOTE(review): presumably `"ALL"`, `"KEYS_ONLY"` or `"INCLUDE"` as in
    /// the DynamoDB API — confirm against callers.
    pub projection_type: String,
    /// Names of the additional non-key attributes to project.
    pub non_key_attributes: Vec<String>,
}
76
/// In-memory representation of one table: its definition, its items and the
/// per-table feature settings.
///
/// `stream_records` is an `Arc`, so cloning a `DynamoTable` produces a handle
/// to the same record buffer rather than a copy of it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Identifier of the table.
    pub table_id: String,
    /// Primary-key schema; `hash_key_name` and `range_key_name` read it.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute type declarations.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity settings.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to attribute value.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags, ordered by key.
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string.
    pub status: String,
    /// Number of items, as last computed by `recalculate_stats`.
    pub item_count: i64,
    /// Estimated total item size, as last computed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode string.
    pub billing_mode: String,
    /// Name of the TTL attribute, if one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if one is attached.
    pub resource_policy: Option<String>,
    /// Whether point-in-time recovery is enabled.
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations registered for the table.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor-insights status; access counters are only updated while
    /// this equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Access counts keyed by the string form of the hash-key value.
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// Whether the table stream is enabled.
    pub stream_enabled: bool,
    /// Stream view type, if a stream is configured.
    pub stream_view_type: Option<String>,
    /// Stream ARN, if a stream is configured.
    pub stream_arn: Option<String>,
    /// Buffered stream records. Skipped by serde; when a table is
    /// deserialized the field is initialized via `empty_stream_records`.
    #[serde(skip, default = "empty_stream_records")]
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type, if configured.
    pub sse_type: Option<String>,
    /// KMS key ARN used for server-side encryption, if configured.
    pub sse_kms_key_arn: Option<String>,
    /// Whether deletion protection is enabled.
    pub deletion_protection_enabled: bool,
    /// On-demand throughput limits, if any were specified.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
126
/// One change event recorded on a table stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Identifier of the event.
    pub event_id: String,
    /// Name of the event.
    pub event_name: String,
    /// Version string of the event format.
    pub event_version: String,
    /// Source that produced the event.
    pub event_source: String,
    /// Region the event belongs to.
    pub aws_region: String,
    /// The change payload itself.
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the source that emitted the event.
    pub event_source_arn: String,
    /// Timestamp attached to the record (UTC).
    pub timestamp: DateTime<Utc>,
}
138
/// Payload of a stream record: the affected key and optional item images.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// New item image, when one is included.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Old item image, when one is included.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number of the record, stored as a string.
    pub sequence_number: String,
    /// Size recorded for this stream record.
    pub size_bytes: i64,
    /// Stream view type in effect for the record.
    pub stream_view_type: String,
}
148
/// A Kinesis streaming destination attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Status string of the destination.
    pub destination_status: String,
    /// Precision setting for approximate creation timestamps, as a string.
    pub approximate_creation_date_time_precision: String,
}
155
/// A table backup: metadata plus the captured definition and items.
///
/// Every field marked `#[serde(default)]` falls back to its type's default
/// value when it is missing from the serialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the table the backup refers to.
    pub table_name: String,
    /// ARN of the table the backup refers to.
    pub table_arn: String,
    /// Status string of the backup.
    pub backup_status: String,
    /// Type string of the backup.
    pub backup_type: String,
    /// Creation timestamp of the backup (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema stored with the backup.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions stored with the backup.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned throughput stored with the backup.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode stored with the backup.
    pub billing_mode: String,
    /// Item count stored with the backup.
    pub item_count: i64,
    /// Size in bytes stored with the backup.
    pub size_bytes: i64,
    /// Items stored with the backup.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes stored with the backup.
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes stored with the backup.
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags stored with the backup.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// TTL attribute name stored with the backup, if any.
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    /// TTL enabled flag stored with the backup.
    #[serde(default)]
    pub ttl_enabled: bool,
    /// Server-side encryption type stored with the backup, if any.
    #[serde(default)]
    pub sse_type: Option<String>,
    /// KMS key ARN stored with the backup, if any.
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    /// Stream enabled flag stored with the backup.
    #[serde(default)]
    pub stream_enabled: bool,
    /// Stream view type stored with the backup, if any.
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
196
/// Description of a global table and its replicas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status string of the global table.
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
}
205
/// One replica of a global table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region name of the replica.
    pub region_name: String,
    /// Status string of the replica.
    pub replica_status: String,
}
211
/// Record of a table export.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Status string of the export.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// S3 bucket of the export.
    pub s3_bucket: String,
    /// S3 key prefix of the export, if any.
    pub s3_prefix: Option<String>,
    /// Export format string.
    pub export_format: String,
    /// Start timestamp of the export (UTC).
    pub start_time: DateTime<Utc>,
    /// End timestamp of the export (UTC).
    pub end_time: DateTime<Utc>,
    /// Export timestamp (UTC).
    pub export_time: DateTime<Utc>,
    /// Item count recorded for the export.
    pub item_count: i64,
    /// Billed size in bytes recorded for the export.
    pub billed_size_bytes: i64,
}
226
/// Record of a table import.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Status string of the import.
    pub import_status: String,
    /// ARN of the table involved in the import.
    pub table_arn: String,
    /// Name of the table involved in the import.
    pub table_name: String,
    /// Source S3 bucket of the import.
    pub s3_bucket_source: String,
    /// Input format string.
    pub input_format: String,
    /// Start timestamp of the import (UTC).
    pub start_time: DateTime<Utc>,
    /// End timestamp of the import (UTC).
    pub end_time: DateTime<Utc>,
    /// Processed item count recorded for the import.
    pub processed_item_count: i64,
    /// Processed size in bytes recorded for the import.
    pub processed_size_bytes: i64,
}
240
241impl DynamoTable {
242 pub fn hash_key_name(&self) -> &str {
244 self.key_schema
245 .iter()
246 .find(|k| k.key_type == "HASH")
247 .map(|k| k.attribute_name.as_str())
248 .unwrap_or("")
249 }
250
251 pub fn range_key_name(&self) -> Option<&str> {
253 self.key_schema
254 .iter()
255 .find(|k| k.key_type == "RANGE")
256 .map(|k| k.attribute_name.as_str())
257 }
258
259 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
261 let hash_key = self.hash_key_name();
262 let range_key = self.range_key_name();
263
264 self.items.iter().position(|item| {
265 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
266 (Some(a), Some(b)) => a == b,
267 _ => false,
268 };
269 if !hash_match {
270 return false;
271 }
272 match range_key {
273 Some(rk) => match (item.get(rk), key.get(rk)) {
274 (Some(a), Some(b)) => a == b,
275 (None, None) => true,
276 _ => false,
277 },
278 None => true,
279 }
280 })
281 }
282
283 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
285 let mut size: i64 = 0;
286 for (k, v) in item {
287 size += k.len() as i64;
288 size += Self::estimate_value_size(v);
289 }
290 size
291 }
292
293 fn estimate_value_size(v: &Value) -> i64 {
294 match v {
295 Value::Object(obj) => {
296 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
297 s.len() as i64
298 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
299 n.len() as i64
300 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
301 1
302 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
303 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
304 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
305 3 + m
306 .iter()
307 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
308 .sum::<i64>()
309 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
310 ss.iter()
311 .filter_map(|v| v.as_str())
312 .map(|s| s.len() as i64)
313 .sum()
314 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
315 ns.iter()
316 .filter_map(|v| v.as_str())
317 .map(|s| s.len() as i64)
318 .sum()
319 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
320 (b.len() as i64 * 3) / 4
322 } else {
323 v.to_string().len() as i64
324 }
325 }
326 _ => v.to_string().len() as i64,
327 }
328 }
329
330 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
333 if self.contributor_insights_status != "ENABLED" {
334 return;
335 }
336 let hash_key = self.hash_key_name().to_string();
337 if let Some(pk_value) = key.get(&hash_key) {
338 let key_str = pk_value.to_string();
339 *self
340 .contributor_insights_counters
341 .entry(key_str)
342 .or_insert(0) += 1;
343 }
344 }
345
346 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
348 if self.contributor_insights_status != "ENABLED" {
349 return;
350 }
351 let hash_key = self.hash_key_name().to_string();
352 if let Some(pk_value) = item.get(&hash_key) {
353 let key_str = pk_value.to_string();
354 *self
355 .contributor_insights_counters
356 .entry(key_str)
357 .or_insert(0) += 1;
358 }
359 }
360
361 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
363 let mut entries: Vec<(&str, u64)> = self
364 .contributor_insights_counters
365 .iter()
366 .map(|(k, &v)| (k.as_str(), v))
367 .collect();
368 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
369 entries.truncate(n);
370 entries
371 }
372
373 pub fn recalculate_stats(&mut self) {
375 self.item_count = self.items.len() as i64;
376 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
377 }
378}
379
/// All DynamoDB resources held for one account and region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Account this state belongs to.
    pub account_id: String,
    /// Region this state belongs to.
    pub region: String,
    /// Tables, in an ordered map.
    /// NOTE(review): presumably keyed by table name — confirm against callers.
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups, in an ordered map (key meaning not visible here).
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables, in an ordered map (key meaning not visible here).
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports, in an ordered map (key meaning not visible here).
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports, in an ordered map (key meaning not visible here).
    pub imports: BTreeMap<String, ImportDescription>,
}
390
/// Serializable snapshot envelope for DynamoDB state.
///
/// Both payload fields are optional and deserialize to `None` when absent.
/// NOTE(review): presumably `accounts` is the current multi-account layout
/// and `state` a legacy single-account one — confirm against the snapshot
/// loader.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Version number of the snapshot layout.
    pub schema_version: u32,
    /// Multi-account payload, if present.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    /// Single-state payload, if present.
    #[serde(default)]
    pub state: Option<DynamoDbState>,
}
404
/// Schema version number for DynamoDB snapshots.
/// NOTE(review): presumably the value written to
/// `DynamoDbSnapshot::schema_version` — confirm against the snapshot writer.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
406
407impl DynamoDbState {
408 pub fn new(account_id: &str, region: &str) -> Self {
409 Self {
410 account_id: account_id.to_string(),
411 region: region.to_string(),
412 tables: BTreeMap::new(),
413 backups: BTreeMap::new(),
414 global_tables: BTreeMap::new(),
415 exports: BTreeMap::new(),
416 imports: BTreeMap::new(),
417 }
418 }
419
420 pub fn reset(&mut self) {
421 self.tables.clear();
422 self.backups.clear();
423 self.global_tables.clear();
424 self.exports.clear();
425 self.imports.clear();
426 }
427}
428
429impl fakecloud_core::multi_account::AccountState for DynamoDbState {
430 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
431 Self::new(account_id, region)
432 }
433}
434
/// Reference-counted, read/write-locked handle to the multi-account
/// DynamoDB state.
pub type SharedDynamoDbState =
    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
437
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    // Previously this test only checked `new`; it now populates the state
    // and verifies that `reset` empties it without touching the identity.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        state.global_tables.insert(
            "g".to_string(),
            GlobalTableDescription {
                global_table_name: "g".to_string(),
                global_table_arn: "arn:aws:dynamodb::123:global-table/g".to_string(),
                global_table_status: "ACTIVE".to_string(),
                creation_date: Utc::now(),
                replication_group: Vec::new(),
            },
        );
        assert_eq!(state.tables.len(), 1);
        assert_eq!(state.global_tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Builds a minimal table whose key schema is a single hash key.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn find_item_index_matches_on_hash_key() {
        let mut t = table_with_hash_key("pk");
        let mut first = HashMap::new();
        first.insert("pk".to_string(), json!({"S": "a"}));
        let mut second = HashMap::new();
        second.insert("pk".to_string(), json!({"S": "b"}));
        second.insert("other".to_string(), json!({"N": "1"}));
        t.items.push(first);
        t.items.push(second);

        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "b"}));
        assert_eq!(t.find_item_index(&key), Some(1));

        key.insert("pk".to_string(), json!({"S": "missing"}));
        assert_eq!(t.find_item_index(&key), None);
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}