1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::HashMap;
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9 Arc::new(RwLock::new(Vec::new()))
10}
11
12pub type AttributeValue = Value;
15
16pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
19 let obj = av.as_object()?;
20 if obj.len() != 1 {
21 return None;
22 }
23 let (k, v) = obj.iter().next()?;
24 Some((k.as_str(), v))
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct KeySchemaElement {
29 pub attribute_name: String,
30 pub key_type: String, }
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct AttributeDefinition {
35 pub attribute_name: String,
36 pub attribute_type: String, }
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ProvisionedThroughput {
41 pub read_capacity_units: i64,
42 pub write_capacity_units: i64,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct OnDemandThroughput {
51 pub max_read_request_units: i64,
52 pub max_write_request_units: i64,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct GlobalSecondaryIndex {
57 pub index_name: String,
58 pub key_schema: Vec<KeySchemaElement>,
59 pub projection: Projection,
60 pub provisioned_throughput: Option<ProvisionedThroughput>,
61 pub on_demand_throughput: Option<OnDemandThroughput>,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct LocalSecondaryIndex {
66 pub index_name: String,
67 pub key_schema: Vec<KeySchemaElement>,
68 pub projection: Projection,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct Projection {
73 pub projection_type: String, pub non_key_attributes: Vec<String>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct DynamoTable {
79 pub name: String,
80 pub arn: String,
81 pub table_id: String,
82 pub key_schema: Vec<KeySchemaElement>,
83 pub attribute_definitions: Vec<AttributeDefinition>,
84 pub provisioned_throughput: ProvisionedThroughput,
85 pub items: Vec<HashMap<String, AttributeValue>>,
86 pub gsi: Vec<GlobalSecondaryIndex>,
87 pub lsi: Vec<LocalSecondaryIndex>,
88 pub tags: HashMap<String, String>,
89 pub created_at: DateTime<Utc>,
90 pub status: String,
91 pub item_count: i64,
92 pub size_bytes: i64,
93 pub billing_mode: String, pub ttl_attribute: Option<String>,
95 pub ttl_enabled: bool,
96 pub resource_policy: Option<String>,
97 pub pitr_enabled: bool,
99 pub kinesis_destinations: Vec<KinesisDestination>,
101 pub contributor_insights_status: String,
103 pub contributor_insights_counters: HashMap<String, u64>,
105 pub stream_enabled: bool,
107 pub stream_view_type: Option<String>, pub stream_arn: Option<String>,
109 #[serde(skip, default = "empty_stream_records")]
112 pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
113 pub sse_type: Option<String>,
115 pub sse_kms_key_arn: Option<String>,
117 pub deletion_protection_enabled: bool,
121 pub on_demand_throughput: Option<OnDemandThroughput>,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct StreamRecord {
129 pub event_id: String,
130 pub event_name: String, pub event_version: String,
132 pub event_source: String,
133 pub aws_region: String,
134 pub dynamodb: DynamoDbStreamRecord,
135 pub event_source_arn: String,
136 pub timestamp: DateTime<Utc>,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct DynamoDbStreamRecord {
141 pub keys: HashMap<String, AttributeValue>,
142 pub new_image: Option<HashMap<String, AttributeValue>>,
143 pub old_image: Option<HashMap<String, AttributeValue>>,
144 pub sequence_number: String,
145 pub size_bytes: i64,
146 pub stream_view_type: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct KinesisDestination {
151 pub stream_arn: String,
152 pub destination_status: String,
153 pub approximate_creation_date_time_precision: String,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct BackupDescription {
158 pub backup_arn: String,
159 pub backup_name: String,
160 pub table_name: String,
161 pub table_arn: String,
162 pub backup_status: String,
163 pub backup_type: String,
164 pub backup_creation_date: DateTime<Utc>,
165 pub key_schema: Vec<KeySchemaElement>,
166 pub attribute_definitions: Vec<AttributeDefinition>,
167 pub provisioned_throughput: ProvisionedThroughput,
168 pub billing_mode: String,
169 pub item_count: i64,
170 pub size_bytes: i64,
171 pub items: Vec<HashMap<String, AttributeValue>>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GlobalTableDescription {
177 pub global_table_name: String,
178 pub global_table_arn: String,
179 pub global_table_status: String,
180 pub creation_date: DateTime<Utc>,
181 pub replication_group: Vec<ReplicaDescription>,
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct ReplicaDescription {
186 pub region_name: String,
187 pub replica_status: String,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct ExportDescription {
192 pub export_arn: String,
193 pub export_status: String,
194 pub table_arn: String,
195 pub s3_bucket: String,
196 pub s3_prefix: Option<String>,
197 pub export_format: String,
198 pub start_time: DateTime<Utc>,
199 pub end_time: DateTime<Utc>,
200 pub export_time: DateTime<Utc>,
201 pub item_count: i64,
202 pub billed_size_bytes: i64,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct ImportDescription {
207 pub import_arn: String,
208 pub import_status: String,
209 pub table_arn: String,
210 pub table_name: String,
211 pub s3_bucket_source: String,
212 pub input_format: String,
213 pub start_time: DateTime<Utc>,
214 pub end_time: DateTime<Utc>,
215 pub processed_item_count: i64,
216 pub processed_size_bytes: i64,
217}
218
219impl DynamoTable {
220 pub fn hash_key_name(&self) -> &str {
222 self.key_schema
223 .iter()
224 .find(|k| k.key_type == "HASH")
225 .map(|k| k.attribute_name.as_str())
226 .unwrap_or("")
227 }
228
229 pub fn range_key_name(&self) -> Option<&str> {
231 self.key_schema
232 .iter()
233 .find(|k| k.key_type == "RANGE")
234 .map(|k| k.attribute_name.as_str())
235 }
236
237 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
239 let hash_key = self.hash_key_name();
240 let range_key = self.range_key_name();
241
242 self.items.iter().position(|item| {
243 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
244 (Some(a), Some(b)) => a == b,
245 _ => false,
246 };
247 if !hash_match {
248 return false;
249 }
250 match range_key {
251 Some(rk) => match (item.get(rk), key.get(rk)) {
252 (Some(a), Some(b)) => a == b,
253 (None, None) => true,
254 _ => false,
255 },
256 None => true,
257 }
258 })
259 }
260
261 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
263 let mut size: i64 = 0;
264 for (k, v) in item {
265 size += k.len() as i64;
266 size += Self::estimate_value_size(v);
267 }
268 size
269 }
270
271 fn estimate_value_size(v: &Value) -> i64 {
272 match v {
273 Value::Object(obj) => {
274 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
275 s.len() as i64
276 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
277 n.len() as i64
278 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
279 1
280 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
281 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
282 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
283 3 + m
284 .iter()
285 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
286 .sum::<i64>()
287 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
288 ss.iter()
289 .filter_map(|v| v.as_str())
290 .map(|s| s.len() as i64)
291 .sum()
292 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
293 ns.iter()
294 .filter_map(|v| v.as_str())
295 .map(|s| s.len() as i64)
296 .sum()
297 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
298 (b.len() as i64 * 3) / 4
300 } else {
301 v.to_string().len() as i64
302 }
303 }
304 _ => v.to_string().len() as i64,
305 }
306 }
307
308 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
311 if self.contributor_insights_status != "ENABLED" {
312 return;
313 }
314 let hash_key = self.hash_key_name().to_string();
315 if let Some(pk_value) = key.get(&hash_key) {
316 let key_str = pk_value.to_string();
317 *self
318 .contributor_insights_counters
319 .entry(key_str)
320 .or_insert(0) += 1;
321 }
322 }
323
324 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
326 if self.contributor_insights_status != "ENABLED" {
327 return;
328 }
329 let hash_key = self.hash_key_name().to_string();
330 if let Some(pk_value) = item.get(&hash_key) {
331 let key_str = pk_value.to_string();
332 *self
333 .contributor_insights_counters
334 .entry(key_str)
335 .or_insert(0) += 1;
336 }
337 }
338
339 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
341 let mut entries: Vec<(&str, u64)> = self
342 .contributor_insights_counters
343 .iter()
344 .map(|(k, &v)| (k.as_str(), v))
345 .collect();
346 entries.sort_by_key(|e| std::cmp::Reverse(e.1));
347 entries.truncate(n);
348 entries
349 }
350
351 pub fn recalculate_stats(&mut self) {
353 self.item_count = self.items.len() as i64;
354 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
355 }
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct DynamoDbState {
360 pub account_id: String,
361 pub region: String,
362 pub tables: HashMap<String, DynamoTable>,
363 pub backups: HashMap<String, BackupDescription>,
364 pub global_tables: HashMap<String, GlobalTableDescription>,
365 pub exports: HashMap<String, ExportDescription>,
366 pub imports: HashMap<String, ImportDescription>,
367}
368
369#[derive(Debug, Clone, Serialize, Deserialize)]
373pub struct DynamoDbSnapshot {
374 pub schema_version: u32,
375 #[serde(default)]
377 pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
378 #[serde(default)]
380 pub state: Option<DynamoDbState>,
381}
382
383pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
384
385impl DynamoDbState {
386 pub fn new(account_id: &str, region: &str) -> Self {
387 Self {
388 account_id: account_id.to_string(),
389 region: region.to_string(),
390 tables: HashMap::new(),
391 backups: HashMap::new(),
392 global_tables: HashMap::new(),
393 exports: HashMap::new(),
394 imports: HashMap::new(),
395 }
396 }
397
398 pub fn reset(&mut self) {
399 self.tables.clear();
400 self.backups.clear();
401 self.global_tables.clear();
402 self.exports.clear();
403 self.imports.clear();
404 }
405}
406
407impl fakecloud_core::multi_account::AccountState for DynamoDbState {
408 fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
409 Self::new(account_id, region)
410 }
411}
412
413pub type SharedDynamoDbState =
414 Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use serde_json::json;
420
421 #[test]
422 fn attribute_type_and_value_valid() {
423 let v = json!({"S": "hi"});
424 let (ty, val) = attribute_type_and_value(&v).unwrap();
425 assert_eq!(ty, "S");
426 assert_eq!(val, &json!("hi"));
427 }
428
429 #[test]
430 fn attribute_type_and_value_empty_returns_none() {
431 let v = json!({});
432 assert!(attribute_type_and_value(&v).is_none());
433 }
434
435 #[test]
436 fn attribute_type_and_value_multiple_entries_returns_none() {
437 let v = json!({"S": "hi", "N": "1"});
438 assert!(attribute_type_and_value(&v).is_none());
439 }
440
441 #[test]
442 fn attribute_type_and_value_non_object_returns_none() {
443 let v = json!("not-object");
444 assert!(attribute_type_and_value(&v).is_none());
445 }
446
447 #[test]
448 fn account_state_trait_impl() {
449 use fakecloud_core::multi_account::AccountState;
450 let state = DynamoDbState::new_for_account("123", "us-east-1", "");
451 assert_eq!(state.account_id, "123");
452 assert_eq!(state.region, "us-east-1");
453 }
454
455 #[test]
456 fn new_and_reset() {
457 let state = DynamoDbState::new("123", "us-east-1");
458 assert!(state.tables.is_empty());
459 }
460
461 fn table_with_hash_key(hash: &str) -> DynamoTable {
462 DynamoTable {
463 name: "t".to_string(),
464 arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
465 table_id: "id".to_string(),
466 key_schema: vec![KeySchemaElement {
467 attribute_name: hash.to_string(),
468 key_type: "HASH".to_string(),
469 }],
470 attribute_definitions: vec![],
471 provisioned_throughput: ProvisionedThroughput {
472 read_capacity_units: 1,
473 write_capacity_units: 1,
474 },
475 items: Vec::new(),
476 gsi: Vec::new(),
477 lsi: Vec::new(),
478 tags: HashMap::new(),
479 created_at: Utc::now(),
480 status: "ACTIVE".to_string(),
481 item_count: 0,
482 size_bytes: 0,
483 billing_mode: "PROVISIONED".to_string(),
484 ttl_attribute: None,
485 ttl_enabled: false,
486 resource_policy: None,
487 pitr_enabled: false,
488 kinesis_destinations: Vec::new(),
489 contributor_insights_status: "DISABLED".to_string(),
490 contributor_insights_counters: HashMap::new(),
491 stream_enabled: false,
492 stream_view_type: None,
493 stream_arn: None,
494 stream_records: empty_stream_records(),
495 sse_type: None,
496 sse_kms_key_arn: None,
497 deletion_protection_enabled: false,
498 on_demand_throughput: None,
499 }
500 }
501
502 #[test]
503 fn hash_key_name_extracts_from_schema() {
504 let t = table_with_hash_key("pk");
505 assert_eq!(t.hash_key_name(), "pk");
506 }
507
508 #[test]
509 fn hash_key_name_empty_when_no_hash_schema() {
510 let mut t = table_with_hash_key("pk");
511 t.key_schema.clear();
512 assert_eq!(t.hash_key_name(), "");
513 }
514
515 #[test]
516 fn record_key_access_noop_when_disabled() {
517 let mut t = table_with_hash_key("pk");
518 let mut key = HashMap::new();
519 key.insert("pk".to_string(), json!({"S": "a"}));
520 t.record_key_access(&key);
521 assert!(t.contributor_insights_counters.is_empty());
522 }
523
524 #[test]
525 fn record_key_access_increments_when_enabled() {
526 let mut t = table_with_hash_key("pk");
527 t.contributor_insights_status = "ENABLED".to_string();
528 let mut key = HashMap::new();
529 key.insert("pk".to_string(), json!({"S": "a"}));
530 t.record_key_access(&key);
531 t.record_key_access(&key);
532 assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
533 }
534
535 #[test]
536 fn record_item_access_uses_hash_key_from_item() {
537 let mut t = table_with_hash_key("pk");
538 t.contributor_insights_status = "ENABLED".to_string();
539 let mut item = HashMap::new();
540 item.insert("pk".to_string(), json!({"S": "user-1"}));
541 item.insert("other".to_string(), json!({"N": "42"}));
542 t.record_item_access(&item);
543 assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
544 }
545
546 #[test]
547 fn top_contributors_returns_sorted() {
548 let mut t = table_with_hash_key("pk");
549 t.contributor_insights_counters.insert("a".to_string(), 3);
550 t.contributor_insights_counters.insert("b".to_string(), 10);
551 t.contributor_insights_counters.insert("c".to_string(), 1);
552 let top = t.top_contributors(2);
553 assert_eq!(top.len(), 2);
554 assert_eq!(top[0], ("b", 10));
555 assert_eq!(top[1], ("a", 3));
556 }
557
558 #[test]
559 fn recalculate_stats_matches_items() {
560 let mut t = table_with_hash_key("pk");
561 let mut item1 = HashMap::new();
562 item1.insert("pk".to_string(), json!({"S": "hello"}));
563 let mut item2 = HashMap::new();
564 item2.insert("pk".to_string(), json!({"N": "42"}));
565 item2.insert("flag".to_string(), json!({"BOOL": true}));
566 t.items.push(item1);
567 t.items.push(item2);
568 t.recalculate_stats();
569 assert_eq!(t.item_count, 2);
570 assert!(t.size_bytes > 0);
571 }
572
573 #[test]
574 fn estimate_value_size_covers_all_types() {
575 let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
576 assert_eq!(s, 3);
577 let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
578 assert_eq!(n, 2);
579 let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
580 assert_eq!(b, 1);
581 let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
582 assert_eq!(null, 1);
583 let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
584 assert_eq!(l, 6);
585 let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
586 assert_eq!(m, 7);
587 let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
588 assert_eq!(ss, 5);
589 let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
590 assert_eq!(ns, 5);
591 let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
592 assert_eq!(bin, 6);
593 }
594}