1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9 Arc::new(RwLock::new(Vec::new()))
10}
11
12mod stream_records_serde {
17 use super::{Arc, RwLock, StreamRecord};
18 use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20 pub fn serialize<S: Serializer>(
21 v: &Arc<RwLock<Vec<StreamRecord>>>,
22 s: S,
23 ) -> Result<S::Ok, S::Error> {
24 v.read().serialize(s)
25 }
26
27 pub fn deserialize<'de, D: Deserializer<'de>>(
28 d: D,
29 ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30 let records = Vec::<StreamRecord>::deserialize(d)?;
31 for r in &records {
35 crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36 }
37 Ok(Arc::new(RwLock::new(records)))
38 }
39}
40
/// An attribute value, stored as raw JSON.
///
/// `attribute_type_and_value` treats a well-formed value as a JSON object with
/// exactly one entry that maps a type tag (e.g. `"S"`) to its payload.
pub type AttributeValue = Value;
44
45pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48 let obj = av.as_object()?;
49 if obj.len() != 1 {
50 return None;
51 }
52 let (k, v) = obj.iter().next()?;
53 Some((k.as_str(), v))
54}
55
/// One element of a key schema: an attribute name paired with its key role.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the attribute that takes part in the key.
    pub attribute_name: String,
    /// Role of the attribute; `DynamoTable::hash_key_name` and
    /// `DynamoTable::range_key_name` look for the literal values `"HASH"`
    /// and `"RANGE"`.
    pub key_type: String,
}
61
/// Associates an attribute name with its declared type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Declared type of the attribute, kept as a free-form string
    /// (presumably a DynamoDB type code such as `S`, `N` or `B` — confirm).
    pub attribute_type: String,
}
67
/// Provisioned read/write capacity settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Configured read capacity units.
    pub read_capacity_units: i64,
    /// Configured write capacity units.
    pub write_capacity_units: i64,
}
73
/// On-demand throughput limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Maximum read request units.
    pub max_read_request_units: i64,
    /// Maximum write request units.
    pub max_write_request_units: i64,
}
83
/// Definition of a global secondary index attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
    /// Provisioned capacity for the index, when one is set.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand throughput limits for the index, when set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
92
/// Definition of a local secondary index attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
}
99
/// Attribute projection settings of a secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection mode, kept as a free-form string
    /// (presumably `ALL`, `KEYS_ONLY` or `INCLUDE` — confirm).
    pub projection_type: String,
    /// Names of the additional non-key attributes listed for the projection.
    pub non_key_attributes: Vec<String>,
}
105
/// In-memory representation of a single table, including its items,
/// index definitions, settings and stream buffer.
///
/// `stream_records` is held in an `Arc`, so a derived `Clone` of the table
/// shares the same record list with the original.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier.
    pub table_id: String,
    /// Key schema; `hash_key_name` / `range_key_name` read the `"HASH"` and
    /// `"RANGE"` entries from it.
    pub key_schema: Vec<KeySchemaElement>,
    /// Declared attribute definitions.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity settings.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to attribute value.
    /// `find_item_index` scans this list linearly.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags, ordered by key.
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string (the tests in this file use `"ACTIVE"`).
    pub status: String,
    /// Cached item count; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Cached estimated size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode string (the tests in this file use `"PROVISIONED"`).
    pub billing_mode: String,
    /// Name of the TTL attribute, if one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if any.
    pub resource_policy: Option<String>,
    /// Whether point-in-time recovery is enabled.
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations configured for the table.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor-insights status; access counters are only updated while
    /// this equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Access counters keyed by the JSON rendering of the hash-key value.
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// Whether a stream is enabled for the table.
    pub stream_enabled: bool,
    /// Stream view type, if set.
    pub stream_view_type: Option<String>,
    /// Stream ARN, if set.
    pub stream_arn: Option<String>,
    /// Stream records behind a shared lock. Serialized as a plain sequence
    /// through `stream_records_serde`; defaults to an empty list when the
    /// field is missing from the input.
    #[serde(with = "stream_records_serde", default = "empty_stream_records")]
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type, if set.
    pub sse_type: Option<String>,
    /// KMS key ARN for server-side encryption, if set.
    pub sse_kms_key_arn: Option<String>,
    /// Whether deletion protection is enabled.
    pub deletion_protection_enabled: bool,
    /// On-demand throughput limits, if set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
155
/// A single stream record: event metadata plus the item-level change payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Identifier of the event.
    pub event_id: String,
    /// Event name (presumably `INSERT`, `MODIFY` or `REMOVE` — confirm).
    pub event_name: String,
    /// Event format version.
    pub event_version: String,
    /// Source of the event.
    pub event_source: String,
    /// Region the event is attributed to.
    pub aws_region: String,
    /// Item-level change payload.
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the event source.
    pub event_source_arn: String,
    /// Timestamp of the record (UTC).
    pub timestamp: DateTime<Utc>,
}
167
/// Item-level payload carried inside a `StreamRecord`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, when present.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, when present.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number of the record; passed to
    /// `crate::streams::observe_stream_sequence` when records are deserialized.
    pub sequence_number: String,
    /// Size of the record in bytes.
    pub size_bytes: i64,
    /// Stream view type recorded with this entry.
    pub stream_view_type: String,
}
177
/// A Kinesis streaming destination configured on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Status of the destination.
    pub destination_status: String,
    /// Precision setting for the approximate creation date-time.
    pub approximate_creation_date_time_precision: String,
}
184
/// A table backup: backup metadata plus a copy of the table's schema,
/// settings and items.
///
/// Fields marked `#[serde(default)]` fall back to their `Default` value when
/// they are missing from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the backed-up table.
    pub table_name: String,
    /// ARN of the backed-up table.
    pub table_arn: String,
    /// Status of the backup.
    pub backup_status: String,
    /// Type of the backup.
    pub backup_type: String,
    /// When the backup was created (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema stored with the backup.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions stored with the backup.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity stored with the backup.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode stored with the backup.
    pub billing_mode: String,
    /// Item count stored with the backup.
    pub item_count: i64,
    /// Size in bytes stored with the backup.
    pub size_bytes: i64,
    /// Items stored with the backup.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes stored with the backup.
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes stored with the backup.
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags stored with the backup.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// TTL attribute name stored with the backup, if any.
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    /// TTL enabled flag stored with the backup.
    #[serde(default)]
    pub ttl_enabled: bool,
    /// Server-side encryption type stored with the backup, if any.
    #[serde(default)]
    pub sse_type: Option<String>,
    /// Server-side encryption KMS key ARN stored with the backup, if any.
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    /// Stream enabled flag stored with the backup.
    #[serde(default)]
    pub stream_enabled: bool,
    /// Stream view type stored with the backup, if any.
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
225
/// Description of a global table and its replicas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status of the global table.
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
    /// Billing mode; `"PROVISIONED"` (via `default_global_billing_mode`) when
    /// the field is missing from the serialized form.
    #[serde(default = "default_global_billing_mode")]
    pub billing_mode: String,
    /// Provisioned write capacity units, if set; `None` when missing.
    #[serde(default)]
    pub provisioned_write_capacity_units: Option<i64>,
}
243
/// Serde default for `GlobalTableDescription::billing_mode`: `"PROVISIONED"`.
fn default_global_billing_mode() -> String {
    String::from("PROVISIONED")
}
247
/// One replica of a global table.
///
/// The optional fields default to `None` when missing from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region the replica lives in.
    pub region_name: String,
    /// Status of the replica.
    pub replica_status: String,
    /// Read-capacity auto-scaling settings, kept as raw JSON.
    #[serde(default)]
    pub read_capacity_auto_scaling: Option<serde_json::Value>,
    /// Write-capacity auto-scaling settings, kept as raw JSON.
    #[serde(default)]
    pub write_capacity_auto_scaling: Option<serde_json::Value>,
    /// Read capacity units for the replica, if set.
    #[serde(default)]
    pub read_capacity_units: Option<i64>,
}
265
/// Description of a table export to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Status of the export.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Destination key prefix inside the bucket, if any.
    pub s3_prefix: Option<String>,
    /// Format of the exported data.
    pub export_format: String,
    /// When the export started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the export finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Export time (UTC); NOTE(review): presumably the point in time the
    /// exported data reflects — confirm.
    pub export_time: DateTime<Utc>,
    /// Number of exported items.
    pub item_count: i64,
    /// Billed size of the export in bytes.
    pub billed_size_bytes: i64,
}
280
/// Description of a table import from S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Status of the import.
    pub import_status: String,
    /// ARN of the target table.
    pub table_arn: String,
    /// Name of the target table.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Format of the input data.
    pub input_format: String,
    /// When the import started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the import finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
294
295impl DynamoTable {
296 pub fn hash_key_name(&self) -> &str {
298 self.key_schema
299 .iter()
300 .find(|k| k.key_type == "HASH")
301 .map(|k| k.attribute_name.as_str())
302 .unwrap_or("")
303 }
304
305 pub fn range_key_name(&self) -> Option<&str> {
307 self.key_schema
308 .iter()
309 .find(|k| k.key_type == "RANGE")
310 .map(|k| k.attribute_name.as_str())
311 }
312
313 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
315 let hash_key = self.hash_key_name();
316 let range_key = self.range_key_name();
317
318 self.items.iter().position(|item| {
319 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
320 (Some(a), Some(b)) => a == b,
321 _ => false,
322 };
323 if !hash_match {
324 return false;
325 }
326 match range_key {
327 Some(rk) => match (item.get(rk), key.get(rk)) {
328 (Some(a), Some(b)) => a == b,
329 (None, None) => true,
330 _ => false,
331 },
332 None => true,
333 }
334 })
335 }
336
337 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
339 let mut size: i64 = 0;
340 for (k, v) in item {
341 size += k.len() as i64;
342 size += Self::estimate_value_size(v);
343 }
344 size
345 }
346
347 fn estimate_value_size(v: &Value) -> i64 {
348 match v {
349 Value::Object(obj) => {
350 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
351 s.len() as i64
352 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
353 n.len() as i64
354 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
355 1
356 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
357 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
358 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
359 3 + m
360 .iter()
361 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
362 .sum::<i64>()
363 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
364 ss.iter()
365 .filter_map(|v| v.as_str())
366 .map(|s| s.len() as i64)
367 .sum()
368 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
369 ns.iter()
370 .filter_map(|v| v.as_str())
371 .map(|s| s.len() as i64)
372 .sum()
373 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
374 (b.len() as i64 * 3) / 4
376 } else {
377 v.to_string().len() as i64
378 }
379 }
380 _ => v.to_string().len() as i64,
381 }
382 }
383
384 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
387 if self.contributor_insights_status != "ENABLED" {
388 return;
389 }
390 let hash_key = self.hash_key_name().to_string();
391 if let Some(pk_value) = key.get(&hash_key) {
392 let key_str = pk_value.to_string();
393 *self
394 .contributor_insights_counters
395 .entry(key_str)
396 .or_insert(0) += 1;
397 }
398 }
399
400 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
402 if self.contributor_insights_status != "ENABLED" {
403 return;
404 }
405 let hash_key = self.hash_key_name().to_string();
406 if let Some(pk_value) = item.get(&hash_key) {
407 let key_str = pk_value.to_string();
408 *self
409 .contributor_insights_counters
410 .entry(key_str)
411 .or_insert(0) += 1;
412 }
413 }
414
415 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
417 let mut entries: Vec<(&str, u64)> = self
418 .contributor_insights_counters
419 .iter()
420 .map(|(k, &v)| (k.as_str(), v))
421 .collect();
422 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
423 entries.truncate(n);
424 entries
425 }
426
427 pub fn recalculate_stats(&mut self) {
429 self.item_count = self.items.len() as i64;
430 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
431 }
432}
433
/// All DynamoDB state held for one account/region pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Account the state belongs to.
    pub account_id: String,
    /// Region the state belongs to.
    pub region: String,
    /// Tables (presumably keyed by table name — confirm against callers).
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups (presumably keyed by backup ARN — confirm against callers).
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables (presumably keyed by global table name — confirm).
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports (presumably keyed by export ARN — confirm).
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports (presumably keyed by import ARN — confirm).
    pub imports: BTreeMap<String, ImportDescription>,
}
444
/// Serialized snapshot envelope for DynamoDB state.
///
/// Both payload fields are optional and default to `None` when missing.
/// NOTE(review): presumably `accounts` is the current multi-account layout
/// and `state` an older single-account layout kept for compatibility —
/// confirm against the snapshot loader.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Version of the snapshot layout; see `DYNAMODB_SNAPSHOT_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// State for multiple accounts, when present.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    /// State for a single account, when present.
    #[serde(default)]
    pub state: Option<DynamoDbState>,
}
458
/// Snapshot schema version constant, matching the type of
/// `DynamoDbSnapshot::schema_version`.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
460
461impl DynamoDbState {
462 pub fn new(account_id: &str, region: &str) -> Self {
463 Self {
464 account_id: account_id.to_string(),
465 region: region.to_string(),
466 tables: BTreeMap::new(),
467 backups: BTreeMap::new(),
468 global_tables: BTreeMap::new(),
469 exports: BTreeMap::new(),
470 imports: BTreeMap::new(),
471 }
472 }
473
474 pub fn reset(&mut self) {
475 self.tables.clear();
476 self.backups.clear();
477 self.global_tables.clear();
478 self.exports.clear();
479 self.imports.clear();
480 }
481}
482
483impl fakecloud_core::multi_account::AccountState for DynamoDbState {
484 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
485 Self::new(account_id, region)
486 }
487}
488
/// Shared, lock-guarded handle to the multi-account container of
/// `DynamoDbState` values.
pub type SharedDynamoDbState =
    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
491
/// Unit tests for the attribute helper, `DynamoDbState` and `DynamoTable`.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());

        // Populate a collection so that `reset` has something to clear.
        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        assert_eq!(state.tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        // Identity fields survive a reset.
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Builds a minimal table whose key schema is a single `HASH` attribute.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn find_item_index_matches_on_hash_key() {
        let mut t = table_with_hash_key("pk");
        let mut first = HashMap::new();
        first.insert("pk".to_string(), json!({"S": "a"}));
        let mut second = HashMap::new();
        second.insert("pk".to_string(), json!({"S": "b"}));
        t.items.push(first);
        t.items.push(second.clone());
        assert_eq!(t.find_item_index(&second), Some(1));

        let mut missing = HashMap::new();
        missing.insert("pk".to_string(), json!({"S": "zzz"}));
        assert_eq!(t.find_item_index(&missing), None);
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        // 3 bytes of list overhead + 1 + 2.
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        // 3 bytes of map overhead + key length 3 + value size 1.
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        // 8 unpadded base64 characters decode to 6 bytes.
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}