use crate::actions;
use crate::errors::DynoxideError;
use crate::storage_backend::StorageBackend;
/// Version number of the dispatch contract exposed by this module.
///
/// It is reported to callers as `contractVersion` in the boot descriptor and
/// through the `contract_version()` binding of the wasm engine.
pub const CONTRACT_VERSION: u32 = 1;
/// `__type` value of the error envelope produced for any operation name that
/// `dispatch` does not route to an action module.
const UNSUPPORTED_TYPE: &str = "com.dynoxide.wasm#UnsupportedOperation";
/// Operation names that `dispatch` routes to an action module.
///
/// The list is advertised verbatim (same order) as the engine's capabilities,
/// so it has to be kept in step with the `match` arms in `dispatch`.
pub const SUPPORTED_OPS: &[&str] = &[
    // Table management.
    "CreateTable",
    "DeleteTable",
    "DescribeTable",
    "UpdateTable",
    "ListTables",
    // Single-item reads and writes.
    "PutItem",
    "GetItem",
    "DeleteItem",
    "UpdateItem",
    // Multi-item reads.
    "Query",
    "Scan",
    // Batch and transactional operations.
    "BatchGetItem",
    "BatchWriteItem",
    "TransactGetItems",
];
/// Builds the JSON error envelope returned when `op` cannot be dispatched.
///
/// The message distinguishes between names that
/// `crate::dynamo_ops::is_known_operation` accepts (reported as "not
/// supported by the wasm preview engine") and names it rejects (reported as
/// "Unknown operation"). Either way the envelope's `__type` is
/// `UNSUPPORTED_TYPE`.
fn unsupported_envelope(op: &str) -> String {
    let recognised = crate::dynamo_ops::is_known_operation(op);
    let message = if recognised {
        format!("Operation '{op}' is not supported by the wasm preview engine")
    } else {
        format!("Unknown operation: '{op}'")
    };

    let envelope = serde_json::json!({
        "__type": UNSUPPORTED_TYPE,
        "message": message,
    });
    envelope.to_string()
}
/// Routes one operation to its action module and returns the JSON response.
///
/// `op` selects the action module, `request_json` is deserialized into that
/// module's request type, and the module's `execute` is awaited against
/// `backend`. The successful response is serialized back to a JSON string.
///
/// # Errors
///
/// The `Err` side is always a JSON string:
/// - a `SerializationException` envelope when `request_json` does not
///   deserialize into the request type,
/// - the action's own error rendered with `to_json()` when `execute` fails,
/// - an `InternalServerError` envelope when the response cannot be serialized,
/// - the unsupported-operation envelope when `op` matches no arm below.
pub async fn dispatch<S: StorageBackend>(
    backend: &S,
    op: &str,
    request_json: &str,
) -> std::result::Result<String, String> {
    // Error mappers shared by every arm: one for the inbound request body,
    // one for the outbound response body.
    let malformed_request =
        |err: serde_json::Error| DynoxideError::SerializationException(err.to_string()).to_json();
    let unserializable_response =
        |err: serde_json::Error| DynoxideError::InternalServerError(err.to_string()).to_json();

    // Expands to "parse -> execute -> serialize" for one action module. The
    // request type is inferred from the module's `execute` signature, and the
    // `?` operators return early from `dispatch` itself.
    macro_rules! run {
        ($module:ident) => {{
            let input = serde_json::from_str(request_json).map_err(malformed_request)?;
            let output = actions::$module::execute(backend, input)
                .await
                .map_err(|err| err.to_json())?;
            serde_json::to_string(&output).map_err(unserializable_response)
        }};
    }

    match op {
        // Table management.
        "CreateTable" => run!(create_table),
        "DeleteTable" => run!(delete_table),
        "DescribeTable" => run!(describe_table),
        "UpdateTable" => run!(update_table),
        "ListTables" => run!(list_tables),
        // Single-item reads and writes.
        "PutItem" => run!(put_item),
        "GetItem" => run!(get_item),
        "DeleteItem" => run!(delete_item),
        "UpdateItem" => run!(update_item),
        // Multi-item reads.
        "Query" => run!(query),
        "Scan" => run!(scan),
        // Batch and transactional operations.
        "BatchGetItem" => run!(batch_get_item),
        "BatchWriteItem" => run!(batch_write_item),
        "TransactGetItems" => run!(transact_get_items),
        // Anything else is answered with an error envelope.
        unrouted => Err(unsupported_envelope(unrouted)),
    }
}
#[cfg(feature = "wasm-sqlite")]
mod engine {
    //! `wasm_bindgen` entry points wrapping a single, thread-local
    //! `WasmDatabase` and forwarding requests to `dispatch`.

    use super::{CONTRACT_VERSION, SUPPORTED_OPS, dispatch};
    use crate::WasmDatabase;
    use std::cell::RefCell;
    use wasm_bindgen::prelude::*;

    thread_local! {
        // Database installed by the most recent successful `open`; `None`
        // until `open` has succeeded once.
        static ENGINE: RefCell<Option<WasmDatabase>> = const { RefCell::new(None) };
    }

    /// Renders the JSON document returned by `open`: contract version,
    /// supported operation names and the given persistence mode.
    fn boot_descriptor(persistence_mode: &str) -> String {
        let descriptor = serde_json::json!({
            "contractVersion": CONTRACT_VERSION,
            "capabilities": SUPPORTED_OPS,
            "persistenceMode": persistence_mode,
        });
        descriptor.to_string()
    }

    /// Renders the error envelope returned when `execute` runs before any
    /// database has been installed by `open`.
    fn not_opened_envelope() -> String {
        let envelope = serde_json::json!({
            "__type": "com.dynoxide.wasm#EngineNotOpened",
            "message": "execute called before open(); call open(name) first",
        });
        envelope.to_string()
    }

    /// Opens the database `name`, installs it as the active engine and
    /// returns the boot descriptor as JSON.
    ///
    /// A database that was already installed is swapped out and then closed;
    /// the outcome of that close is deliberately ignored. If opening fails,
    /// the error's `to_json()` rendering is returned and the previously
    /// installed database (if any) is left in place.
    #[wasm_bindgen]
    pub async fn open(name: String, ephemeral: bool) -> Result<String, String> {
        let fresh = WasmDatabase::open_with(&name, ephemeral)
            .await
            .map_err(|err| err.to_json())?;
        let mode = fresh.persistence_mode().await;

        // Swap inside the closure so no `RefCell` borrow is alive across the
        // `await` on `close` below.
        let displaced = ENGINE.with(|slot| slot.replace(Some(fresh)));
        if let Some(stale) = displaced {
            let _ = stale.close().await;
        }

        Ok(boot_descriptor(&mode))
    }

    /// Runs operation `op` with the JSON request body `request_json` against
    /// the active database and returns the JSON response (or JSON error).
    ///
    /// Returns the "engine not opened" envelope when no database is installed.
    #[wasm_bindgen]
    pub async fn execute(op: String, request_json: String) -> Result<String, String> {
        // Clone the handle out so the `RefCell` borrow ends before awaiting.
        let handle = ENGINE.with(|slot| slot.borrow().clone());
        match handle {
            Some(db) => {
                let backend = db.backend().await;
                dispatch(&*backend, &op, &request_json).await
            }
            None => Err(not_opened_envelope()),
        }
    }

    /// Returns the supported operation names as a JSON array string, falling
    /// back to `"[]"` if serialization fails.
    #[wasm_bindgen]
    pub fn capabilities() -> String {
        match serde_json::to_string(SUPPORTED_OPS) {
            Ok(encoded) => encoded,
            Err(_) => String::from("[]"),
        }
    }

    /// Returns the contract version constant.
    #[wasm_bindgen]
    pub fn contract_version() -> u32 {
        CONTRACT_VERSION
    }
}
// Tests for `dispatch`, run against the in-memory `Storage` backend. They are
// only compiled for test builds with the `native-sqlite` feature enabled.
#[cfg(all(test, feature = "native-sqlite"))]
mod tests {
use super::*;
use crate::storage::Storage;
// Drives the async `dispatch` call to completion on the current thread so
// the tests can stay synchronous.
fn run(backend: &Storage, op: &str, json: &str) -> std::result::Result<String, String> {
pollster::block_on(dispatch(backend, op, json))
}
// CreateTable request for a "Music" table keyed by `artist` (HASH) and
// `song` (RANGE), both string attributes, with on-demand billing.
const CREATE_MUSIC: &str = r#"{
"TableName": "Music",
"KeySchema": [
{"AttributeName": "artist", "KeyType": "HASH"},
{"AttributeName": "song", "KeyType": "RANGE"}
],
"AttributeDefinitions": [
{"AttributeName": "artist", "AttributeType": "S"},
{"AttributeName": "song", "AttributeType": "S"}
],
"BillingMode": "PAY_PER_REQUEST"
}"#;
// Creates the "Music" table and puts three items for artist "a":
// s1 (rock), s2 (jazz) and s3 (rock).
fn seed_music(backend: &Storage) {
run(backend, "CreateTable", CREATE_MUSIC).expect("create table");
for (song, genre) in [("s1", "rock"), ("s2", "jazz"), ("s3", "rock")] {
let put = format!(
r#"{{"TableName":"Music","Item":{{"artist":{{"S":"a"}},"song":{{"S":"{song}"}},"genre":{{"S":"{genre}"}}}}}}"#
);
run(backend, "PutItem", &put).expect("put item");
}
}
// An item written with PutItem can be read back with GetItem.
#[test]
fn create_put_get_roundtrip() {
let backend = Storage::memory().unwrap();
run(&backend, "CreateTable", CREATE_MUSIC).unwrap();
let put = r#"{"TableName":"Music","Item":{"artist":{"S":"a"},"song":{"S":"s1"},"msg":{"S":"hi"}}}"#;
run(&backend, "PutItem", put).unwrap();
let get = r#"{"TableName":"Music","Key":{"artist":{"S":"a"},"song":{"S":"s1"}}}"#;
let resp = run(&backend, "GetItem", get).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert_eq!(v["Item"]["msg"]["S"], "hi");
}
// Querying by partition key returns all three seeded items and a matching
// `Count`.
#[test]
fn query_returns_count_and_items() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let query = r#"{"TableName":"Music","KeyConditionExpression":"artist = :a","ExpressionAttributeValues":{":a":{"S":"a"}}}"#;
let resp = run(&backend, "Query", query).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert_eq!(v["Count"], 3);
assert_eq!(v["Items"].as_array().unwrap().len(), 3);
}
// With a filter, `Count` reflects the two matching items while
// `ScannedCount` reflects all three items examined.
#[test]
fn scan_with_filter_scans_more_than_it_counts() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let scan = r#"{"TableName":"Music","FilterExpression":"genre = :g","ExpressionAttributeValues":{":g":{"S":"rock"}}}"#;
let resp = run(&backend, "Scan", scan).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert_eq!(v["Count"], 2);
assert_eq!(v["ScannedCount"], 3);
// Compares the two `Option<u64>` values; both are present given the
// equality assertions above.
assert!(v["ScannedCount"].as_u64() > v["Count"].as_u64());
}
// UpdateItem with `ReturnValues: ALL_NEW` returns the newly set attribute.
#[test]
fn newly_wrapped_update_item_roundtrips() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let update = r#"{
"TableName": "Music",
"Key": {"artist": {"S": "a"}, "song": {"S": "s1"}},
"UpdateExpression": "SET plays = :p",
"ExpressionAttributeValues": {":p": {"N": "5"}},
"ReturnValues": "ALL_NEW"
}"#;
let resp = run(&backend, "UpdateItem", update).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert_eq!(v["Attributes"]["plays"]["N"], "5");
}
// BatchGetItem returns both requested items and leaves no unprocessed keys.
#[test]
fn batch_get_item_returns_seeded_items() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let batch_get = r#"{
"RequestItems": {
"Music": {
"Keys": [
{"artist": {"S": "a"}, "song": {"S": "s1"}},
{"artist": {"S": "a"}, "song": {"S": "s3"}}
]
}
}
}"#;
let resp = run(&backend, "BatchGetItem", batch_get).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let items = v["Responses"]["Music"].as_array().unwrap();
assert_eq!(items.len(), 2);
// Sort before comparing so the assertion does not depend on response order.
let mut songs: Vec<&str> = items
.iter()
.map(|item| item["song"]["S"].as_str().unwrap())
.collect();
songs.sort_unstable();
assert_eq!(songs, ["s1", "s3"]);
assert!(v["UnprocessedKeys"].as_object().unwrap().is_empty());
}
// A BatchWriteItem mixing a delete and a put takes effect for both: the
// deleted item is gone and the new item is readable afterwards.
#[test]
fn batch_write_item_puts_and_deletes_persist() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let batch_write = r#"{
"RequestItems": {
"Music": [
{"DeleteRequest": {"Key": {"artist": {"S": "a"}, "song": {"S": "s2"}}}},
{"PutRequest": {"Item": {"artist": {"S": "a"}, "song": {"S": "s4"}, "genre": {"S": "pop"}}}}
]
}
}"#;
let resp = run(&backend, "BatchWriteItem", batch_write).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v["UnprocessedItems"].as_object().unwrap().is_empty());
let get_s2 = r#"{"TableName":"Music","Key":{"artist":{"S":"a"},"song":{"S":"s2"}}}"#;
let s2: serde_json::Value =
serde_json::from_str(&run(&backend, "GetItem", get_s2).unwrap()).unwrap();
assert!(s2.get("Item").is_none(), "s2 should have been deleted");
let get_s4 = r#"{"TableName":"Music","Key":{"artist":{"S":"a"},"song":{"S":"s4"}}}"#;
let s4: serde_json::Value =
serde_json::from_str(&run(&backend, "GetItem", get_s4).unwrap()).unwrap();
assert_eq!(s4["Item"]["genre"]["S"], "pop");
}
// TransactGetItems returns one response entry per requested key, in request
// order; the entry for the missing key carries no `Item`.
#[test]
fn transact_get_items_preserves_position_for_present_and_missing() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let transact_get = r#"{
"TransactItems": [
{"Get": {"TableName": "Music", "Key": {"artist": {"S": "a"}, "song": {"S": "s1"}}}},
{"Get": {"TableName": "Music", "Key": {"artist": {"S": "a"}, "song": {"S": "nope"}}}}
]
}"#;
let resp = run(&backend, "TransactGetItems", transact_get).unwrap();
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let responses = v["Responses"].as_array().unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0]["Item"]["genre"]["S"], "rock");
assert!(responses[1].get("Item").is_none());
}
// An operation name nobody recognises yields the unsupported-operation
// envelope with an "Unknown operation" message rather than a panic.
#[test]
fn unknown_op_returns_envelope_not_panic() {
let backend = Storage::memory().unwrap();
let err = run(&backend, "FlyToTheMoon", "{}").unwrap_err();
let v: serde_json::Value = serde_json::from_str(&err).unwrap();
assert_eq!(v["__type"], "com.dynoxide.wasm#UnsupportedOperation");
assert!(v["message"].as_str().unwrap().contains("Unknown operation"));
}
// "UpdateTimeToLive" is not routed by `dispatch`; the envelope message is
// expected to say "not supported" instead of "Unknown operation".
#[test]
fn unsupported_preview_op_returns_envelope() {
let backend = Storage::memory().unwrap();
let err = run(&backend, "UpdateTimeToLive", "{}").unwrap_err();
let v: serde_json::Value = serde_json::from_str(&err).unwrap();
assert_eq!(v["__type"], "com.dynoxide.wasm#UnsupportedOperation");
assert!(v["message"].as_str().unwrap().contains("not supported"));
}
// A failed condition on PutItem comes back as an error envelope whose
// `__type` names ConditionalCheckFailedException.
#[test]
fn conditional_check_failure_surfaces_in_envelope() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let put = r#"{
"TableName": "Music",
"Item": {"artist": {"S": "a"}, "song": {"S": "s1"}},
"ConditionExpression": "attribute_not_exists(artist)"
}"#;
let err = run(&backend, "PutItem", put).unwrap_err();
let v: serde_json::Value = serde_json::from_str(&err).unwrap();
assert!(
v["__type"]
.as_str()
.unwrap()
.contains("ConditionalCheckFailedException")
);
}
// A request body that is not valid JSON is reported as a
// SerializationException envelope.
#[test]
fn malformed_request_json_is_a_serialization_error() {
let backend = Storage::memory().unwrap();
let err = run(&backend, "PutItem", "{ this is not json").unwrap_err();
let v: serde_json::Value = serde_json::from_str(&err).unwrap();
assert!(
v["__type"]
.as_str()
.unwrap()
.contains("SerializationException")
);
}
// Pins the contract version and spot-checks the advertised operation list.
#[test]
fn contract_advertises_a_version_and_the_supported_ops() {
assert_eq!(CONTRACT_VERSION, 1);
assert!(SUPPORTED_OPS.contains(&"Query"));
assert!(SUPPORTED_OPS.contains(&"Scan"));
}
// UpdateTable can create a GSI on already-seeded data; querying the new
// index afterwards finds the two pre-existing "rock" items.
#[test]
fn update_table_adds_a_gsi_and_backfills_through_dispatch() {
let backend = Storage::memory().unwrap();
seed_music(&backend);
let update = r#"{
"TableName": "Music",
"AttributeDefinitions": [
{"AttributeName": "artist", "AttributeType": "S"},
{"AttributeName": "song", "AttributeType": "S"},
{"AttributeName": "genre", "AttributeType": "S"}
],
"GlobalSecondaryIndexUpdates": [
{"Create": {
"IndexName": "GenreIndex",
"KeySchema": [{"AttributeName": "genre", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"}
}}
]
}"#;
let resp = run(&backend, "UpdateTable", update).unwrap();
assert!(
resp.contains("GenreIndex"),
"the response should describe the new GSI"
);
let q = r#"{"TableName":"Music","IndexName":"GenreIndex","KeyConditionExpression":"genre = :g","ExpressionAttributeValues":{":g":{"S":"rock"}}}"#;
let qv: serde_json::Value =
serde_json::from_str(&run(&backend, "Query", q).unwrap()).unwrap();
assert_eq!(qv["Count"], 2);
assert!(SUPPORTED_OPS.contains(&"UpdateTable"));
}
}