1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7pub type AttributeValue = Value;
10
11pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14 let obj = av.as_object()?;
15 if obj.len() != 1 {
16 return None;
17 }
18 let (k, v) = obj.iter().next()?;
19 Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24 pub attribute_name: String,
25 pub key_type: String, }
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30 pub attribute_name: String,
31 pub attribute_type: String, }
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36 pub read_capacity_units: i64,
37 pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42 pub index_name: String,
43 pub key_schema: Vec<KeySchemaElement>,
44 pub projection: Projection,
45 pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50 pub index_name: String,
51 pub key_schema: Vec<KeySchemaElement>,
52 pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57 pub projection_type: String, pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63 pub name: String,
64 pub arn: String,
65 pub key_schema: Vec<KeySchemaElement>,
66 pub attribute_definitions: Vec<AttributeDefinition>,
67 pub provisioned_throughput: ProvisionedThroughput,
68 pub items: Vec<HashMap<String, AttributeValue>>,
69 pub gsi: Vec<GlobalSecondaryIndex>,
70 pub lsi: Vec<LocalSecondaryIndex>,
71 pub tags: HashMap<String, String>,
72 pub created_at: DateTime<Utc>,
73 pub status: String,
74 pub item_count: i64,
75 pub size_bytes: i64,
76 pub billing_mode: String, pub ttl_attribute: Option<String>,
78 pub ttl_enabled: bool,
79 pub resource_policy: Option<String>,
80 pub pitr_enabled: bool,
82 pub kinesis_destinations: Vec<KinesisDestination>,
84 pub contributor_insights_status: String,
86 pub contributor_insights_counters: HashMap<String, u64>,
88}
89
90#[derive(Debug, Clone)]
91pub struct KinesisDestination {
92 pub stream_arn: String,
93 pub destination_status: String,
94 pub approximate_creation_date_time_precision: String,
95}
96
97#[derive(Debug, Clone)]
98pub struct BackupDescription {
99 pub backup_arn: String,
100 pub backup_name: String,
101 pub table_name: String,
102 pub table_arn: String,
103 pub backup_status: String,
104 pub backup_type: String,
105 pub backup_creation_date: DateTime<Utc>,
106 pub key_schema: Vec<KeySchemaElement>,
107 pub attribute_definitions: Vec<AttributeDefinition>,
108 pub provisioned_throughput: ProvisionedThroughput,
109 pub billing_mode: String,
110 pub item_count: i64,
111 pub size_bytes: i64,
112 pub items: Vec<HashMap<String, AttributeValue>>,
114}
115
116#[derive(Debug, Clone)]
117pub struct GlobalTableDescription {
118 pub global_table_name: String,
119 pub global_table_arn: String,
120 pub global_table_status: String,
121 pub creation_date: DateTime<Utc>,
122 pub replication_group: Vec<ReplicaDescription>,
123}
124
125#[derive(Debug, Clone)]
126pub struct ReplicaDescription {
127 pub region_name: String,
128 pub replica_status: String,
129}
130
131#[derive(Debug, Clone)]
132pub struct ExportDescription {
133 pub export_arn: String,
134 pub export_status: String,
135 pub table_arn: String,
136 pub s3_bucket: String,
137 pub s3_prefix: Option<String>,
138 pub export_format: String,
139 pub start_time: DateTime<Utc>,
140 pub end_time: DateTime<Utc>,
141 pub export_time: DateTime<Utc>,
142 pub item_count: i64,
143 pub billed_size_bytes: i64,
144}
145
146#[derive(Debug, Clone)]
147pub struct ImportDescription {
148 pub import_arn: String,
149 pub import_status: String,
150 pub table_arn: String,
151 pub table_name: String,
152 pub s3_bucket_source: String,
153 pub input_format: String,
154 pub start_time: DateTime<Utc>,
155 pub end_time: DateTime<Utc>,
156 pub processed_item_count: i64,
157 pub processed_size_bytes: i64,
158}
159
160impl DynamoTable {
161 pub fn hash_key_name(&self) -> &str {
163 self.key_schema
164 .iter()
165 .find(|k| k.key_type == "HASH")
166 .map(|k| k.attribute_name.as_str())
167 .unwrap_or("")
168 }
169
170 pub fn range_key_name(&self) -> Option<&str> {
172 self.key_schema
173 .iter()
174 .find(|k| k.key_type == "RANGE")
175 .map(|k| k.attribute_name.as_str())
176 }
177
178 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
180 let hash_key = self.hash_key_name();
181 let range_key = self.range_key_name();
182
183 self.items.iter().position(|item| {
184 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
185 (Some(a), Some(b)) => a == b,
186 _ => false,
187 };
188 if !hash_match {
189 return false;
190 }
191 match range_key {
192 Some(rk) => match (item.get(rk), key.get(rk)) {
193 (Some(a), Some(b)) => a == b,
194 (None, None) => true,
195 _ => false,
196 },
197 None => true,
198 }
199 })
200 }
201
202 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
204 let mut size: i64 = 0;
205 for (k, v) in item {
206 size += k.len() as i64;
207 size += Self::estimate_value_size(v);
208 }
209 size
210 }
211
212 fn estimate_value_size(v: &Value) -> i64 {
213 match v {
214 Value::Object(obj) => {
215 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
216 s.len() as i64
217 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
218 n.len() as i64
219 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
220 1
221 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
222 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
223 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
224 3 + m
225 .iter()
226 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
227 .sum::<i64>()
228 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
229 ss.iter()
230 .filter_map(|v| v.as_str())
231 .map(|s| s.len() as i64)
232 .sum()
233 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
234 ns.iter()
235 .filter_map(|v| v.as_str())
236 .map(|s| s.len() as i64)
237 .sum()
238 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
239 (b.len() as i64 * 3) / 4
241 } else {
242 v.to_string().len() as i64
243 }
244 }
245 _ => v.to_string().len() as i64,
246 }
247 }
248
249 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
252 if self.contributor_insights_status != "ENABLED" {
253 return;
254 }
255 let hash_key = self.hash_key_name().to_string();
256 if let Some(pk_value) = key.get(&hash_key) {
257 let key_str = pk_value.to_string();
258 *self
259 .contributor_insights_counters
260 .entry(key_str)
261 .or_insert(0) += 1;
262 }
263 }
264
265 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
267 if self.contributor_insights_status != "ENABLED" {
268 return;
269 }
270 let hash_key = self.hash_key_name().to_string();
271 if let Some(pk_value) = item.get(&hash_key) {
272 let key_str = pk_value.to_string();
273 *self
274 .contributor_insights_counters
275 .entry(key_str)
276 .or_insert(0) += 1;
277 }
278 }
279
280 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
282 let mut entries: Vec<(&str, u64)> = self
283 .contributor_insights_counters
284 .iter()
285 .map(|(k, &v)| (k.as_str(), v))
286 .collect();
287 entries.sort_by(|a, b| b.1.cmp(&a.1));
288 entries.truncate(n);
289 entries
290 }
291
292 pub fn recalculate_stats(&mut self) {
294 self.item_count = self.items.len() as i64;
295 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
296 }
297}
298
299pub struct DynamoDbState {
300 pub account_id: String,
301 pub region: String,
302 pub tables: HashMap<String, DynamoTable>,
303 pub backups: HashMap<String, BackupDescription>,
304 pub global_tables: HashMap<String, GlobalTableDescription>,
305 pub exports: HashMap<String, ExportDescription>,
306 pub imports: HashMap<String, ImportDescription>,
307}
308
309impl DynamoDbState {
310 pub fn new(account_id: &str, region: &str) -> Self {
311 Self {
312 account_id: account_id.to_string(),
313 region: region.to_string(),
314 tables: HashMap::new(),
315 backups: HashMap::new(),
316 global_tables: HashMap::new(),
317 exports: HashMap::new(),
318 imports: HashMap::new(),
319 }
320 }
321
322 pub fn reset(&mut self) {
323 self.tables.clear();
324 self.backups.clear();
325 self.global_tables.clear();
326 self.exports.clear();
327 self.imports.clear();
328 }
329}
330
331pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;