1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7pub type AttributeValue = Value;
10
11pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14 let obj = av.as_object()?;
15 if obj.len() != 1 {
16 return None;
17 }
18 let (k, v) = obj.iter().next()?;
19 Some((k.as_str(), v))
20}
21
/// One element of a table or index key schema.
#[derive(Debug, Clone)]
pub struct KeySchemaElement {
    /// Name of the attribute that takes part in the key.
    pub attribute_name: String,
    /// Role of the attribute in the key. `DynamoTable` looks for the values
    /// `"HASH"` and `"RANGE"` here.
    pub key_type: String,
}
27
/// Declares the type of an attribute used in a key schema.
#[derive(Debug, Clone)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Type code of the attribute, stored as a plain string.
    /// NOTE(review): presumably one of DynamoDB's scalar codes ("S", "N", "B");
    /// nothing in this file validates it.
    pub attribute_type: String,
}
33
/// Provisioned read/write capacity settings for a table or index.
#[derive(Debug, Clone)]
pub struct ProvisionedThroughput {
    /// Configured read capacity units.
    pub read_capacity_units: i64,
    /// Configured write capacity units.
    pub write_capacity_units: i64,
}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42 pub index_name: String,
43 pub key_schema: Vec<KeySchemaElement>,
44 pub projection: Projection,
45 pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50 pub index_name: String,
51 pub key_schema: Vec<KeySchemaElement>,
52 pub projection: Projection,
53}
54
/// Attribute projection settings of a secondary index.
#[derive(Debug, Clone)]
pub struct Projection {
    /// Projection mode, stored as a plain string.
    /// NOTE(review): presumably "ALL", "KEYS_ONLY" or "INCLUDE" — not
    /// validated anywhere in this file.
    pub projection_type: String,
    /// Names of the non-key attributes included in the projection.
    pub non_key_attributes: Vec<String>,
}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63 pub name: String,
64 pub arn: String,
65 pub key_schema: Vec<KeySchemaElement>,
66 pub attribute_definitions: Vec<AttributeDefinition>,
67 pub provisioned_throughput: ProvisionedThroughput,
68 pub items: Vec<HashMap<String, AttributeValue>>,
69 pub gsi: Vec<GlobalSecondaryIndex>,
70 pub lsi: Vec<LocalSecondaryIndex>,
71 pub tags: HashMap<String, String>,
72 pub created_at: DateTime<Utc>,
73 pub status: String,
74 pub item_count: i64,
75 pub size_bytes: i64,
76 pub billing_mode: String, pub ttl_attribute: Option<String>,
78 pub ttl_enabled: bool,
79 pub resource_policy: Option<String>,
80 pub pitr_enabled: bool,
82 pub kinesis_destinations: Vec<KinesisDestination>,
84 pub contributor_insights_status: String,
86 pub contributor_insights_counters: HashMap<String, u64>,
88 pub stream_enabled: bool,
90 pub stream_view_type: Option<String>, pub stream_arn: Option<String>,
92 pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
94 pub sse_type: Option<String>,
96 pub sse_kms_key_arn: Option<String>,
98}
99
100#[derive(Debug, Clone)]
101pub struct StreamRecord {
102 pub event_id: String,
103 pub event_name: String, pub event_version: String,
105 pub event_source: String,
106 pub aws_region: String,
107 pub dynamodb: DynamoDbStreamRecord,
108 pub event_source_arn: String,
109 pub timestamp: DateTime<Utc>,
110}
111
112#[derive(Debug, Clone)]
113pub struct DynamoDbStreamRecord {
114 pub keys: HashMap<String, AttributeValue>,
115 pub new_image: Option<HashMap<String, AttributeValue>>,
116 pub old_image: Option<HashMap<String, AttributeValue>>,
117 pub sequence_number: String,
118 pub size_bytes: i64,
119 pub stream_view_type: String,
120}
121
/// A Kinesis data stream registered as a streaming destination of a table.
#[derive(Debug, Clone)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Status of the destination, stored as a plain string.
    pub destination_status: String,
    /// Precision setting for approximate creation timestamps, stored as a
    /// plain string.
    pub approximate_creation_date_time_precision: String,
}
128
129#[derive(Debug, Clone)]
130pub struct BackupDescription {
131 pub backup_arn: String,
132 pub backup_name: String,
133 pub table_name: String,
134 pub table_arn: String,
135 pub backup_status: String,
136 pub backup_type: String,
137 pub backup_creation_date: DateTime<Utc>,
138 pub key_schema: Vec<KeySchemaElement>,
139 pub attribute_definitions: Vec<AttributeDefinition>,
140 pub provisioned_throughput: ProvisionedThroughput,
141 pub billing_mode: String,
142 pub item_count: i64,
143 pub size_bytes: i64,
144 pub items: Vec<HashMap<String, AttributeValue>>,
146}
147
148#[derive(Debug, Clone)]
149pub struct GlobalTableDescription {
150 pub global_table_name: String,
151 pub global_table_arn: String,
152 pub global_table_status: String,
153 pub creation_date: DateTime<Utc>,
154 pub replication_group: Vec<ReplicaDescription>,
155}
156
/// One replica of a global table.
#[derive(Debug, Clone)]
pub struct ReplicaDescription {
    /// Region that hosts the replica.
    pub region_name: String,
    /// Status of the replica, stored as a plain string.
    pub replica_status: String,
}
162
163#[derive(Debug, Clone)]
164pub struct ExportDescription {
165 pub export_arn: String,
166 pub export_status: String,
167 pub table_arn: String,
168 pub s3_bucket: String,
169 pub s3_prefix: Option<String>,
170 pub export_format: String,
171 pub start_time: DateTime<Utc>,
172 pub end_time: DateTime<Utc>,
173 pub export_time: DateTime<Utc>,
174 pub item_count: i64,
175 pub billed_size_bytes: i64,
176}
177
178#[derive(Debug, Clone)]
179pub struct ImportDescription {
180 pub import_arn: String,
181 pub import_status: String,
182 pub table_arn: String,
183 pub table_name: String,
184 pub s3_bucket_source: String,
185 pub input_format: String,
186 pub start_time: DateTime<Utc>,
187 pub end_time: DateTime<Utc>,
188 pub processed_item_count: i64,
189 pub processed_size_bytes: i64,
190}
191
192impl DynamoTable {
193 pub fn hash_key_name(&self) -> &str {
195 self.key_schema
196 .iter()
197 .find(|k| k.key_type == "HASH")
198 .map(|k| k.attribute_name.as_str())
199 .unwrap_or("")
200 }
201
202 pub fn range_key_name(&self) -> Option<&str> {
204 self.key_schema
205 .iter()
206 .find(|k| k.key_type == "RANGE")
207 .map(|k| k.attribute_name.as_str())
208 }
209
210 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
212 let hash_key = self.hash_key_name();
213 let range_key = self.range_key_name();
214
215 self.items.iter().position(|item| {
216 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
217 (Some(a), Some(b)) => a == b,
218 _ => false,
219 };
220 if !hash_match {
221 return false;
222 }
223 match range_key {
224 Some(rk) => match (item.get(rk), key.get(rk)) {
225 (Some(a), Some(b)) => a == b,
226 (None, None) => true,
227 _ => false,
228 },
229 None => true,
230 }
231 })
232 }
233
234 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
236 let mut size: i64 = 0;
237 for (k, v) in item {
238 size += k.len() as i64;
239 size += Self::estimate_value_size(v);
240 }
241 size
242 }
243
244 fn estimate_value_size(v: &Value) -> i64 {
245 match v {
246 Value::Object(obj) => {
247 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
248 s.len() as i64
249 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
250 n.len() as i64
251 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
252 1
253 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
254 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
255 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
256 3 + m
257 .iter()
258 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
259 .sum::<i64>()
260 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
261 ss.iter()
262 .filter_map(|v| v.as_str())
263 .map(|s| s.len() as i64)
264 .sum()
265 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
266 ns.iter()
267 .filter_map(|v| v.as_str())
268 .map(|s| s.len() as i64)
269 .sum()
270 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
271 (b.len() as i64 * 3) / 4
273 } else {
274 v.to_string().len() as i64
275 }
276 }
277 _ => v.to_string().len() as i64,
278 }
279 }
280
281 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
284 if self.contributor_insights_status != "ENABLED" {
285 return;
286 }
287 let hash_key = self.hash_key_name().to_string();
288 if let Some(pk_value) = key.get(&hash_key) {
289 let key_str = pk_value.to_string();
290 *self
291 .contributor_insights_counters
292 .entry(key_str)
293 .or_insert(0) += 1;
294 }
295 }
296
297 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
299 if self.contributor_insights_status != "ENABLED" {
300 return;
301 }
302 let hash_key = self.hash_key_name().to_string();
303 if let Some(pk_value) = item.get(&hash_key) {
304 let key_str = pk_value.to_string();
305 *self
306 .contributor_insights_counters
307 .entry(key_str)
308 .or_insert(0) += 1;
309 }
310 }
311
312 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
314 let mut entries: Vec<(&str, u64)> = self
315 .contributor_insights_counters
316 .iter()
317 .map(|(k, &v)| (k.as_str(), v))
318 .collect();
319 entries.sort_by(|a, b| b.1.cmp(&a.1));
320 entries.truncate(n);
321 entries
322 }
323
324 pub fn recalculate_stats(&mut self) {
326 self.item_count = self.items.len() as i64;
327 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
328 }
329}
330
331pub struct DynamoDbState {
332 pub account_id: String,
333 pub region: String,
334 pub tables: HashMap<String, DynamoTable>,
335 pub backups: HashMap<String, BackupDescription>,
336 pub global_tables: HashMap<String, GlobalTableDescription>,
337 pub exports: HashMap<String, ExportDescription>,
338 pub imports: HashMap<String, ImportDescription>,
339}
340
341impl DynamoDbState {
342 pub fn new(account_id: &str, region: &str) -> Self {
343 Self {
344 account_id: account_id.to_string(),
345 region: region.to_string(),
346 tables: HashMap::new(),
347 backups: HashMap::new(),
348 global_tables: HashMap::new(),
349 exports: HashMap::new(),
350 imports: HashMap::new(),
351 }
352 }
353
354 pub fn reset(&mut self) {
355 self.tables.clear();
356 self.backups.clear();
357 self.global_tables.clear();
358 self.exports.clear();
359 self.imports.clear();
360 }
361}
362
363pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;