Skip to main content

panproto_protocols/database/
dynamodb.rs

1//! DynamoDB protocol definition.
2//!
3//! DynamoDB uses a constrained hypergraph schema theory
4//! (`colimit(ThHypergraph, ThConstraint)`) and a set-valued functor
5//! instance theory (`ThFunctor`).
6
7use std::collections::HashMap;
8use std::hash::BuildHasher;
9
10use panproto_gat::Theory;
11use panproto_schema::{EdgeRule, Protocol, Schema, SchemaBuilder};
12
13use crate::emit::{children_by_edge, constraint_value, find_roots};
14use crate::error::ProtocolError;
15use crate::theories;
16
17/// Returns the `DynamoDB` protocol definition.
18#[must_use]
19pub fn protocol() -> Protocol {
20    Protocol {
21        name: "dynamodb".into(),
22        schema_theory: "ThDynamoDBSchema".into(),
23        instance_theory: "ThDynamoDBInstance".into(),
24        edge_rules: edge_rules(),
25        obj_kinds: vec![
26            "table".into(),
27            "attribute".into(),
28            "string".into(),
29            "number".into(),
30            "binary".into(),
31            "gsi".into(),
32            "lsi".into(),
33        ],
34        constraint_sorts: vec![
35            "key-type".into(),
36            "projection-type".into(),
37            "read-capacity".into(),
38            "write-capacity".into(),
39        ],
40        nominal_identity: true,
41        ..Protocol::default()
42    }
43}
44
45/// Register the component GATs for `DynamoDB` with a theory registry.
46pub fn register_theories<S: BuildHasher>(registry: &mut HashMap<String, Theory, S>) {
47    theories::register_hypergraph_functor(registry, "ThDynamoDBSchema", "ThDynamoDBInstance");
48}
49
50/// Parse a `DynamoDB` `CreateTable` JSON into a [`Schema`].
51///
52/// # Errors
53///
54/// Returns [`ProtocolError`] if the JSON is invalid.
55pub fn parse_dynamodb(json: &serde_json::Value) -> Result<Schema, ProtocolError> {
56    let proto = protocol();
57    let mut builder = SchemaBuilder::new(&proto);
58
59    let table_name = json
60        .get("TableName")
61        .and_then(serde_json::Value::as_str)
62        .ok_or_else(|| ProtocolError::MissingField("TableName".into()))?;
63
64    builder = builder.vertex(table_name, "table", None)?;
65
66    let (b, sig) = parse_attributes(builder, json, table_name)?;
67    builder = b;
68    builder = parse_key_schema(builder, json, &sig);
69    builder = parse_throughput(builder, json, table_name);
70    builder = parse_gsis(builder, json, table_name)?;
71    builder = parse_lsis(builder, json, table_name)?;
72
73    if !sig.is_empty() {
74        builder = builder.hyper_edge("he_0", "table", sig, table_name)?;
75    }
76
77    let schema = builder.build()?;
78    Ok(schema)
79}
80
81fn parse_attributes(
82    mut builder: SchemaBuilder,
83    json: &serde_json::Value,
84    table_name: &str,
85) -> Result<(SchemaBuilder, HashMap<String, String>), ProtocolError> {
86    let mut attr_types: HashMap<String, String> = HashMap::new();
87    if let Some(attrs) = json
88        .get("AttributeDefinitions")
89        .and_then(serde_json::Value::as_array)
90    {
91        for attr in attrs {
92            let attr_name = attr
93                .get("AttributeName")
94                .and_then(serde_json::Value::as_str)
95                .unwrap_or("");
96            let attr_type = attr
97                .get("AttributeType")
98                .and_then(serde_json::Value::as_str)
99                .unwrap_or("S");
100            if !attr_name.is_empty() {
101                attr_types.insert(attr_name.to_string(), attr_type.to_string());
102            }
103        }
104    }
105
106    let mut sig = HashMap::new();
107    for (attr_name, attr_type) in &attr_types {
108        let attr_id = format!("{table_name}.{attr_name}");
109        let kind = dynamodb_type_to_kind(attr_type);
110        builder = builder.vertex(&attr_id, &kind, None)?;
111        builder = builder.edge(table_name, &attr_id, "prop", Some(attr_name))?;
112        sig.insert(attr_name.clone(), attr_id);
113    }
114
115    Ok((builder, sig))
116}
117
118fn parse_key_schema(
119    mut builder: SchemaBuilder,
120    json: &serde_json::Value,
121    sig: &HashMap<String, String>,
122) -> SchemaBuilder {
123    if let Some(keys) = json.get("KeySchema").and_then(serde_json::Value::as_array) {
124        for key in keys {
125            let key_name = key
126                .get("AttributeName")
127                .and_then(serde_json::Value::as_str)
128                .unwrap_or("");
129            let key_type = key
130                .get("KeyType")
131                .and_then(serde_json::Value::as_str)
132                .unwrap_or("HASH");
133            if let Some(attr_id) = sig.get(key_name) {
134                builder = builder.constraint(attr_id, "key-type", key_type);
135            }
136        }
137    }
138    builder
139}
140
141fn parse_throughput(
142    mut builder: SchemaBuilder,
143    json: &serde_json::Value,
144    table_name: &str,
145) -> SchemaBuilder {
146    if let Some(throughput) = json.get("ProvisionedThroughput") {
147        if let Some(rcu) = throughput
148            .get("ReadCapacityUnits")
149            .and_then(serde_json::Value::as_u64)
150        {
151            builder = builder.constraint(table_name, "read-capacity", &rcu.to_string());
152        }
153        if let Some(wcu) = throughput
154            .get("WriteCapacityUnits")
155            .and_then(serde_json::Value::as_u64)
156        {
157            builder = builder.constraint(table_name, "write-capacity", &wcu.to_string());
158        }
159    }
160    builder
161}
162
163fn parse_gsis(
164    mut builder: SchemaBuilder,
165    json: &serde_json::Value,
166    table_name: &str,
167) -> Result<SchemaBuilder, ProtocolError> {
168    if let Some(gsis) = json
169        .get("GlobalSecondaryIndexes")
170        .and_then(serde_json::Value::as_array)
171    {
172        for gsi in gsis {
173            let index_name = gsi
174                .get("IndexName")
175                .and_then(serde_json::Value::as_str)
176                .unwrap_or("unnamed_gsi");
177            let gsi_id = format!("{table_name}:gsi:{index_name}");
178            builder = builder.vertex(&gsi_id, "gsi", None)?;
179            builder = builder.edge(table_name, &gsi_id, "prop", Some(index_name))?;
180
181            if let Some(proj) = gsi.get("Projection") {
182                if let Some(proj_type) = proj
183                    .get("ProjectionType")
184                    .and_then(serde_json::Value::as_str)
185                {
186                    builder = builder.constraint(&gsi_id, "projection-type", proj_type);
187                }
188            }
189
190            if let Some(keys) = gsi.get("KeySchema").and_then(serde_json::Value::as_array) {
191                for key in keys {
192                    let key_name = key
193                        .get("AttributeName")
194                        .and_then(serde_json::Value::as_str)
195                        .unwrap_or("");
196                    let key_type = key
197                        .get("KeyType")
198                        .and_then(serde_json::Value::as_str)
199                        .unwrap_or("HASH");
200                    builder =
201                        builder.constraint(&gsi_id, "key-type", &format!("{key_name}:{key_type}"));
202                }
203            }
204        }
205    }
206    Ok(builder)
207}
208
209fn parse_lsis(
210    mut builder: SchemaBuilder,
211    json: &serde_json::Value,
212    table_name: &str,
213) -> Result<SchemaBuilder, ProtocolError> {
214    if let Some(lsis) = json
215        .get("LocalSecondaryIndexes")
216        .and_then(serde_json::Value::as_array)
217    {
218        for lsi in lsis {
219            let index_name = lsi
220                .get("IndexName")
221                .and_then(serde_json::Value::as_str)
222                .unwrap_or("unnamed_lsi");
223            let lsi_id = format!("{table_name}:lsi:{index_name}");
224            builder = builder.vertex(&lsi_id, "lsi", None)?;
225            builder = builder.edge(table_name, &lsi_id, "prop", Some(index_name))?;
226
227            if let Some(proj) = lsi.get("Projection") {
228                if let Some(proj_type) = proj
229                    .get("ProjectionType")
230                    .and_then(serde_json::Value::as_str)
231                {
232                    builder = builder.constraint(&lsi_id, "projection-type", proj_type);
233                }
234            }
235        }
236    }
237    Ok(builder)
238}
239
240/// Emit a [`Schema`] as `DynamoDB` `CreateTable` JSON.
241///
242/// # Errors
243///
244/// Returns [`ProtocolError::Emit`] if the schema cannot be serialized.
245pub fn emit_dynamodb(schema: &Schema) -> Result<serde_json::Value, ProtocolError> {
246    let tables: Vec<_> = find_roots(schema, &["prop"]);
247    let table = tables
248        .into_iter()
249        .find(|v| v.kind == "table")
250        .ok_or_else(|| ProtocolError::Emit("no table vertex found".into()))?;
251
252    let children = children_by_edge(schema, &table.id, "prop");
253
254    let mut attr_defs = Vec::new();
255    let mut key_schema = Vec::new();
256    let mut gsis = Vec::new();
257
258    for (edge, vertex) in &children {
259        let attr_name = edge.name.as_deref().unwrap_or(&vertex.id);
260        match vertex.kind.as_str() {
261            "gsi" => {
262                let mut gsi_obj = serde_json::json!({
263                    "IndexName": attr_name,
264                    "KeySchema": [],
265                    "Projection": { "ProjectionType": "ALL" }
266                });
267                if let Some(proj) = constraint_value(schema, &vertex.id, "projection-type") {
268                    gsi_obj["Projection"]["ProjectionType"] =
269                        serde_json::Value::String(proj.to_string());
270                }
271                gsis.push(gsi_obj);
272            }
273            "lsi" => {
274                // LSIs emitted similarly but we keep it simple.
275            }
276            _ => {
277                let ddb_type = kind_to_dynamodb_type(&vertex.kind);
278                attr_defs.push(serde_json::json!({
279                    "AttributeName": attr_name,
280                    "AttributeType": ddb_type
281                }));
282                if let Some(kt) = constraint_value(schema, &vertex.id, "key-type") {
283                    key_schema.push(serde_json::json!({
284                        "AttributeName": attr_name,
285                        "KeyType": kt
286                    }));
287                }
288            }
289        }
290    }
291
292    let mut result = serde_json::json!({
293        "TableName": table.id,
294        "AttributeDefinitions": attr_defs,
295        "KeySchema": key_schema
296    });
297
298    if let Some(rcu) = constraint_value(schema, &table.id, "read-capacity") {
299        if let Some(wcu) = constraint_value(schema, &table.id, "write-capacity") {
300            result["ProvisionedThroughput"] = serde_json::json!({
301                "ReadCapacityUnits": rcu.parse::<u64>().unwrap_or(5),
302                "WriteCapacityUnits": wcu.parse::<u64>().unwrap_or(5)
303            });
304        }
305    }
306
307    if !gsis.is_empty() {
308        result["GlobalSecondaryIndexes"] = serde_json::Value::Array(gsis);
309    }
310
311    Ok(result)
312}
313
/// Map a `DynamoDB` scalar attribute type code to a schema vertex kind.
///
/// `"N"` maps to `number` and `"B"` to `binary`. Every other code, including `"S"`, maps to
/// `string`.
fn dynamodb_type_to_kind(ddb_type: &str) -> String {
    let kind = if ddb_type == "N" {
        "number"
    } else if ddb_type == "B" {
        "binary"
    } else {
        "string"
    };
    kind.to_owned()
}
322
/// Map a schema vertex kind back to a `DynamoDB` scalar attribute type code.
///
/// `number` maps to `"N"` and `binary` to `"B"`. Any other kind maps to `"S"`.
fn kind_to_dynamodb_type(kind: &str) -> &'static str {
    const CODES: [(&str, &str); 2] = [("number", "N"), ("binary", "B")];
    CODES
        .iter()
        .find(|(name, _)| *name == kind)
        .map_or("S", |&(_, code)| code)
}
330
331fn edge_rules() -> Vec<EdgeRule> {
332    vec![
333        EdgeRule {
334            edge_kind: "prop".into(),
335            src_kinds: vec!["table".into()],
336            tgt_kinds: vec![],
337        },
338        EdgeRule {
339            edge_kind: "foreign-key".into(),
340            src_kinds: vec![],
341            tgt_kinds: vec![],
342        },
343    ]
344}
345
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn protocol_creates_valid_definition() {
        let p = protocol();
        assert_eq!(p.name, "dynamodb");
        assert_eq!(p.schema_theory, "ThDynamoDBSchema");
        assert!(p.find_edge_rule("prop").is_some());
    }

    #[test]
    fn register_theories_adds_correct_theories() {
        let mut registry = HashMap::new();
        register_theories(&mut registry);
        assert!(registry.contains_key("ThDynamoDBSchema"));
        assert!(registry.contains_key("ThDynamoDBInstance"));
    }

    #[test]
    fn parse_simple_table() {
        let json = serde_json::json!({
            "TableName": "users",
            "AttributeDefinitions": [
                { "AttributeName": "user_id", "AttributeType": "S" },
                { "AttributeName": "sort_key", "AttributeType": "N" }
            ],
            "KeySchema": [
                { "AttributeName": "user_id", "KeyType": "HASH" },
                { "AttributeName": "sort_key", "KeyType": "RANGE" }
            ],
            "ProvisionedThroughput": {
                "ReadCapacityUnits": 10,
                "WriteCapacityUnits": 5
            }
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert!(schema.has_vertex("users"));
        assert!(schema.has_vertex("users.user_id"));
        assert_eq!(schema.vertices.get("users.user_id").unwrap().kind, "string");
        assert_eq!(
            schema.vertices.get("users.sort_key").unwrap().kind,
            "number"
        );
    }

    #[test]
    fn parse_binary_attribute_maps_to_binary_kind() {
        let json = serde_json::json!({
            "TableName": "blobs",
            "AttributeDefinitions": [
                { "AttributeName": "blob_id", "AttributeType": "B" }
            ],
            "KeySchema": [
                { "AttributeName": "blob_id", "KeyType": "HASH" }
            ]
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert_eq!(schema.vertices.get("blobs.blob_id").unwrap().kind, "binary");
    }

    #[test]
    fn parse_with_gsi() {
        let json = serde_json::json!({
            "TableName": "orders",
            "AttributeDefinitions": [
                { "AttributeName": "order_id", "AttributeType": "S" },
                { "AttributeName": "customer_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "order_id", "KeyType": "HASH" }
            ],
            "GlobalSecondaryIndexes": [{
                "IndexName": "customer_index",
                "KeySchema": [
                    { "AttributeName": "customer_id", "KeyType": "HASH" }
                ],
                "Projection": { "ProjectionType": "ALL" }
            }]
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert!(schema.has_vertex("orders:gsi:customer_index"));
    }

    #[test]
    fn parse_with_lsi() {
        let json = serde_json::json!({
            "TableName": "orders",
            "AttributeDefinitions": [
                { "AttributeName": "order_id", "AttributeType": "S" },
                { "AttributeName": "created_at", "AttributeType": "N" }
            ],
            "KeySchema": [
                { "AttributeName": "order_id", "KeyType": "HASH" }
            ],
            "LocalSecondaryIndexes": [{
                "IndexName": "by_date",
                "KeySchema": [
                    { "AttributeName": "order_id", "KeyType": "HASH" },
                    { "AttributeName": "created_at", "KeyType": "RANGE" }
                ],
                "Projection": { "ProjectionType": "KEYS_ONLY" }
            }]
        });
        let schema = parse_dynamodb(&json).expect("should parse");
        assert!(schema.has_vertex("orders:lsi:by_date"));
    }

    #[test]
    fn emit_roundtrip() {
        let json = serde_json::json!({
            "TableName": "items",
            "AttributeDefinitions": [
                { "AttributeName": "item_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "item_id", "KeyType": "HASH" }
            ]
        });
        let schema = parse_dynamodb(&json).expect("parse");
        let emitted = emit_dynamodb(&schema).expect("emit");
        assert_eq!(emitted["TableName"], "items");
        assert!(
            !emitted["AttributeDefinitions"]
                .as_array()
                .unwrap()
                .is_empty()
        );
    }

    #[test]
    fn emit_roundtrip_preserves_throughput_and_key() {
        let json = serde_json::json!({
            "TableName": "items",
            "AttributeDefinitions": [
                { "AttributeName": "item_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "item_id", "KeyType": "HASH" }
            ],
            "ProvisionedThroughput": {
                "ReadCapacityUnits": 10,
                "WriteCapacityUnits": 5
            }
        });
        let schema = parse_dynamodb(&json).expect("parse");
        let emitted = emit_dynamodb(&schema).expect("emit");
        assert_eq!(
            emitted["ProvisionedThroughput"]["ReadCapacityUnits"].as_u64(),
            Some(10)
        );
        assert_eq!(
            emitted["ProvisionedThroughput"]["WriteCapacityUnits"].as_u64(),
            Some(5)
        );
        assert_eq!(emitted["KeySchema"][0]["AttributeName"], "item_id");
        assert_eq!(emitted["KeySchema"][0]["KeyType"], "HASH");
    }

    #[test]
    fn emit_gsi_keeps_projection_type() {
        let json = serde_json::json!({
            "TableName": "orders",
            "AttributeDefinitions": [
                { "AttributeName": "order_id", "AttributeType": "S" },
                { "AttributeName": "customer_id", "AttributeType": "S" }
            ],
            "KeySchema": [
                { "AttributeName": "order_id", "KeyType": "HASH" }
            ],
            "GlobalSecondaryIndexes": [{
                "IndexName": "customer_index",
                "KeySchema": [
                    { "AttributeName": "customer_id", "KeyType": "HASH" }
                ],
                "Projection": { "ProjectionType": "KEYS_ONLY" }
            }]
        });
        let schema = parse_dynamodb(&json).expect("parse");
        let emitted = emit_dynamodb(&schema).expect("emit");
        let gsis = emitted["GlobalSecondaryIndexes"]
            .as_array()
            .expect("GSIs should be emitted");
        assert_eq!(gsis.len(), 1);
        assert_eq!(gsis[0]["IndexName"], "customer_index");
        assert_eq!(gsis[0]["Projection"]["ProjectionType"], "KEYS_ONLY");
    }

    #[test]
    fn parse_missing_table_name_fails() {
        let json = serde_json::json!({});
        let result = parse_dynamodb(&json);
        assert!(result.is_err());
    }
}