panproto_protocols/database/
dynamodb.rs1use std::collections::HashMap;
8use std::hash::BuildHasher;
9
10use panproto_gat::Theory;
11use panproto_schema::{EdgeRule, Protocol, Schema, SchemaBuilder};
12
13use crate::emit::{children_by_edge, constraint_value, find_roots};
14use crate::error::ProtocolError;
15use crate::theories;
16
17#[must_use]
19pub fn protocol() -> Protocol {
20 Protocol {
21 name: "dynamodb".into(),
22 schema_theory: "ThDynamoDBSchema".into(),
23 instance_theory: "ThDynamoDBInstance".into(),
24 edge_rules: edge_rules(),
25 obj_kinds: vec![
26 "table".into(),
27 "attribute".into(),
28 "string".into(),
29 "number".into(),
30 "binary".into(),
31 "gsi".into(),
32 "lsi".into(),
33 ],
34 constraint_sorts: vec![
35 "key-type".into(),
36 "projection-type".into(),
37 "read-capacity".into(),
38 "write-capacity".into(),
39 ],
40 nominal_identity: true,
41 ..Protocol::default()
42 }
43}
44
/// Registers the DynamoDB theories in `registry`.
///
/// Delegates to `theories::register_hypergraph_functor`, passing
/// `ThDynamoDBSchema` as the schema theory name and `ThDynamoDBInstance` as
/// the instance theory name. The registry may use any hasher (`S`).
pub fn register_theories<S: BuildHasher>(registry: &mut HashMap<String, Theory, S>) {
    theories::register_hypergraph_functor(registry, "ThDynamoDBSchema", "ThDynamoDBInstance");
}
49
50pub fn parse_dynamodb(json: &serde_json::Value) -> Result<Schema, ProtocolError> {
56 let proto = protocol();
57 let mut builder = SchemaBuilder::new(&proto);
58
59 let table_name = json
60 .get("TableName")
61 .and_then(serde_json::Value::as_str)
62 .ok_or_else(|| ProtocolError::MissingField("TableName".into()))?;
63
64 builder = builder.vertex(table_name, "table", None)?;
65
66 let (b, sig) = parse_attributes(builder, json, table_name)?;
67 builder = b;
68 builder = parse_key_schema(builder, json, &sig);
69 builder = parse_throughput(builder, json, table_name);
70 builder = parse_gsis(builder, json, table_name)?;
71 builder = parse_lsis(builder, json, table_name)?;
72
73 if !sig.is_empty() {
74 builder = builder.hyper_edge("he_0", "table", sig, table_name)?;
75 }
76
77 let schema = builder.build()?;
78 Ok(schema)
79}
80
81fn parse_attributes(
82 mut builder: SchemaBuilder,
83 json: &serde_json::Value,
84 table_name: &str,
85) -> Result<(SchemaBuilder, HashMap<String, String>), ProtocolError> {
86 let mut attr_types: HashMap<String, String> = HashMap::new();
87 if let Some(attrs) = json
88 .get("AttributeDefinitions")
89 .and_then(serde_json::Value::as_array)
90 {
91 for attr in attrs {
92 let attr_name = attr
93 .get("AttributeName")
94 .and_then(serde_json::Value::as_str)
95 .unwrap_or("");
96 let attr_type = attr
97 .get("AttributeType")
98 .and_then(serde_json::Value::as_str)
99 .unwrap_or("S");
100 if !attr_name.is_empty() {
101 attr_types.insert(attr_name.to_string(), attr_type.to_string());
102 }
103 }
104 }
105
106 let mut sig = HashMap::new();
107 for (attr_name, attr_type) in &attr_types {
108 let attr_id = format!("{table_name}.{attr_name}");
109 let kind = dynamodb_type_to_kind(attr_type);
110 builder = builder.vertex(&attr_id, &kind, None)?;
111 builder = builder.edge(table_name, &attr_id, "prop", Some(attr_name))?;
112 sig.insert(attr_name.clone(), attr_id);
113 }
114
115 Ok((builder, sig))
116}
117
118fn parse_key_schema(
119 mut builder: SchemaBuilder,
120 json: &serde_json::Value,
121 sig: &HashMap<String, String>,
122) -> SchemaBuilder {
123 if let Some(keys) = json.get("KeySchema").and_then(serde_json::Value::as_array) {
124 for key in keys {
125 let key_name = key
126 .get("AttributeName")
127 .and_then(serde_json::Value::as_str)
128 .unwrap_or("");
129 let key_type = key
130 .get("KeyType")
131 .and_then(serde_json::Value::as_str)
132 .unwrap_or("HASH");
133 if let Some(attr_id) = sig.get(key_name) {
134 builder = builder.constraint(attr_id, "key-type", key_type);
135 }
136 }
137 }
138 builder
139}
140
141fn parse_throughput(
142 mut builder: SchemaBuilder,
143 json: &serde_json::Value,
144 table_name: &str,
145) -> SchemaBuilder {
146 if let Some(throughput) = json.get("ProvisionedThroughput") {
147 if let Some(rcu) = throughput
148 .get("ReadCapacityUnits")
149 .and_then(serde_json::Value::as_u64)
150 {
151 builder = builder.constraint(table_name, "read-capacity", &rcu.to_string());
152 }
153 if let Some(wcu) = throughput
154 .get("WriteCapacityUnits")
155 .and_then(serde_json::Value::as_u64)
156 {
157 builder = builder.constraint(table_name, "write-capacity", &wcu.to_string());
158 }
159 }
160 builder
161}
162
163fn parse_gsis(
164 mut builder: SchemaBuilder,
165 json: &serde_json::Value,
166 table_name: &str,
167) -> Result<SchemaBuilder, ProtocolError> {
168 if let Some(gsis) = json
169 .get("GlobalSecondaryIndexes")
170 .and_then(serde_json::Value::as_array)
171 {
172 for gsi in gsis {
173 let index_name = gsi
174 .get("IndexName")
175 .and_then(serde_json::Value::as_str)
176 .unwrap_or("unnamed_gsi");
177 let gsi_id = format!("{table_name}:gsi:{index_name}");
178 builder = builder.vertex(&gsi_id, "gsi", None)?;
179 builder = builder.edge(table_name, &gsi_id, "prop", Some(index_name))?;
180
181 if let Some(proj) = gsi.get("Projection") {
182 if let Some(proj_type) = proj
183 .get("ProjectionType")
184 .and_then(serde_json::Value::as_str)
185 {
186 builder = builder.constraint(&gsi_id, "projection-type", proj_type);
187 }
188 }
189
190 if let Some(keys) = gsi.get("KeySchema").and_then(serde_json::Value::as_array) {
191 for key in keys {
192 let key_name = key
193 .get("AttributeName")
194 .and_then(serde_json::Value::as_str)
195 .unwrap_or("");
196 let key_type = key
197 .get("KeyType")
198 .and_then(serde_json::Value::as_str)
199 .unwrap_or("HASH");
200 builder =
201 builder.constraint(&gsi_id, "key-type", &format!("{key_name}:{key_type}"));
202 }
203 }
204 }
205 }
206 Ok(builder)
207}
208
209fn parse_lsis(
210 mut builder: SchemaBuilder,
211 json: &serde_json::Value,
212 table_name: &str,
213) -> Result<SchemaBuilder, ProtocolError> {
214 if let Some(lsis) = json
215 .get("LocalSecondaryIndexes")
216 .and_then(serde_json::Value::as_array)
217 {
218 for lsi in lsis {
219 let index_name = lsi
220 .get("IndexName")
221 .and_then(serde_json::Value::as_str)
222 .unwrap_or("unnamed_lsi");
223 let lsi_id = format!("{table_name}:lsi:{index_name}");
224 builder = builder.vertex(&lsi_id, "lsi", None)?;
225 builder = builder.edge(table_name, &lsi_id, "prop", Some(index_name))?;
226
227 if let Some(proj) = lsi.get("Projection") {
228 if let Some(proj_type) = proj
229 .get("ProjectionType")
230 .and_then(serde_json::Value::as_str)
231 {
232 builder = builder.constraint(&lsi_id, "projection-type", proj_type);
233 }
234 }
235 }
236 }
237 Ok(builder)
238}
239
240pub fn emit_dynamodb(schema: &Schema) -> Result<serde_json::Value, ProtocolError> {
246 let tables: Vec<_> = find_roots(schema, &["prop"]);
247 let table = tables
248 .into_iter()
249 .find(|v| v.kind == "table")
250 .ok_or_else(|| ProtocolError::Emit("no table vertex found".into()))?;
251
252 let children = children_by_edge(schema, &table.id, "prop");
253
254 let mut attr_defs = Vec::new();
255 let mut key_schema = Vec::new();
256 let mut gsis = Vec::new();
257
258 for (edge, vertex) in &children {
259 let attr_name = edge.name.as_deref().unwrap_or(&vertex.id);
260 match vertex.kind.as_str() {
261 "gsi" => {
262 let mut gsi_obj = serde_json::json!({
263 "IndexName": attr_name,
264 "KeySchema": [],
265 "Projection": { "ProjectionType": "ALL" }
266 });
267 if let Some(proj) = constraint_value(schema, &vertex.id, "projection-type") {
268 gsi_obj["Projection"]["ProjectionType"] =
269 serde_json::Value::String(proj.to_string());
270 }
271 gsis.push(gsi_obj);
272 }
273 "lsi" => {
274 }
276 _ => {
277 let ddb_type = kind_to_dynamodb_type(&vertex.kind);
278 attr_defs.push(serde_json::json!({
279 "AttributeName": attr_name,
280 "AttributeType": ddb_type
281 }));
282 if let Some(kt) = constraint_value(schema, &vertex.id, "key-type") {
283 key_schema.push(serde_json::json!({
284 "AttributeName": attr_name,
285 "KeyType": kt
286 }));
287 }
288 }
289 }
290 }
291
292 let mut result = serde_json::json!({
293 "TableName": table.id,
294 "AttributeDefinitions": attr_defs,
295 "KeySchema": key_schema
296 });
297
298 if let Some(rcu) = constraint_value(schema, &table.id, "read-capacity") {
299 if let Some(wcu) = constraint_value(schema, &table.id, "write-capacity") {
300 result["ProvisionedThroughput"] = serde_json::json!({
301 "ReadCapacityUnits": rcu.parse::<u64>().unwrap_or(5),
302 "WriteCapacityUnits": wcu.parse::<u64>().unwrap_or(5)
303 });
304 }
305 }
306
307 if !gsis.is_empty() {
308 result["GlobalSecondaryIndexes"] = serde_json::Value::Array(gsis);
309 }
310
311 Ok(result)
312}
313
/// Maps a DynamoDB scalar type code to a vertex kind name.
///
/// `"N"` maps to `"number"` and `"B"` to `"binary"`; every other input,
/// including `"S"`, maps to `"string"`.
fn dynamodb_type_to_kind(ddb_type: &str) -> String {
    let kind = if ddb_type == "N" {
        "number"
    } else if ddb_type == "B" {
        "binary"
    } else {
        "string"
    };
    String::from(kind)
}
322
/// Maps a vertex kind name back to a DynamoDB scalar type code.
///
/// `"number"` maps to `"N"` and `"binary"` to `"B"`; every other kind maps
/// to `"S"`.
fn kind_to_dynamodb_type(kind: &str) -> &'static str {
    // Kinds with a dedicated type code; anything else is emitted as a string.
    const CODES: [(&str, &str); 2] = [("number", "N"), ("binary", "B")];

    CODES
        .iter()
        .find(|(name, _)| *name == kind)
        .map_or("S", |&(_, code)| code)
}
330
331fn edge_rules() -> Vec<EdgeRule> {
332 vec![
333 EdgeRule {
334 edge_kind: "prop".into(),
335 src_kinds: vec!["table".into()],
336 tgt_kinds: vec![],
337 },
338 EdgeRule {
339 edge_kind: "foreign-key".into(),
340 src_kinds: vec![],
341 tgt_kinds: vec![],
342 },
343 ]
344}
345
// Unit tests for the DynamoDB protocol: protocol definition, theory
// registration, parsing, and a parse/emit round trip.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    // The protocol carries the expected name and schema theory and exposes a
    // `prop` edge rule.
    #[test]
    fn protocol_creates_valid_definition() {
        let p = protocol();
        assert_eq!(p.name, "dynamodb");
        assert_eq!(p.schema_theory, "ThDynamoDBSchema");
        assert!(p.find_edge_rule("prop").is_some());
    }

    // Both the schema and the instance theory end up in the registry.
    #[test]
    fn register_theories_adds_correct_theories() {
        let mut registry = HashMap::new();
        register_theories(&mut registry);
        assert!(registry.contains_key("ThDynamoDBSchema"));
        assert!(registry.contains_key("ThDynamoDBInstance"));
    }

    // A table with a string hash key and a numeric range key yields a table
    // vertex plus one typed vertex per attribute.
    #[test]
    fn parse_simple_table() {
        let json = serde_json::json!({
            "TableName": "users",
            "AttributeDefinitions": [
                { "AttributeName": "user_id", "AttributeType": "S" },
                { "AttributeName": "sort_key", "AttributeType": "N" }
            ],
            "KeySchema": [
                { "AttributeName": "user_id", "KeyType": "HASH" },
                { "AttributeName": "sort_key", "KeyType": "RANGE" }
            ],
            "ProvisionedThroughput": {
                "ReadCapacityUnits": 10,
                "WriteCapacityUnits": 5
            }
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert!(schema.has_vertex("users"));
        assert!(schema.has_vertex("users.user_id"));
        assert_eq!(schema.vertices.get("users.user_id").unwrap().kind, "string");
        assert_eq!(
            schema.vertices.get("users.sort_key").unwrap().kind,
            "number"
        );
    }

    // A global secondary index becomes a vertex with id
    // `{table}:gsi:{index}`.
    #[test]
    fn parse_with_gsi() {
        let json = serde_json::json!({
            "TableName": "orders",
            "AttributeDefinitions": [
                { "AttributeName": "order_id", "AttributeType": "S" },
                { "AttributeName": "customer_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "order_id", "KeyType": "HASH" }
            ],
            "GlobalSecondaryIndexes": [{
                "IndexName": "customer_index",
                "KeySchema": [
                    { "AttributeName": "customer_id", "KeyType": "HASH" }
                ],
                "Projection": { "ProjectionType": "ALL" }
            }]
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert!(schema.has_vertex("orders:gsi:customer_index"));
    }

    // Parsing then emitting keeps the table name and produces at least one
    // attribute definition.
    #[test]
    fn emit_roundtrip() {
        let json = serde_json::json!({
            "TableName": "items",
            "AttributeDefinitions": [
                { "AttributeName": "item_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "item_id", "KeyType": "HASH" }
            ]
        });
        let schema = parse_dynamodb(&json).expect("parse");
        let emitted = emit_dynamodb(&schema).expect("emit");
        assert_eq!(emitted["TableName"], "items");
        assert!(
            !emitted["AttributeDefinitions"]
                .as_array()
                .unwrap()
                .is_empty()
        );
    }

    // An input without `TableName` is rejected.
    #[test]
    fn parse_missing_table_name_fails() {
        let json = serde_json::json!({});
        let result = parse_dynamodb(&json);
        assert!(result.is_err());
    }
}
445}