1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7pub type AttributeValue = Value;
10
11pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14 let obj = av.as_object()?;
15 if obj.len() != 1 {
16 return None;
17 }
18 let (k, v) = obj.iter().next()?;
19 Some((k.as_str(), v))
20}
21
22#[derive(Debug, Clone)]
23pub struct KeySchemaElement {
24 pub attribute_name: String,
25 pub key_type: String, }
27
28#[derive(Debug, Clone)]
29pub struct AttributeDefinition {
30 pub attribute_name: String,
31 pub attribute_type: String, }
33
34#[derive(Debug, Clone)]
35pub struct ProvisionedThroughput {
36 pub read_capacity_units: i64,
37 pub write_capacity_units: i64,
38}
39
40#[derive(Debug, Clone)]
41pub struct GlobalSecondaryIndex {
42 pub index_name: String,
43 pub key_schema: Vec<KeySchemaElement>,
44 pub projection: Projection,
45 pub provisioned_throughput: Option<ProvisionedThroughput>,
46}
47
48#[derive(Debug, Clone)]
49pub struct LocalSecondaryIndex {
50 pub index_name: String,
51 pub key_schema: Vec<KeySchemaElement>,
52 pub projection: Projection,
53}
54
55#[derive(Debug, Clone)]
56pub struct Projection {
57 pub projection_type: String, pub non_key_attributes: Vec<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct DynamoTable {
63 pub name: String,
64 pub arn: String,
65 pub table_id: String,
66 pub key_schema: Vec<KeySchemaElement>,
67 pub attribute_definitions: Vec<AttributeDefinition>,
68 pub provisioned_throughput: ProvisionedThroughput,
69 pub items: Vec<HashMap<String, AttributeValue>>,
70 pub gsi: Vec<GlobalSecondaryIndex>,
71 pub lsi: Vec<LocalSecondaryIndex>,
72 pub tags: HashMap<String, String>,
73 pub created_at: DateTime<Utc>,
74 pub status: String,
75 pub item_count: i64,
76 pub size_bytes: i64,
77 pub billing_mode: String, pub ttl_attribute: Option<String>,
79 pub ttl_enabled: bool,
80 pub resource_policy: Option<String>,
81 pub pitr_enabled: bool,
83 pub kinesis_destinations: Vec<KinesisDestination>,
85 pub contributor_insights_status: String,
87 pub contributor_insights_counters: HashMap<String, u64>,
89 pub stream_enabled: bool,
91 pub stream_view_type: Option<String>, pub stream_arn: Option<String>,
93 pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
95 pub sse_type: Option<String>,
97 pub sse_kms_key_arn: Option<String>,
99 pub deletion_protection_enabled: bool,
103}
104
105#[derive(Debug, Clone)]
106pub struct StreamRecord {
107 pub event_id: String,
108 pub event_name: String, pub event_version: String,
110 pub event_source: String,
111 pub aws_region: String,
112 pub dynamodb: DynamoDbStreamRecord,
113 pub event_source_arn: String,
114 pub timestamp: DateTime<Utc>,
115}
116
117#[derive(Debug, Clone)]
118pub struct DynamoDbStreamRecord {
119 pub keys: HashMap<String, AttributeValue>,
120 pub new_image: Option<HashMap<String, AttributeValue>>,
121 pub old_image: Option<HashMap<String, AttributeValue>>,
122 pub sequence_number: String,
123 pub size_bytes: i64,
124 pub stream_view_type: String,
125}
126
127#[derive(Debug, Clone)]
128pub struct KinesisDestination {
129 pub stream_arn: String,
130 pub destination_status: String,
131 pub approximate_creation_date_time_precision: String,
132}
133
134#[derive(Debug, Clone)]
135pub struct BackupDescription {
136 pub backup_arn: String,
137 pub backup_name: String,
138 pub table_name: String,
139 pub table_arn: String,
140 pub backup_status: String,
141 pub backup_type: String,
142 pub backup_creation_date: DateTime<Utc>,
143 pub key_schema: Vec<KeySchemaElement>,
144 pub attribute_definitions: Vec<AttributeDefinition>,
145 pub provisioned_throughput: ProvisionedThroughput,
146 pub billing_mode: String,
147 pub item_count: i64,
148 pub size_bytes: i64,
149 pub items: Vec<HashMap<String, AttributeValue>>,
151}
152
153#[derive(Debug, Clone)]
154pub struct GlobalTableDescription {
155 pub global_table_name: String,
156 pub global_table_arn: String,
157 pub global_table_status: String,
158 pub creation_date: DateTime<Utc>,
159 pub replication_group: Vec<ReplicaDescription>,
160}
161
162#[derive(Debug, Clone)]
163pub struct ReplicaDescription {
164 pub region_name: String,
165 pub replica_status: String,
166}
167
168#[derive(Debug, Clone)]
169pub struct ExportDescription {
170 pub export_arn: String,
171 pub export_status: String,
172 pub table_arn: String,
173 pub s3_bucket: String,
174 pub s3_prefix: Option<String>,
175 pub export_format: String,
176 pub start_time: DateTime<Utc>,
177 pub end_time: DateTime<Utc>,
178 pub export_time: DateTime<Utc>,
179 pub item_count: i64,
180 pub billed_size_bytes: i64,
181}
182
183#[derive(Debug, Clone)]
184pub struct ImportDescription {
185 pub import_arn: String,
186 pub import_status: String,
187 pub table_arn: String,
188 pub table_name: String,
189 pub s3_bucket_source: String,
190 pub input_format: String,
191 pub start_time: DateTime<Utc>,
192 pub end_time: DateTime<Utc>,
193 pub processed_item_count: i64,
194 pub processed_size_bytes: i64,
195}
196
197impl DynamoTable {
198 pub fn hash_key_name(&self) -> &str {
200 self.key_schema
201 .iter()
202 .find(|k| k.key_type == "HASH")
203 .map(|k| k.attribute_name.as_str())
204 .unwrap_or("")
205 }
206
207 pub fn range_key_name(&self) -> Option<&str> {
209 self.key_schema
210 .iter()
211 .find(|k| k.key_type == "RANGE")
212 .map(|k| k.attribute_name.as_str())
213 }
214
215 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
217 let hash_key = self.hash_key_name();
218 let range_key = self.range_key_name();
219
220 self.items.iter().position(|item| {
221 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
222 (Some(a), Some(b)) => a == b,
223 _ => false,
224 };
225 if !hash_match {
226 return false;
227 }
228 match range_key {
229 Some(rk) => match (item.get(rk), key.get(rk)) {
230 (Some(a), Some(b)) => a == b,
231 (None, None) => true,
232 _ => false,
233 },
234 None => true,
235 }
236 })
237 }
238
239 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
241 let mut size: i64 = 0;
242 for (k, v) in item {
243 size += k.len() as i64;
244 size += Self::estimate_value_size(v);
245 }
246 size
247 }
248
249 fn estimate_value_size(v: &Value) -> i64 {
250 match v {
251 Value::Object(obj) => {
252 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
253 s.len() as i64
254 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
255 n.len() as i64
256 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
257 1
258 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
259 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
260 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
261 3 + m
262 .iter()
263 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
264 .sum::<i64>()
265 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
266 ss.iter()
267 .filter_map(|v| v.as_str())
268 .map(|s| s.len() as i64)
269 .sum()
270 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
271 ns.iter()
272 .filter_map(|v| v.as_str())
273 .map(|s| s.len() as i64)
274 .sum()
275 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
276 (b.len() as i64 * 3) / 4
278 } else {
279 v.to_string().len() as i64
280 }
281 }
282 _ => v.to_string().len() as i64,
283 }
284 }
285
286 pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
289 if self.contributor_insights_status != "ENABLED" {
290 return;
291 }
292 let hash_key = self.hash_key_name().to_string();
293 if let Some(pk_value) = key.get(&hash_key) {
294 let key_str = pk_value.to_string();
295 *self
296 .contributor_insights_counters
297 .entry(key_str)
298 .or_insert(0) += 1;
299 }
300 }
301
302 pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
304 if self.contributor_insights_status != "ENABLED" {
305 return;
306 }
307 let hash_key = self.hash_key_name().to_string();
308 if let Some(pk_value) = item.get(&hash_key) {
309 let key_str = pk_value.to_string();
310 *self
311 .contributor_insights_counters
312 .entry(key_str)
313 .or_insert(0) += 1;
314 }
315 }
316
317 pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
319 let mut entries: Vec<(&str, u64)> = self
320 .contributor_insights_counters
321 .iter()
322 .map(|(k, &v)| (k.as_str(), v))
323 .collect();
324 entries.sort_by(|a, b| b.1.cmp(&a.1));
325 entries.truncate(n);
326 entries
327 }
328
329 pub fn recalculate_stats(&mut self) {
331 self.item_count = self.items.len() as i64;
332 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
333 }
334}
335
336pub struct DynamoDbState {
337 pub account_id: String,
338 pub region: String,
339 pub tables: HashMap<String, DynamoTable>,
340 pub backups: HashMap<String, BackupDescription>,
341 pub global_tables: HashMap<String, GlobalTableDescription>,
342 pub exports: HashMap<String, ExportDescription>,
343 pub imports: HashMap<String, ImportDescription>,
344}
345
346impl DynamoDbState {
347 pub fn new(account_id: &str, region: &str) -> Self {
348 Self {
349 account_id: account_id.to_string(),
350 region: region.to_string(),
351 tables: HashMap::new(),
352 backups: HashMap::new(),
353 global_tables: HashMap::new(),
354 exports: HashMap::new(),
355 imports: HashMap::new(),
356 }
357 }
358
359 pub fn reset(&mut self) {
360 self.tables.clear();
361 self.backups.clear();
362 self.global_tables.clear();
363 self.exports.clear();
364 self.imports.clear();
365 }
366}
367
368pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;