1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9 Arc::new(RwLock::new(Vec::new()))
10}
11
12mod stream_records_serde {
17 use super::{Arc, RwLock, StreamRecord};
18 use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20 pub fn serialize<S: Serializer>(
21 v: &Arc<RwLock<Vec<StreamRecord>>>,
22 s: S,
23 ) -> Result<S::Ok, S::Error> {
24 v.read().serialize(s)
25 }
26
27 pub fn deserialize<'de, D: Deserializer<'de>>(
28 d: D,
29 ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30 let records = Vec::<StreamRecord>::deserialize(d)?;
31 for r in &records {
35 crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36 }
37 Ok(Arc::new(RwLock::new(records)))
38 }
39}
40
41pub type AttributeValue = Value;
44
45pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48 let obj = av.as_object()?;
49 if obj.len() != 1 {
50 return None;
51 }
52 let (k, v) = obj.iter().next()?;
53 Some((k.as_str(), v))
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct KeySchemaElement {
58 pub attribute_name: String,
59 pub key_type: String, }
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct AttributeDefinition {
64 pub attribute_name: String,
65 pub attribute_type: String, }
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct ProvisionedThroughput {
70 pub read_capacity_units: i64,
71 pub write_capacity_units: i64,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct OnDemandThroughput {
80 pub max_read_request_units: i64,
81 pub max_write_request_units: i64,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct GlobalSecondaryIndex {
86 pub index_name: String,
87 pub key_schema: Vec<KeySchemaElement>,
88 pub projection: Projection,
89 pub provisioned_throughput: Option<ProvisionedThroughput>,
90 pub on_demand_throughput: Option<OnDemandThroughput>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct LocalSecondaryIndex {
95 pub index_name: String,
96 pub key_schema: Vec<KeySchemaElement>,
97 pub projection: Projection,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct Projection {
102 pub projection_type: String, pub non_key_attributes: Vec<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct DynamoTable {
108 pub name: String,
109 pub arn: String,
110 pub table_id: String,
111 pub key_schema: Vec<KeySchemaElement>,
112 pub attribute_definitions: Vec<AttributeDefinition>,
113 pub provisioned_throughput: ProvisionedThroughput,
114 pub items: Vec<HashMap<String, AttributeValue>>,
115 pub gsi: Vec<GlobalSecondaryIndex>,
116 pub lsi: Vec<LocalSecondaryIndex>,
117 pub tags: BTreeMap<String, String>,
118 pub created_at: DateTime<Utc>,
119 pub status: String,
120 pub item_count: i64,
121 pub size_bytes: i64,
122 pub billing_mode: String, pub ttl_attribute: Option<String>,
124 pub ttl_enabled: bool,
125 pub resource_policy: Option<String>,
126 pub pitr_enabled: bool,
128 pub kinesis_destinations: Vec<KinesisDestination>,
130 pub contributor_insights_status: String,
132 pub contributor_insights_counters: BTreeMap<String, u64>,
134 pub stream_enabled: bool,
136 pub stream_view_type: Option<String>, pub stream_arn: Option<String>,
138 #[serde(with = "stream_records_serde", default = "empty_stream_records")]
141 pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
142 pub sse_type: Option<String>,
144 pub sse_kms_key_arn: Option<String>,
146 pub deletion_protection_enabled: bool,
150 pub on_demand_throughput: Option<OnDemandThroughput>,
154 #[serde(default = "default_table_class")]
158 pub table_class: String,
159}
160
/// Serde default for `DynamoTable::table_class`: the `"STANDARD"` class.
pub(crate) fn default_table_class() -> String {
    String::from("STANDARD")
}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct StreamRecord {
167 pub event_id: String,
168 pub event_name: String, pub event_version: String,
170 pub event_source: String,
171 pub aws_region: String,
172 pub dynamodb: DynamoDbStreamRecord,
173 pub event_source_arn: String,
174 pub timestamp: DateTime<Utc>,
175 #[serde(default)]
180 pub user_identity: Option<StreamUserIdentity>,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct StreamUserIdentity {
187 pub principal_id: String,
188 pub identity_type: String,
189}
190
191impl StreamUserIdentity {
192 pub fn ttl() -> Self {
194 Self {
195 principal_id: "dynamodb.amazonaws.com".to_string(),
196 identity_type: "Service".to_string(),
197 }
198 }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct DynamoDbStreamRecord {
203 pub keys: HashMap<String, AttributeValue>,
204 pub new_image: Option<HashMap<String, AttributeValue>>,
205 pub old_image: Option<HashMap<String, AttributeValue>>,
206 pub sequence_number: String,
207 pub size_bytes: i64,
208 pub stream_view_type: String,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct KinesisDestination {
213 pub stream_arn: String,
214 pub destination_status: String,
215 pub approximate_creation_date_time_precision: String,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct BackupDescription {
220 pub backup_arn: String,
221 pub backup_name: String,
222 pub table_name: String,
223 pub table_arn: String,
224 pub backup_status: String,
225 pub backup_type: String,
226 pub backup_creation_date: DateTime<Utc>,
227 pub key_schema: Vec<KeySchemaElement>,
228 pub attribute_definitions: Vec<AttributeDefinition>,
229 pub provisioned_throughput: ProvisionedThroughput,
230 pub billing_mode: String,
231 pub item_count: i64,
232 pub size_bytes: i64,
233 pub items: Vec<HashMap<String, AttributeValue>>,
235 #[serde(default)]
240 pub gsi: Vec<GlobalSecondaryIndex>,
241 #[serde(default)]
242 pub lsi: Vec<LocalSecondaryIndex>,
243 #[serde(default)]
244 pub tags: BTreeMap<String, String>,
245 #[serde(default)]
246 pub ttl_attribute: Option<String>,
247 #[serde(default)]
248 pub ttl_enabled: bool,
249 #[serde(default)]
250 pub sse_type: Option<String>,
251 #[serde(default)]
252 pub sse_kms_key_arn: Option<String>,
253 #[serde(default)]
254 pub stream_enabled: bool,
255 #[serde(default)]
256 pub stream_view_type: Option<String>,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct GlobalTableDescription {
261 pub global_table_name: String,
262 pub global_table_arn: String,
263 pub global_table_status: String,
264 pub creation_date: DateTime<Utc>,
265 pub replication_group: Vec<ReplicaDescription>,
266 #[serde(default = "default_global_billing_mode")]
270 pub billing_mode: String,
271 #[serde(default)]
274 pub provisioned_write_capacity_units: Option<i64>,
275}
276
/// Serde default for `GlobalTableDescription::billing_mode`: `"PROVISIONED"`.
fn default_global_billing_mode() -> String {
    String::from("PROVISIONED")
}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct ReplicaDescription {
283 pub region_name: String,
284 pub replica_status: String,
285 #[serde(default)]
289 pub read_capacity_auto_scaling: Option<serde_json::Value>,
290 #[serde(default)]
292 pub write_capacity_auto_scaling: Option<serde_json::Value>,
293 #[serde(default)]
296 pub read_capacity_units: Option<i64>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct ExportDescription {
301 pub export_arn: String,
302 pub export_status: String,
303 pub table_arn: String,
304 pub s3_bucket: String,
305 pub s3_prefix: Option<String>,
306 pub export_format: String,
307 pub start_time: DateTime<Utc>,
308 pub end_time: DateTime<Utc>,
309 pub export_time: DateTime<Utc>,
310 pub item_count: i64,
311 pub billed_size_bytes: i64,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct ImportDescription {
316 pub import_arn: String,
317 pub import_status: String,
318 pub table_arn: String,
319 pub table_name: String,
320 pub s3_bucket_source: String,
321 pub input_format: String,
322 pub start_time: DateTime<Utc>,
323 pub end_time: DateTime<Utc>,
324 pub processed_item_count: i64,
325 pub processed_size_bytes: i64,
326}
327
328impl DynamoTable {
329 pub fn hash_key_name(&self) -> &str {
331 self.key_schema
332 .iter()
333 .find(|k| k.key_type == "HASH")
334 .map(|k| k.attribute_name.as_str())
335 .unwrap_or("")
336 }
337
338 pub fn range_key_name(&self) -> Option<&str> {
340 self.key_schema
341 .iter()
342 .find(|k| k.key_type == "RANGE")
343 .map(|k| k.attribute_name.as_str())
344 }
345
346 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
348 let hash_key = self.hash_key_name();
349 let range_key = self.range_key_name();
350
351 use crate::service::helpers::partiql::values_equal;
358 self.items.iter().position(|item| {
359 let hash_match =
360 values_equal(item.get(hash_key), key.get(hash_key)) && item.get(hash_key).is_some();
361 if !hash_match {
362 return false;
363 }
364 match range_key {
365 Some(rk) => values_equal(item.get(rk), key.get(rk)),
366 None => true,
367 }
368 })
369 }
370
371 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
373 let mut size: i64 = 0;
374 for (k, v) in item {
375 size += k.len() as i64;
376 size += Self::estimate_value_size(v);
377 }
378 size
379 }
380
381 fn estimate_value_size(v: &Value) -> i64 {
382 match v {
383 Value::Object(obj) => {
384 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
385 s.len() as i64
386 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
387 n.len() as i64
388 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
389 1
390 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
391 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
392 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
393 3 + m
394 .iter()
395 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
396 .sum::<i64>()
397 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
398 ss.iter()
399 .filter_map(|v| v.as_str())
400 .map(|s| s.len() as i64)
401 .sum()
402 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
403 ns.iter()
404 .filter_map(|v| v.as_str())
405 .map(|s| s.len() as i64)
406 .sum()
407 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
408 (b.len() as i64 * 3) / 4
410 } else {
411 v.to_string().len() as i64
412 }
413 }
414 _ => v.to_string().len() as i64,
415 }
416 }
417
418 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
421 if self.contributor_insights_status != "ENABLED" {
422 return;
423 }
424 let hash_key = self.hash_key_name().to_string();
425 if let Some(pk_value) = key.get(&hash_key) {
426 let key_str = pk_value.to_string();
427 *self
428 .contributor_insights_counters
429 .entry(key_str)
430 .or_insert(0) += 1;
431 }
432 }
433
434 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
436 if self.contributor_insights_status != "ENABLED" {
437 return;
438 }
439 let hash_key = self.hash_key_name().to_string();
440 if let Some(pk_value) = item.get(&hash_key) {
441 let key_str = pk_value.to_string();
442 *self
443 .contributor_insights_counters
444 .entry(key_str)
445 .or_insert(0) += 1;
446 }
447 }
448
449 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
451 let mut entries: Vec<(&str, u64)> = self
452 .contributor_insights_counters
453 .iter()
454 .map(|(k, &v)| (k.as_str(), v))
455 .collect();
456 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
457 entries.truncate(n);
458 entries
459 }
460
461 pub fn recalculate_stats(&mut self) {
463 self.item_count = self.items.len() as i64;
464 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
465 }
466}
467
468#[derive(Debug, Clone, Serialize, Deserialize)]
469pub struct DynamoDbState {
470 pub account_id: String,
471 pub region: String,
472 pub tables: BTreeMap<String, DynamoTable>,
473 pub backups: BTreeMap<String, BackupDescription>,
474 pub global_tables: BTreeMap<String, GlobalTableDescription>,
475 pub exports: BTreeMap<String, ExportDescription>,
476 pub imports: BTreeMap<String, ImportDescription>,
477 #[serde(default)]
485 pub lambda_stream_checkpoints: BTreeMap<String, String>,
486}
487
488#[derive(Debug, Clone, Serialize, Deserialize)]
492pub struct DynamoDbSnapshot {
493 pub schema_version: u32,
494 #[serde(default)]
496 pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
497 #[serde(default)]
499 pub state: Option<DynamoDbState>,
500}
501
/// Current format version for `DynamoDbSnapshot::schema_version`
/// (presumably the value written on save; confirm at the writer).
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2_u32;
503
504impl DynamoDbState {
505 pub fn new(account_id: &str, region: &str) -> Self {
506 Self {
507 account_id: account_id.to_string(),
508 region: region.to_string(),
509 tables: BTreeMap::new(),
510 backups: BTreeMap::new(),
511 global_tables: BTreeMap::new(),
512 exports: BTreeMap::new(),
513 imports: BTreeMap::new(),
514 lambda_stream_checkpoints: BTreeMap::new(),
515 }
516 }
517
518 pub fn reset(&mut self) {
519 self.tables.clear();
520 self.backups.clear();
521 self.global_tables.clear();
522 self.exports.clear();
523 self.imports.clear();
524 self.lambda_stream_checkpoints.clear();
525 }
526
527 pub fn lambda_stream_checkpoint(&self, mapping_uuid: &str) -> Option<String> {
531 self.lambda_stream_checkpoints.get(mapping_uuid).cloned()
532 }
533
534 pub fn set_lambda_stream_checkpoint(&mut self, mapping_uuid: &str, sequence_number: String) {
538 self.lambda_stream_checkpoints
539 .insert(mapping_uuid.to_string(), sequence_number);
540 }
541}
542
543impl fakecloud_core::multi_account::AccountState for DynamoDbState {
544 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
545 Self::new(account_id, region)
546 }
547}
548
549pub type SharedDynamoDbState =
550 Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
551
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Exercises `new`, the checkpoint accessors and `reset`. Previously this
    /// test never called `reset` despite its name.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.lambda_stream_checkpoints.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        state.set_lambda_stream_checkpoint("mapping-1", "000001".to_string());
        assert_eq!(
            state.lambda_stream_checkpoint("mapping-1").as_deref(),
            Some("000001")
        );

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert!(state.lambda_stream_checkpoint("mapping-1").is_none());
        // Identity fields survive a reset.
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Builds a minimal hash-key-only table whose partition key is `hash`.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
            table_class: default_table_class(),
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn range_key_name_present_and_absent() {
        let mut t = table_with_hash_key("pk");
        assert_eq!(t.range_key_name(), None);
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        assert_eq!(t.range_key_name(), Some("sk"));
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn ttl_user_identity_is_dynamodb_service() {
        let id = StreamUserIdentity::ttl();
        assert_eq!(id.principal_id, "dynamodb.amazonaws.com");
        assert_eq!(id.identity_type, "Service");
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn find_item_index_canonicalizes_number_keys() {
        let mut t = table_with_hash_key("pk");
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"N": "1.0"}));
        t.items.push(item);

        let mut lookup = HashMap::new();
        lookup.insert("pk".to_string(), json!({"N": "1"}));
        assert_eq!(t.find_item_index(&lookup), Some(0));

        let mut other = HashMap::new();
        other.insert("pk".to_string(), json!({"N": "2"}));
        assert_eq!(t.find_item_index(&other), None);
    }

    #[test]
    fn find_item_index_malformed_number_key_does_not_match_valid() {
        let mut t = table_with_hash_key("pk");
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"N": "5"}));
        t.items.push(item);

        let mut bad = HashMap::new();
        bad.insert("pk".to_string(), json!({"N": "abc"}));
        assert_eq!(t.find_item_index(&bad), None);
    }

    #[test]
    fn find_item_index_requires_range_key_match() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "a"}));
        item.insert("sk".to_string(), json!({"S": "x"}));
        t.items.push(item);

        let mut same = HashMap::new();
        same.insert("pk".to_string(), json!({"S": "a"}));
        same.insert("sk".to_string(), json!({"S": "x"}));
        assert_eq!(t.find_item_index(&same), Some(0));

        let mut other_sort = HashMap::new();
        other_sort.insert("pk".to_string(), json!({"S": "a"}));
        other_sort.insert("sk".to_string(), json!({"S": "y"}));
        assert_eq!(t.find_item_index(&other_sort), None);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}