1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6
/// A single item attribute value, stored as raw JSON.
///
/// The helpers in this file treat it as a JSON object keyed by a type tag
/// (for example `"S"`, `"N"`, `"L"`, `"M"`); nothing here validates that shape.
pub type AttributeValue = Value;
10
11pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
14 let obj = av.as_object()?;
15 if obj.len() != 1 {
16 return None;
17 }
18 let (k, v) = obj.iter().next()?;
19 Some((k.as_str(), v))
20}
21
/// One element of a key schema: an attribute name paired with its key role.
#[derive(Debug, Clone)]
pub struct KeySchemaElement {
    /// Name of the attribute that participates in the key.
    pub attribute_name: String,
    /// Key role as a plain string; `DynamoTable` looks for the literals
    /// `"HASH"` and `"RANGE"`. Other values are not rejected here.
    pub key_type: String,
}
27
/// Declares the type of a named attribute.
#[derive(Debug, Clone)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Attribute type as a plain string.
    /// NOTE(review): presumably a DynamoDB scalar type code such as `S`, `N`
    /// or `B`; nothing in this file validates it, so confirm against callers.
    pub attribute_type: String,
}
33
/// Read and write capacity settings recorded for a table, index, or backup.
#[derive(Debug, Clone)]
pub struct ProvisionedThroughput {
    /// Configured read capacity units.
    pub read_capacity_units: i64,
    /// Configured write capacity units.
    pub write_capacity_units: i64,
}
39
/// Definition of a global secondary index attached to a [`DynamoTable`].
#[derive(Debug, Clone)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index (independent of the table's own key schema).
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
    /// Capacity settings for the index, if any were recorded.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
}
47
/// Definition of a local secondary index attached to a [`DynamoTable`].
///
/// Unlike [`GlobalSecondaryIndex`], this type carries no throughput settings.
#[derive(Debug, Clone)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
}
54
/// Attribute projection settings of a secondary index.
#[derive(Debug, Clone)]
pub struct Projection {
    /// Projection mode as a plain string.
    /// NOTE(review): presumably one of DynamoDB's `ALL`, `KEYS_ONLY` or
    /// `INCLUDE`; not validated in this file, so confirm against callers.
    pub projection_type: String,
    /// Names of additional non-key attributes to project.
    pub non_key_attributes: Vec<String>,
}
60
/// In-memory representation of one table: its schema, settings, and items.
///
/// Most string-typed fields (`status`, `billing_mode`, ...) are stored as-is;
/// this file does not constrain their values.
#[derive(Debug, Clone)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Primary key schema; `hash_key_name` / `range_key_name` read the
    /// `"HASH"` and `"RANGE"` elements from it.
    pub key_schema: Vec<KeySchemaElement>,
    /// Declared attribute types.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Capacity settings of the table.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to value. Lookups via
    /// `find_item_index` scan this vector linearly.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags as key/value pairs.
    pub tags: HashMap<String, String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Table status string.
    pub status: String,
    /// Number of items; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Estimated total item size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode string.
    pub billing_mode: String,
    /// Name of the TTL attribute, if one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if one is attached.
    pub resource_policy: Option<String>,
    /// Whether point-in-time recovery is flagged as enabled
    /// (presumably what "pitr" abbreviates; confirm).
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations registered for the table.
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor Insights status string.
    pub contributor_insights_status: String,
}
87
/// A Kinesis streaming destination recorded on a [`DynamoTable`].
#[derive(Debug, Clone)]
pub struct KinesisDestination {
    /// ARN of the destination stream.
    pub stream_arn: String,
    /// Destination status string (values are not constrained in this file).
    pub destination_status: String,
    /// Timestamp precision setting, stored as a plain string.
    pub approximate_creation_date_time_precision: String,
}
94
/// Metadata describing a table backup, including a copy of the source
/// table's schema and statistics.
#[derive(Debug, Clone)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the source table.
    pub table_name: String,
    /// ARN of the source table.
    pub table_arn: String,
    /// Backup status string (values are not constrained in this file).
    pub backup_status: String,
    /// Backup type string (values are not constrained in this file).
    pub backup_type: String,
    /// When the backup was created (UTC).
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema recorded with the backup.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions recorded with the backup.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Capacity settings recorded with the backup.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode string recorded with the backup.
    pub billing_mode: String,
    /// Item count recorded with the backup.
    pub item_count: i64,
    /// Size in bytes recorded with the backup.
    pub size_bytes: i64,
}
111
/// Metadata describing a global table and its replicas.
#[derive(Debug, Clone)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Status string (values are not constrained in this file).
    pub global_table_status: String,
    /// Creation timestamp (UTC).
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
}
120
/// One replica entry in a [`GlobalTableDescription`].
#[derive(Debug, Clone)]
pub struct ReplicaDescription {
    /// Region the replica lives in.
    pub region_name: String,
    /// Replica status string (values are not constrained in this file).
    pub replica_status: String,
}
126
/// Metadata describing a table export to S3.
#[derive(Debug, Clone)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Export status string (values are not constrained in this file).
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Optional key prefix inside the destination bucket.
    pub s3_prefix: Option<String>,
    /// Export format string (values are not constrained in this file).
    pub export_format: String,
    /// When the export started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the export finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Export time (UTC).
    /// NOTE(review): presumably the point in time the exported data reflects;
    /// confirm against the code that fills this in.
    pub export_time: DateTime<Utc>,
    /// Number of exported items.
    pub item_count: i64,
    /// Billed size of the export in bytes.
    pub billed_size_bytes: i64,
}
141
/// Metadata describing a table import from S3.
#[derive(Debug, Clone)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Import status string (values are not constrained in this file).
    pub import_status: String,
    /// ARN of the target table.
    pub table_arn: String,
    /// Name of the target table.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format string (values are not constrained in this file).
    pub input_format: String,
    /// When the import started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the import finished (UTC).
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
155
156impl DynamoTable {
157 pub fn hash_key_name(&self) -> &str {
159 self.key_schema
160 .iter()
161 .find(|k| k.key_type == "HASH")
162 .map(|k| k.attribute_name.as_str())
163 .unwrap_or("")
164 }
165
166 pub fn range_key_name(&self) -> Option<&str> {
168 self.key_schema
169 .iter()
170 .find(|k| k.key_type == "RANGE")
171 .map(|k| k.attribute_name.as_str())
172 }
173
174 pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
176 let hash_key = self.hash_key_name();
177 let range_key = self.range_key_name();
178
179 self.items.iter().position(|item| {
180 let hash_match = match (item.get(hash_key), key.get(hash_key)) {
181 (Some(a), Some(b)) => a == b,
182 _ => false,
183 };
184 if !hash_match {
185 return false;
186 }
187 match range_key {
188 Some(rk) => match (item.get(rk), key.get(rk)) {
189 (Some(a), Some(b)) => a == b,
190 (None, None) => true,
191 _ => false,
192 },
193 None => true,
194 }
195 })
196 }
197
198 fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
200 let mut size: i64 = 0;
201 for (k, v) in item {
202 size += k.len() as i64;
203 size += Self::estimate_value_size(v);
204 }
205 size
206 }
207
208 fn estimate_value_size(v: &Value) -> i64 {
209 match v {
210 Value::Object(obj) => {
211 if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
212 s.len() as i64
213 } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
214 n.len() as i64
215 } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
216 1
217 } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
218 3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
219 } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
220 3 + m
221 .iter()
222 .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
223 .sum::<i64>()
224 } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
225 ss.iter()
226 .filter_map(|v| v.as_str())
227 .map(|s| s.len() as i64)
228 .sum()
229 } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
230 ns.iter()
231 .filter_map(|v| v.as_str())
232 .map(|s| s.len() as i64)
233 .sum()
234 } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
235 (b.len() as i64 * 3) / 4
237 } else {
238 v.to_string().len() as i64
239 }
240 }
241 _ => v.to_string().len() as i64,
242 }
243 }
244
245 pub fn recalculate_stats(&mut self) {
247 self.item_count = self.items.len() as i64;
248 self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
249 }
250}
251
/// All in-memory DynamoDB state for one account and region.
///
/// NOTE(review): the keys of the maps below are not established in this
/// file (presumably names or ARNs); confirm against the code that inserts
/// into them.
pub struct DynamoDbState {
    /// Account identifier this state belongs to.
    pub account_id: String,
    /// Region this state belongs to.
    pub region: String,
    /// Tables held in this state.
    pub tables: HashMap<String, DynamoTable>,
    /// Backups held in this state.
    pub backups: HashMap<String, BackupDescription>,
    /// Global tables held in this state.
    pub global_tables: HashMap<String, GlobalTableDescription>,
    /// Exports held in this state.
    pub exports: HashMap<String, ExportDescription>,
    /// Imports held in this state.
    pub imports: HashMap<String, ImportDescription>,
}
261
262impl DynamoDbState {
263 pub fn new(account_id: &str, region: &str) -> Self {
264 Self {
265 account_id: account_id.to_string(),
266 region: region.to_string(),
267 tables: HashMap::new(),
268 backups: HashMap::new(),
269 global_tables: HashMap::new(),
270 exports: HashMap::new(),
271 imports: HashMap::new(),
272 }
273 }
274
275 pub fn reset(&mut self) {
276 self.tables.clear();
277 self.backups.clear();
278 self.global_tables.clear();
279 self.exports.clear();
280 self.imports.clear();
281 }
282}
283
/// Shared handle to a [`DynamoDbState`]: reference-counted via `Arc` and
/// guarded by a `parking_lot` read/write lock.
pub type SharedDynamoDbState = Arc<RwLock<DynamoDbState>>;