1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9 Arc::new(RwLock::new(Vec::new()))
10}
11
12mod stream_records_serde {
17 use super::{Arc, RwLock, StreamRecord};
18 use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20 pub fn serialize<S: Serializer>(
21 v: &Arc<RwLock<Vec<StreamRecord>>>,
22 s: S,
23 ) -> Result<S::Ok, S::Error> {
24 v.read().serialize(s)
25 }
26
27 pub fn deserialize<'de, D: Deserializer<'de>>(
28 d: D,
29 ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30 let records = Vec::<StreamRecord>::deserialize(d)?;
31 for r in &records {
35 crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36 }
37 Ok(Arc::new(RwLock::new(records)))
38 }
39}
40
/// A DynamoDB attribute value kept in its raw JSON wire form, i.e. a
/// single-entry object mapping a type tag to a payload such as
/// `{"S": "hi"}` or `{"N": "42"}`. Use `attribute_type_and_value` to split it.
pub type AttributeValue = Value;
44
45pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48 let obj = av.as_object()?;
49 if obj.len() != 1 {
50 return None;
51 }
52 let (k, v) = obj.iter().next()?;
53 Some((k.as_str(), v))
54}
55
/// One element of a key schema: an attribute name and the key role it plays.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the key attribute.
    pub attribute_name: String,
    /// Key role as a string. `DynamoTable::hash_key_name` looks for `"HASH"` and
    /// `DynamoTable::range_key_name` looks for `"RANGE"`; other values are not
    /// validated in this module.
    pub key_type: String,
}
61
/// Attribute name paired with its declared type, as stored on tables and backups.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Attribute name.
    pub attribute_name: String,
    /// Declared type code. Presumably a DynamoDB scalar code such as `"S"`,
    /// `"N"` or `"B"`; it is not validated in this module.
    pub attribute_type: String,
}
67
/// Provisioned read/write capacity, used by tables, GSIs and backups.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Read capacity units.
    pub read_capacity_units: i64,
    /// Write capacity units.
    pub write_capacity_units: i64,
}
73
/// Maximum request-unit limits for on-demand capacity; held as an optional
/// setting on tables and GSIs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Maximum read request units.
    pub max_read_request_units: i64,
    /// Maximum write request units.
    pub max_write_request_units: i64,
}
83
/// Definition of a global secondary index on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Index name.
    pub index_name: String,
    /// The index's own key schema (independent of the table's).
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes are projected into the index.
    pub projection: Projection,
    /// Provisioned capacity for the index, if set.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand limits for the index, if set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
92
/// Definition of a local secondary index on a table. Unlike
/// `GlobalSecondaryIndex`, it carries no throughput settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Index name.
    pub index_name: String,
    /// The index's key schema.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes are projected into the index.
    pub projection: Projection,
}
99
/// Attribute projection settings for a secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection kind as a string. Presumably `"ALL"`, `"KEYS_ONLY"` or
    /// `"INCLUDE"`; it is not validated in this module.
    pub projection_type: String,
    /// Extra non-key attributes to project. Presumably only meaningful for
    /// `"INCLUDE"` — confirm at the call sites.
    pub non_key_attributes: Vec<String>,
}
105
/// In-memory model of one DynamoDB table: schema, settings, stored items and
/// stream buffer.
///
/// Serde: non-`Option` fields without a `#[serde(default ...)]` attribute must
/// be present when deserializing; missing `Option` fields become `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier.
    pub table_id: String,
    /// Primary key definition; read by `hash_key_name` / `range_key_name`.
    pub key_schema: Vec<KeySchemaElement>,
    /// Declared attribute types.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity settings.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each an attribute-name -> DynamoDB-JSON value map.
    /// `find_item_index` searches this vector linearly.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags.
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string (the tests in this module use `"ACTIVE"`).
    pub status: String,
    /// Cached item count; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Cached, estimated total item size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode string.
    pub billing_mode: String,
    /// Name of the TTL attribute, if configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Attached resource policy document, if any.
    pub resource_policy: Option<String>,
    /// Whether point-in-time recovery is enabled.
    pub pitr_enabled: bool,
    /// Configured Kinesis streaming destinations.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor-insights switch; the access counters below are only updated
    /// when this equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Access counts keyed by the JSON text of an item's hash-key value.
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// Whether the table stream is enabled.
    pub stream_enabled: bool,
    /// Stream view type, if a stream is configured.
    pub stream_view_type: Option<String>,
    /// Stream ARN, if a stream is configured.
    pub stream_arn: Option<String>,
    // Serialized as a plain list via `stream_records_serde`; defaults to an
    // empty buffer when absent.
    #[serde(with = "stream_records_serde", default = "empty_stream_records")]
    /// Stream records behind `Arc<RwLock<..>>`. Because of the `Arc`, the derived
    /// `Clone` shares this buffer between clones instead of copying it.
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type, if set.
    pub sse_type: Option<String>,
    /// KMS key ARN for server-side encryption, if set.
    pub sse_kms_key_arn: Option<String>,
    /// Whether deletion protection is enabled.
    pub deletion_protection_enabled: bool,
    /// On-demand throughput limits, if set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
    // Falls back to `"STANDARD"` when the field is absent.
    #[serde(default = "default_table_class")]
    /// Table class string.
    pub table_class: String,
}
160
/// Table class assumed when none is stored: `"STANDARD"`.
///
/// Used as the serde default for `DynamoTable::table_class`.
pub(crate) fn default_table_class() -> String {
    String::from("STANDARD")
}
164
/// One change event held in a table's stream buffer (`DynamoTable::stream_records`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Unique event identifier.
    pub event_id: String,
    /// Event kind name.
    pub event_name: String,
    /// Event format version.
    pub event_version: String,
    /// Event source identifier.
    pub event_source: String,
    /// Region the event originated in.
    pub aws_region: String,
    /// DynamoDB-specific payload (keys, images, sequence number).
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the event source.
    pub event_source_arn: String,
    /// When the event was recorded (UTC).
    pub timestamp: DateTime<Utc>,
}
176
/// DynamoDB portion of a `StreamRecord`: key attributes, item images and ordering data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, if captured.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, if captured.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number; on deserialization it is reported to
    /// `crate::streams::observe_stream_sequence`.
    pub sequence_number: String,
    /// Size of the record in bytes.
    pub size_bytes: i64,
    /// Stream view type the record was captured with.
    pub stream_view_type: String,
}
186
/// A Kinesis data stream configured as a streaming destination for a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination Kinesis stream.
    pub stream_arn: String,
    /// Destination status string.
    pub destination_status: String,
    /// Precision setting for the approximate creation timestamp.
    pub approximate_creation_date_time_precision: String,
}
193
/// A stored table backup: metadata, schema/settings, and the backed-up items.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// Backup ARN.
    pub backup_arn: String,
    /// Backup name.
    pub backup_name: String,
    /// Name of the source table.
    pub table_name: String,
    /// ARN of the source table.
    pub table_arn: String,
    /// Backup status string.
    pub backup_status: String,
    /// Backup type string.
    pub backup_type: String,
    /// When the backup was created (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema of the source table.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions of the source table.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity of the source table.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode of the source table.
    pub billing_mode: String,
    /// Item count recorded with the backup.
    pub item_count: i64,
    /// Size in bytes recorded with the backup.
    pub size_bytes: i64,
    /// Items stored in the backup.
    pub items: Vec<HashMap<String, AttributeValue>>,
    // The fields below use `#[serde(default)]`, so serialized backups that lack
    // them still deserialize (empty collections / `false` / `None`).
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    #[serde(default)]
    pub ttl_enabled: bool,
    #[serde(default)]
    pub sse_type: Option<String>,
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    #[serde(default)]
    pub stream_enabled: bool,
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
234
/// A global table and its replica set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Global table name.
    pub global_table_name: String,
    /// Global table ARN.
    pub global_table_arn: String,
    /// Status string.
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// One entry per replica region.
    pub replication_group: Vec<ReplicaDescription>,
    // Deserializes to `"PROVISIONED"` when the field is absent.
    #[serde(default = "default_global_billing_mode")]
    /// Billing mode string.
    pub billing_mode: String,
    // `None` when absent.
    #[serde(default)]
    /// Provisioned write capacity, if recorded.
    pub provisioned_write_capacity_units: Option<i64>,
}
252
/// Billing mode assumed for a global table when none is stored: `"PROVISIONED"`.
///
/// Used as the serde default for `GlobalTableDescription::billing_mode`.
fn default_global_billing_mode() -> String {
    "PROVISIONED".to_owned()
}
256
/// One regional replica of a global table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region hosting the replica.
    pub region_name: String,
    /// Replica status string.
    pub replica_status: String,
    // Auto-scaling settings are kept as opaque JSON; `None` when absent.
    #[serde(default)]
    pub read_capacity_auto_scaling: Option<serde_json::Value>,
    #[serde(default)]
    pub write_capacity_auto_scaling: Option<serde_json::Value>,
    // `None` when absent.
    #[serde(default)]
    /// Read capacity units for this replica, if recorded.
    pub read_capacity_units: Option<i64>,
}
274
/// Record of a table export to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// Export ARN.
    pub export_arn: String,
    /// Export status string.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Destination key prefix, if any.
    pub s3_prefix: Option<String>,
    /// Export format string.
    pub export_format: String,
    /// Export start time (UTC).
    pub start_time: DateTime<Utc>,
    /// Export end time (UTC).
    pub end_time: DateTime<Utc>,
    /// Point in time the exported data reflects (UTC).
    pub export_time: DateTime<Utc>,
    /// Number of items exported.
    pub item_count: i64,
    /// Billed size in bytes.
    pub billed_size_bytes: i64,
}
289
/// Record of a table import from S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// Import ARN.
    pub import_arn: String,
    /// Import status string.
    pub import_status: String,
    /// ARN of the target table.
    pub table_arn: String,
    /// Name of the target table.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format string.
    pub input_format: String,
    /// Import start time (UTC).
    pub start_time: DateTime<Utc>,
    /// Import end time (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
303
304impl DynamoTable {
305 pub fn hash_key_name(&self) -> &str {
307 self.key_schema
308 .iter()
309 .find(|k| k.key_type == "HASH")
310 .map(|k| k.attribute_name.as_str())
311 .unwrap_or("")
312 }
313
314 pub fn range_key_name(&self) -> Option<&str> {
316 self.key_schema
317 .iter()
318 .find(|k| k.key_type == "RANGE")
319 .map(|k| k.attribute_name.as_str())
320 }
321
322 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
324 let hash_key = self.hash_key_name();
325 let range_key = self.range_key_name();
326
327 self.items.iter().position(|item| {
328 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
329 (Some(a), Some(b)) => a == b,
330 _ => false,
331 };
332 if !hash_match {
333 return false;
334 }
335 match range_key {
336 Some(rk) => match (item.get(rk), key.get(rk)) {
337 (Some(a), Some(b)) => a == b,
338 (None, None) => true,
339 _ => false,
340 },
341 None => true,
342 }
343 })
344 }
345
346 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
348 let mut size: i64 = 0;
349 for (k, v) in item {
350 size += k.len() as i64;
351 size += Self::estimate_value_size(v);
352 }
353 size
354 }
355
356 fn estimate_value_size(v: &Value) -> i64 {
357 match v {
358 Value::Object(obj) => {
359 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
360 s.len() as i64
361 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
362 n.len() as i64
363 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
364 1
365 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
366 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
367 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
368 3 + m
369 .iter()
370 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
371 .sum::<i64>()
372 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
373 ss.iter()
374 .filter_map(|v| v.as_str())
375 .map(|s| s.len() as i64)
376 .sum()
377 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
378 ns.iter()
379 .filter_map(|v| v.as_str())
380 .map(|s| s.len() as i64)
381 .sum()
382 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
383 (b.len() as i64 * 3) / 4
385 } else {
386 v.to_string().len() as i64
387 }
388 }
389 _ => v.to_string().len() as i64,
390 }
391 }
392
393 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
396 if self.contributor_insights_status != "ENABLED" {
397 return;
398 }
399 let hash_key = self.hash_key_name().to_string();
400 if let Some(pk_value) = key.get(&hash_key) {
401 let key_str = pk_value.to_string();
402 *self
403 .contributor_insights_counters
404 .entry(key_str)
405 .or_insert(0) += 1;
406 }
407 }
408
409 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
411 if self.contributor_insights_status != "ENABLED" {
412 return;
413 }
414 let hash_key = self.hash_key_name().to_string();
415 if let Some(pk_value) = item.get(&hash_key) {
416 let key_str = pk_value.to_string();
417 *self
418 .contributor_insights_counters
419 .entry(key_str)
420 .or_insert(0) += 1;
421 }
422 }
423
424 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
426 let mut entries: Vec<(&str, u64)> = self
427 .contributor_insights_counters
428 .iter()
429 .map(|(k, &v)| (k.as_str(), v))
430 .collect();
431 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
432 entries.truncate(n);
433 entries
434 }
435
436 pub fn recalculate_stats(&mut self) {
438 self.item_count = self.items.len() as i64;
439 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
440 }
441}
442
/// All DynamoDB resources belonging to one account in one region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Owning account id.
    pub account_id: String,
    /// Region this state belongs to.
    pub region: String,
    /// Tables, in a `BTreeMap` (deterministic order). NOTE(review): presumably
    /// keyed by table name — confirm at insertion sites.
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups; key presumably the backup ARN — confirm.
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables; key presumably the global table name — confirm.
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports; key presumably the export ARN — confirm.
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports; key presumably the import ARN — confirm.
    pub imports: BTreeMap<String, ImportDescription>,
}
453
/// Serializable snapshot of DynamoDB state, tagged with a schema version.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Snapshot schema version (see `DYNAMODB_SNAPSHOT_SCHEMA_VERSION`).
    pub schema_version: u32,
    // `None` when absent.
    #[serde(default)]
    /// State for every account. NOTE(review): presumably the layout written
    /// by the current schema version — confirm against the snapshot loader.
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    // `None` when absent.
    #[serde(default)]
    /// Single-account state. NOTE(review): presumably the legacy layout kept for
    /// reading older snapshots — confirm against the snapshot loader.
    pub state: Option<DynamoDbState>,
}
467
/// Current schema version for `DynamoDbSnapshot`.
///
/// NOTE(review): presumably written to and checked against
/// `DynamoDbSnapshot::schema_version` by the persistence code — confirm there.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2_u32;
469
470impl DynamoDbState {
471 pub fn new(account_id: &str, region: &str) -> Self {
472 Self {
473 account_id: account_id.to_string(),
474 region: region.to_string(),
475 tables: BTreeMap::new(),
476 backups: BTreeMap::new(),
477 global_tables: BTreeMap::new(),
478 exports: BTreeMap::new(),
479 imports: BTreeMap::new(),
480 }
481 }
482
483 pub fn reset(&mut self) {
484 self.tables.clear();
485 self.backups.clear();
486 self.global_tables.clear();
487 self.exports.clear();
488 self.imports.clear();
489 }
490}
491
492impl fakecloud_core::multi_account::AccountState for DynamoDbState {
493 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
494 Self::new(account_id, region)
495 }
496}
497
498pub type SharedDynamoDbState =
499 Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
500
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// `new` starts empty; `reset` clears resources but keeps account identity.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        assert_eq!(state.tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Builds a minimal table whose key schema has a single HASH key `hash`.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
            table_class: default_table_class(),
        }
    }

    /// Builds an attribute map from `(name, value)` pairs.
    fn attrs(pairs: &[(&str, AttributeValue)]) -> HashMap<String, AttributeValue> {
        pairs
            .iter()
            .map(|(name, value)| (name.to_string(), value.clone()))
            .collect()
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn find_item_index_matches_on_hash_key() {
        let mut t = table_with_hash_key("pk");
        t.items
            .push(attrs(&[("pk", json!({"S": "a"})), ("x", json!({"N": "1"}))]));
        t.items.push(attrs(&[("pk", json!({"S": "b"}))]));
        assert_eq!(t.find_item_index(&attrs(&[("pk", json!({"S": "b"}))])), Some(1));
        assert_eq!(t.find_item_index(&attrs(&[("pk", json!({"S": "zzz"}))])), None);
        assert_eq!(t.find_item_index(&HashMap::new()), None);
    }

    #[test]
    fn find_item_index_requires_matching_range_key() {
        let mut t = table_with_hash_key("pk");
        assert_eq!(t.range_key_name(), None);
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        assert_eq!(t.range_key_name(), Some("sk"));
        t.items
            .push(attrs(&[("pk", json!({"S": "a"})), ("sk", json!({"N": "1"}))]));
        t.items
            .push(attrs(&[("pk", json!({"S": "a"})), ("sk", json!({"N": "2"}))]));
        let key = attrs(&[("pk", json!({"S": "a"})), ("sk", json!({"N": "2"}))]);
        assert_eq!(t.find_item_index(&key), Some(1));
        // The hash key alone does not identify an item when a range key exists.
        assert_eq!(t.find_item_index(&attrs(&[("pk", json!({"S": "a"}))])), None);
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}
680}