use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
/// Builds a fresh, empty, shareable stream-record buffer.
///
/// Referenced by name as the serde `default` for `DynamoTable::stream_records`,
/// which is skipped during (de)serialization and therefore has to be recreated.
fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
    let records: Vec<StreamRecord> = Vec::new();
    Arc::new(RwLock::new(records))
}
/// A DynamoDB attribute value, kept as raw JSON.
///
/// [`attribute_type_and_value`] interprets it as a single-entry object that
/// maps a type tag to its payload (for example `{"S": "hi"}`).
pub type AttributeValue = Value;
/// Splits an attribute value into its type tag and payload.
///
/// Returns `Some((tag, payload))` only when `av` is a JSON object holding
/// exactly one entry. Non-objects, empty objects and objects with more than
/// one entry all yield `None`.
pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
    let mut entries = av.as_object()?.iter();
    // Exactly one entry <=> the first `next()` succeeds and the second does not.
    match (entries.next(), entries.next()) {
        (Some((tag, payload)), None) => Some((tag.as_str(), payload)),
        _ => None,
    }
}
/// One element of a table or index key schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the attribute that takes part in the key.
    pub attribute_name: String,
    /// Role of the attribute within the key. `DynamoTable` looks for the
    /// values `"HASH"` and `"RANGE"`.
    pub key_type: String,
}
/// Declares the type of a named attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Type tag for the attribute.
    // NOTE(review): presumably a scalar code such as `S`, `N` or `B`; nothing
    // in this file inspects the value — confirm against the request handlers.
    pub attribute_type: String,
}
/// Provisioned read/write capacity settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Provisioned read capacity units.
    pub read_capacity_units: i64,
    /// Provisioned write capacity units.
    pub write_capacity_units: i64,
}
/// On-demand throughput limits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Maximum read request units.
    pub max_read_request_units: i64,
    /// Maximum write request units.
    pub max_write_request_units: i64,
}
/// Definition of a global secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Projection settings for the index.
    pub projection: Projection,
    /// Provisioned capacity for the index, when one is set.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand throughput limits for the index, when set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
/// Definition of a local secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Projection settings for the index.
    pub projection: Projection,
}
/// Projection settings of a secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection type string.
    // NOTE(review): the accepted values are not visible in this file.
    pub projection_type: String,
    /// Names of the non-key attributes included in the projection.
    pub non_key_attributes: Vec<String>,
}
/// In-memory representation of a single table: its definition, its items and
/// the per-table feature settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier.
    pub table_id: String,
    /// Primary-key schema; read by `hash_key_name` and `range_key_name`.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions declared for the table.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity settings.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to attribute value.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags attached to the table.
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string (the unit tests in this file use `"ACTIVE"`).
    pub status: String,
    /// Cached number of items; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Cached estimated size of all items; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode string (the unit tests in this file use `"PROVISIONED"`).
    pub billing_mode: String,
    /// Name of the TTL attribute, when one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy attached to the table, if any.
    pub resource_policy: Option<String>,
    /// Whether point-in-time recovery is enabled.
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations configured for the table.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor Insights status; access counters are only updated while
    /// this equals `"ENABLED"`.
    pub contributor_insights_status: String,
    /// Access counts, keyed by the JSON text of the partition-key value.
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// Whether a stream is enabled for the table.
    pub stream_enabled: bool,
    /// Stream view type, when set.
    pub stream_view_type: Option<String>,
    /// ARN of the table's stream, when set.
    pub stream_arn: Option<String>,
    /// Shared buffer of stream records.
    ///
    /// Skipped by serde and recreated empty (via `empty_stream_records`) on
    /// deserialization. Because it is an `Arc`, cloning the table shares the
    /// same buffer rather than copying it.
    #[serde(skip, default = "empty_stream_records")]
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type, when set.
    pub sse_type: Option<String>,
    /// ARN of the KMS key used for server-side encryption, when set.
    pub sse_kms_key_arn: Option<String>,
    /// Whether deletion protection is enabled.
    pub deletion_protection_enabled: bool,
    /// On-demand throughput limits, when set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
/// A single change record kept in a table's stream buffer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Identifier of the event.
    pub event_id: String,
    /// Name of the event.
    // NOTE(review): presumably `INSERT` / `MODIFY` / `REMOVE`; the values are
    // not visible in this file — confirm where records are produced.
    pub event_name: String,
    /// Version string of the event format.
    pub event_version: String,
    /// Source of the event.
    pub event_source: String,
    /// Region the event is attributed to.
    pub aws_region: String,
    /// The item-level change payload.
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the event source.
    pub event_source_arn: String,
    /// Timestamp of the record (UTC).
    pub timestamp: DateTime<Utc>,
}
/// Item-level payload of a [`StreamRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the affected item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, when present.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, when present.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number of the record, stored as a string.
    pub sequence_number: String,
    /// Size of the record in bytes.
    pub size_bytes: i64,
    /// Stream view type the record was produced under.
    pub stream_view_type: String,
}
/// A Kinesis streaming destination attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Status string of the destination.
    pub destination_status: String,
    /// Precision setting for approximate creation timestamps, stored as a string.
    pub approximate_creation_date_time_precision: String,
}
/// A stored table backup: backup metadata plus a copy of the table definition
/// and items.
///
/// Every field marked `#[serde(default)]` falls back to its type's default
/// when absent from the serialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the backed-up table.
    pub table_name: String,
    /// ARN of the backed-up table.
    pub table_arn: String,
    /// Status string of the backup.
    pub backup_status: String,
    /// Type string of the backup.
    pub backup_type: String,
    /// Creation timestamp of the backup (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema recorded in the backup.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions recorded in the backup.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity recorded in the backup.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode recorded in the backup.
    pub billing_mode: String,
    /// Item count recorded in the backup.
    pub item_count: i64,
    /// Size in bytes recorded in the backup.
    pub size_bytes: i64,
    /// Items recorded in the backup.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes recorded in the backup.
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes recorded in the backup.
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags recorded in the backup.
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// TTL attribute name recorded in the backup, if any.
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    /// TTL enabled flag recorded in the backup.
    #[serde(default)]
    pub ttl_enabled: bool,
    /// Server-side encryption type recorded in the backup, if any.
    #[serde(default)]
    pub sse_type: Option<String>,
    /// KMS key ARN recorded in the backup, if any.
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    /// Stream enabled flag recorded in the backup.
    #[serde(default)]
    pub stream_enabled: bool,
    /// Stream view type recorded in the backup, if any.
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
/// Description of a global table and its replicas.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status string of the global table.
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
}
/// One replica of a global table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region the replica lives in.
    pub region_name: String,
    /// Status string of the replica.
    pub replica_status: String,
}
/// Description of a table export to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Status string of the export.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Optional key prefix inside the destination bucket.
    pub s3_prefix: Option<String>,
    /// Export format string.
    pub export_format: String,
    /// When the export started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the export ended (UTC).
    pub end_time: DateTime<Utc>,
    /// Export time (UTC).
    // NOTE(review): presumably the point in time the exported data reflects —
    // confirm against the export handler.
    pub export_time: DateTime<Utc>,
    /// Number of items exported.
    pub item_count: i64,
    /// Billed size of the export in bytes.
    pub billed_size_bytes: i64,
}
/// Description of a table import from S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Status string of the import.
    pub import_status: String,
    /// ARN of the table being imported into.
    pub table_arn: String,
    /// Name of the table being imported into.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format string.
    pub input_format: String,
    /// When the import started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the import ended (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
impl DynamoTable {
    /// Returns the name of the partition (`HASH`) key attribute.
    ///
    /// Falls back to the empty string when the key schema has no `HASH`
    /// element.
    pub fn hash_key_name(&self) -> &str {
        self.key_schema
            .iter()
            .find(|k| k.key_type == "HASH")
            .map_or("", |k| k.attribute_name.as_str())
    }

    /// Returns the name of the sort (`RANGE`) key attribute, if the key
    /// schema declares one.
    pub fn range_key_name(&self) -> Option<&str> {
        self.key_schema
            .iter()
            .find(|k| k.key_type == "RANGE")
            .map(|k| k.attribute_name.as_str())
    }

    /// Finds the position in `items` of the first item whose primary key
    /// equals `key`.
    ///
    /// The hash attribute must be present in both the stored item and `key`
    /// and compare equal. When the table has a range key, the range attribute
    /// must either be equal on both sides or absent from both.
    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
        let hash_key = self.hash_key_name();
        let range_key = self.range_key_name();
        self.items.iter().position(|item| {
            let hash_match = matches!(
                (item.get(hash_key), key.get(hash_key)),
                (Some(stored), Some(wanted)) if stored == wanted
            );
            if !hash_match {
                return false;
            }
            match range_key {
                // `Option` equality: equal values match, both-absent matches,
                // one-sided presence does not.
                Some(rk) => item.get(rk) == key.get(rk),
                None => true,
            }
        })
    }

    /// Estimates the size of one item: attribute-name lengths plus the
    /// estimated size of every value.
    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
        item.iter()
            .map(|(name, value)| name.len() as i64 + Self::estimate_value_size(value))
            .sum()
    }

    /// Estimates the size in bytes of a single attribute value.
    ///
    /// * `S` / `N`: length of the string payload.
    /// * `BOOL` / `NULL`: 1.
    /// * `L` / `M`: 3 bytes of overhead plus the nested sizes (and key
    ///   lengths for maps).
    /// * `SS` / `NS`: sum of the element string lengths.
    /// * `B` / `BS`: approximate decoded length of the base64 payload(s),
    ///   three bytes for every four characters.
    ///
    /// Anything else falls back to the length of its JSON text.
    fn estimate_value_size(v: &Value) -> i64 {
        /// Sum of the lengths of the string elements; non-strings are skipped.
        fn string_lengths(elems: &[Value]) -> i64 {
            elems
                .iter()
                .filter_map(Value::as_str)
                .map(|s| s.len() as i64)
                .sum()
        }

        /// Approximate decoded size of base64 text (padding is not discounted).
        fn base64_len(encoded: &str) -> i64 {
            (encoded.len() as i64 * 3) / 4
        }

        match v {
            Value::Object(obj) => {
                if let Some(s) = obj.get("S").and_then(Value::as_str) {
                    s.len() as i64
                } else if let Some(n) = obj.get("N").and_then(Value::as_str) {
                    n.len() as i64
                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
                    1
                } else if let Some(list) = obj.get("L").and_then(Value::as_array) {
                    3 + list.iter().map(Self::estimate_value_size).sum::<i64>()
                } else if let Some(map) = obj.get("M").and_then(Value::as_object) {
                    3 + map
                        .iter()
                        .map(|(name, inner)| name.len() as i64 + Self::estimate_value_size(inner))
                        .sum::<i64>()
                } else if let Some(ss) = obj.get("SS").and_then(Value::as_array) {
                    string_lengths(ss)
                } else if let Some(ns) = obj.get("NS").and_then(Value::as_array) {
                    string_lengths(ns)
                } else if let Some(b) = obj.get("B").and_then(Value::as_str) {
                    base64_len(b)
                } else if let Some(bs) = obj.get("BS").and_then(Value::as_array) {
                    // Binary sets are sized like `B`, element by element,
                    // instead of by the length of their JSON text.
                    bs.iter().filter_map(Value::as_str).map(base64_len).sum()
                } else {
                    v.to_string().len() as i64
                }
            }
            _ => v.to_string().len() as i64,
        }
    }

    /// Bumps the Contributor Insights counter for the partition-key value
    /// found in `attrs`.
    ///
    /// Does nothing unless `contributor_insights_status` is `"ENABLED"` and
    /// `attrs` contains the table's hash key. The counter is keyed by the
    /// JSON text of the key's attribute value.
    fn bump_contributor_counter(&mut self, attrs: &HashMap<String, AttributeValue>) {
        if self.contributor_insights_status != "ENABLED" {
            return;
        }
        // The looked-up value borrows from `attrs`, not from `self`, so the
        // counters can be mutated afterwards without copying the key name.
        let counter_key = match attrs.get(self.hash_key_name()) {
            Some(pk_value) => pk_value.to_string(),
            None => return,
        };
        *self
            .contributor_insights_counters
            .entry(counter_key)
            .or_insert(0) += 1;
    }

    /// Records one access for the partition key contained in `key`.
    ///
    /// No-op unless Contributor Insights is `"ENABLED"` and `key` holds the
    /// table's hash key attribute.
    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
        self.bump_contributor_counter(key);
    }

    /// Records one access for the partition key of a full `item`.
    ///
    /// No-op unless Contributor Insights is `"ENABLED"` and `item` holds the
    /// table's hash key attribute.
    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
        self.bump_contributor_counter(item);
    }

    /// Returns up to `n` `(counter key, access count)` pairs, highest count
    /// first.
    ///
    /// The sort is stable over the map's key order, so entries with equal
    /// counts stay in ascending key order.
    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
        let mut entries: Vec<(&str, u64)> = self
            .contributor_insights_counters
            .iter()
            .map(|(k, &count)| (k.as_str(), count))
            .collect();
        entries.sort_by_key(|&(_, count)| std::cmp::Reverse(count));
        entries.truncate(n);
        entries
    }

    /// Recomputes `item_count` and `size_bytes` from the current `items`.
    pub fn recalculate_stats(&mut self) {
        self.item_count = self.items.len() as i64;
        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
    }
}
/// All DynamoDB state held for one account/region pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Account the state belongs to.
    pub account_id: String,
    /// Region the state belongs to.
    pub region: String,
    /// Tables.
    // NOTE(review): presumably keyed by table name — confirm at the insert sites.
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups.
    // NOTE(review): map key (name vs. ARN) is not visible in this file.
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables.
    // NOTE(review): map key is not visible in this file.
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports.
    // NOTE(review): map key is not visible in this file.
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports.
    // NOTE(review): map key is not visible in this file.
    pub imports: BTreeMap<String, ImportDescription>,
}
/// Serialized snapshot envelope for DynamoDB state.
///
/// Both payload fields are optional and default to `None` when missing from
/// the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Version of the snapshot layout (see `DYNAMODB_SNAPSHOT_SCHEMA_VERSION`).
    pub schema_version: u32,
    /// Multi-account state payload.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    /// Single-account state payload.
    // NOTE(review): presumably the legacy layout kept so older snapshots still
    // load — confirm against the snapshot loader.
    #[serde(default)]
    pub state: Option<DynamoDbState>,
}
/// Schema version constant for DynamoDB snapshots.
// NOTE(review): presumably the value written to `DynamoDbSnapshot::schema_version`
// on save — confirm at the persistence call site.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
impl DynamoDbState {
    /// Creates an empty state for the given account and region.
    pub fn new(account_id: &str, region: &str) -> Self {
        let account_id = account_id.to_owned();
        let region = region.to_owned();
        Self {
            account_id,
            region,
            tables: BTreeMap::default(),
            backups: BTreeMap::default(),
            global_tables: BTreeMap::default(),
            exports: BTreeMap::default(),
            imports: BTreeMap::default(),
        }
    }

    /// Empties every resource collection while keeping `account_id` and
    /// `region` untouched.
    pub fn reset(&mut self) {
        let Self {
            account_id: _,
            region: _,
            tables,
            backups,
            global_tables,
            exports,
            imports,
        } = self;
        tables.clear();
        backups.clear();
        global_tables.clear();
        exports.clear();
        imports.clear();
    }
}
/// Lets the multi-account container create per-account DynamoDB state.
impl fakecloud_core::multi_account::AccountState for DynamoDbState {
    /// Builds an empty state for `account_id` / `region`; the endpoint
    /// argument is ignored.
    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
        Self::new(account_id, region)
    }
}
/// Shareable handle to the DynamoDB state: a
/// `MultiAccountState<DynamoDbState>` behind an `Arc<RwLock<..>>`.
pub type SharedDynamoDbState =
    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // --- attribute_type_and_value -------------------------------------------

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    // --- DynamoDbState --------------------------------------------------------

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// A new state starts empty, and `reset` empties a populated state while
    /// keeping the account and region.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());

        // Populate something so that `reset` has observable work to do.
        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        assert_eq!(state.tables.len(), 1);

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    // --- DynamoTable ----------------------------------------------------------

    /// Builds a minimal table whose key schema is a single `HASH` key named
    /// `hash`, with Contributor Insights disabled and no items.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        // Lists and maps carry 3 bytes of overhead on top of their contents.
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        // Eight base64 characters are estimated as six decoded bytes.
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}