#[cfg(all(feature = "native-sqlite", feature = "_has-encryption"))]
compile_error!(
"Features `native-sqlite` and `encryption`/`encryption-cc` are mutually exclusive.\n\
If you ran `cargo install`, use:\n \
cargo install dynoxide-rs --no-default-features --features encrypted-server\n\
If using as a library dependency, set `default-features = false` \
and enable only one backend."
);
#[cfg(all(feature = "encryption", feature = "encryption-cc"))]
compile_error!(
"Features `encryption` and `encryption-cc` are mutually exclusive. \
Use `encryption` for vendored OpenSSL or `encryption-cc` for Apple CommonCrypto."
);
#[cfg(all(feature = "encryption-cc", not(target_vendor = "apple")))]
compile_error!(
"The `encryption-cc` feature is intended for Apple platforms only (CommonCrypto). \
Use the `encryption` feature for vendored OpenSSL on non-Apple platforms."
);
#[cfg(not(any(
feature = "native-sqlite",
feature = "_has-encryption",
feature = "wasm-sqlite"
)))]
compile_error!(
"A storage backend feature must be enabled: `native-sqlite`, `encryption`, \
`encryption-cc`, or `wasm-sqlite`. Default features include `native-sqlite`. \
If you used `default-features = false`, add one of these features."
);
pub mod actions;
pub mod errors;
pub mod expressions;
#[cfg(feature = "import")]
pub mod import;
#[doc(hidden)]
pub mod macros;
#[cfg(feature = "mcp-server")]
pub mod mcp;
pub mod partiql;
pub mod schema;
#[cfg(feature = "http-server")]
pub mod server;
#[cfg(feature = "mcp-server")]
pub(crate) mod snapshots;
pub mod storage;
pub mod storage_backend;
pub mod streams;
pub mod ttl;
pub mod types;
pub mod validation;
#[cfg(any(feature = "http-server", feature = "wasm-sqlite", test))]
pub(crate) mod dynamo_ops;
#[cfg(any(feature = "wasm-sqlite", test))]
pub mod wasm_api;
#[cfg(feature = "wasm-harness")]
pub mod wasm_harness;
#[doc(hidden)]
pub use macros::ItemInsert;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use web_time::Instant;
pub use errors::{DynoxideError, Result};
pub use storage::{DatabaseInfo, TableInfoEntry, TableMetadata, TableStats};
pub use storage_backend::BackendError;
#[cfg(feature = "wasm-sqlite")]
pub use storage_backend::WasmBridgeBackend;
pub use types::{AttributeValue, ConversionError, Item};
#[derive(Debug, Clone, Default)]
pub struct ImportOptions {
pub record_streams: bool,
pub set_cached_at: bool,
}
#[derive(Debug, Clone)]
pub struct ImportResult {
pub items_imported: usize,
pub bytes_imported: usize,
}
type TokenCache = HashMap<
String,
(
Instant,
u64,
actions::transact_write_items::TransactWriteItemsResponse,
),
>;
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
pub type RusqliteBackend = storage::Storage;
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
pub type NativeDatabase = Database<RusqliteBackend>;
#[cfg(feature = "wasm-sqlite")]
pub type WasmDatabase = Database<WasmBridgeBackend>;
#[cfg(feature = "wasm-sqlite")]
pub const WASM_PREVIEW: bool = true;
#[cfg(not(feature = "wasm-sqlite"))]
pub const WASM_PREVIEW: bool = false;
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
pub struct Database<S = RusqliteBackend> {
inner: Arc<Mutex<S>>,
idempotency_tokens: Arc<Mutex<TokenCache>>,
}
#[cfg(all(
not(any(feature = "native-sqlite", feature = "_has-encryption")),
feature = "wasm-sqlite"
))]
use async_lock::Mutex as BackendMutex;
#[cfg(all(
not(any(feature = "native-sqlite", feature = "_has-encryption")),
not(feature = "wasm-sqlite")
))]
use std::sync::Mutex as BackendMutex;
#[cfg(not(any(feature = "native-sqlite", feature = "_has-encryption")))]
pub struct Database<S> {
inner: Arc<BackendMutex<S>>,
idempotency_tokens: Arc<Mutex<TokenCache>>,
}
impl<S> Clone for Database<S> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
idempotency_tokens: Arc::clone(&self.idempotency_tokens),
}
}
}
#[cfg(any(feature = "native-sqlite", feature = "_has-encryption"))]
impl Database<RusqliteBackend> {
pub fn new(path: &str) -> Result<Self> {
let storage = storage::Storage::new(path)?;
Ok(Self {
inner: Arc::new(Mutex::new(storage)),
idempotency_tokens: Arc::new(Mutex::new(HashMap::new())),
})
}
#[cfg(feature = "_has-encryption")]
pub fn new_encrypted(path: &str, key: &str) -> Result<Self> {
if key.len() != 64 || !key.bytes().all(|b| b.is_ascii_hexdigit()) {
return Err(DynoxideError::ValidationException(
"Encryption key must be a 64-character hex string (32 bytes)".to_string(),
));
}
let storage = storage::Storage::new_encrypted(path, key)?;
Ok(Self {
inner: Arc::new(Mutex::new(storage)),
idempotency_tokens: Arc::new(Mutex::new(HashMap::new())),
})
}
pub fn memory() -> Result<Self> {
let storage = storage::Storage::memory()?;
Ok(Self {
inner: Arc::new(Mutex::new(storage)),
idempotency_tokens: Arc::new(Mutex::new(HashMap::new())),
})
}
pub(crate) fn with_storage<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&storage::Storage) -> Result<T>,
{
let guard = self
.inner
.lock()
.map_err(|e| DynoxideError::InternalServerError(format!("Lock poisoned: {e}")))?;
f(&guard)
}
pub(crate) fn with_storage_mut<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut storage::Storage) -> Result<T>,
{
let mut guard = self
.inner
.lock()
.map_err(|e| DynoxideError::InternalServerError(format!("Lock poisoned: {e}")))?;
f(&mut guard)
}
pub fn create_table(
&self,
request: actions::create_table::CreateTableRequest,
) -> Result<actions::create_table::CreateTableResponse> {
self.with_storage(|s| pollster::block_on(actions::create_table::execute(s, request)))
}
pub fn delete_table(
&self,
request: actions::delete_table::DeleteTableRequest,
) -> Result<actions::delete_table::DeleteTableResponse> {
self.with_storage(|s| pollster::block_on(actions::delete_table::execute(s, request)))
}
pub fn describe_table(
&self,
request: actions::describe_table::DescribeTableRequest,
) -> Result<actions::describe_table::DescribeTableResponse> {
self.with_storage(|s| pollster::block_on(actions::describe_table::execute(s, request)))
}
pub fn update_table(
&self,
request: actions::update_table::UpdateTableRequest,
) -> Result<actions::update_table::UpdateTableResponse> {
self.with_storage(|s| pollster::block_on(actions::update_table::execute(s, request)))
}
pub fn list_tables(
&self,
request: actions::list_tables::ListTablesRequest,
) -> Result<actions::list_tables::ListTablesResponse> {
self.with_storage(|s| pollster::block_on(actions::list_tables::execute(s, request)))
}
pub fn tag_resource(
&self,
request: actions::tag_resource::TagResourceRequest,
) -> Result<actions::tag_resource::TagResourceResponse> {
self.with_storage(|s| pollster::block_on(actions::tag_resource::execute(s, request)))
}
pub fn untag_resource(
&self,
request: actions::untag_resource::UntagResourceRequest,
) -> Result<actions::untag_resource::UntagResourceResponse> {
self.with_storage(|s| pollster::block_on(actions::untag_resource::execute(s, request)))
}
pub fn list_tags_of_resource(
&self,
request: actions::list_tags_of_resource::ListTagsOfResourceRequest,
) -> Result<actions::list_tags_of_resource::ListTagsOfResourceResponse> {
self.with_storage(|s| {
pollster::block_on(actions::list_tags_of_resource::execute(s, request))
})
}
pub fn put_item(
&self,
request: actions::put_item::PutItemRequest,
) -> Result<actions::put_item::PutItemResponse> {
self.with_storage(|s| pollster::block_on(actions::put_item::execute(s, request)))
}
pub fn get_item(
&self,
request: actions::get_item::GetItemRequest,
) -> Result<actions::get_item::GetItemResponse> {
self.with_storage(|s| pollster::block_on(actions::get_item::execute(s, request)))
}
pub fn delete_item(
&self,
request: actions::delete_item::DeleteItemRequest,
) -> Result<actions::delete_item::DeleteItemResponse> {
self.with_storage(|s| pollster::block_on(actions::delete_item::execute(s, request)))
}
pub fn update_item(
&self,
request: actions::update_item::UpdateItemRequest,
) -> Result<actions::update_item::UpdateItemResponse> {
self.with_storage(|s| pollster::block_on(actions::update_item::execute(s, request)))
}
pub fn batch_get_item(
&self,
request: actions::batch_get_item::BatchGetItemRequest,
) -> Result<actions::batch_get_item::BatchGetItemResponse> {
self.with_storage(|s| pollster::block_on(actions::batch_get_item::execute(s, request)))
}
pub fn batch_write_item(
&self,
request: actions::batch_write_item::BatchWriteItemRequest,
) -> Result<actions::batch_write_item::BatchWriteItemResponse> {
self.with_storage(|s| pollster::block_on(actions::batch_write_item::execute(s, request)))
}
pub fn import_items(
&self,
table_name: &str,
items: Vec<Item>,
options: ImportOptions,
) -> Result<ImportResult> {
self.with_storage(|s| {
pollster::block_on(actions::import_items::execute(
s, table_name, items, &options,
))
})
}
#[cfg(feature = "import")]
pub(crate) fn import_items_fresh(
&self,
table_name: &str,
items: Vec<Item>,
options: ImportOptions,
) -> Result<ImportResult> {
self.with_storage(|s| {
pollster::block_on(actions::import_items::execute_skip_gsi_deletes(
s, table_name, items, &options,
))
})
}
pub fn enable_bulk_loading(&self) -> Result<()> {
self.with_storage(|s| s.enable_bulk_loading())
}
pub fn disable_bulk_loading(&self) -> Result<()> {
self.with_storage(|s| s.disable_bulk_loading())
}
pub fn query(
&self,
request: actions::query::QueryRequest,
) -> Result<actions::query::QueryResponse> {
self.with_storage(|s| pollster::block_on(actions::query::execute(s, request)))
}
pub fn scan(&self, request: actions::scan::ScanRequest) -> Result<actions::scan::ScanResponse> {
self.with_storage(|s| pollster::block_on(actions::scan::execute(s, request)))
}
pub fn transact_write_items(
&self,
request: actions::transact_write_items::TransactWriteItemsRequest,
) -> Result<actions::transact_write_items::TransactWriteItemsResponse> {
const TOKEN_EXPIRY_SECS: u64 = 600; const MAX_TOKEN_LEN: usize = 36;
if let Some(ref token) = request.client_request_token {
if token.len() > MAX_TOKEN_LEN {
return Err(DynoxideError::ValidationException(format!(
"1 validation error detected: Value '{}' at 'clientRequestToken' failed to satisfy constraint: Member must have length less than or equal to {}",
token, MAX_TOKEN_LEN
)));
}
}
let request_hash = if request.client_request_token.is_some() {
use std::hash::{Hash, Hasher};
let normalised = serde_json::to_value(&request.transact_items)
.and_then(|v| serde_json::to_vec(&v))
.unwrap_or_default();
let mut hasher = std::collections::hash_map::DefaultHasher::new();
normalised.hash(&mut hasher);
hasher.finish()
} else {
0
};
if let Some(ref token) = request.client_request_token {
let mut cache = self
.idempotency_tokens
.lock()
.map_err(|e| DynoxideError::InternalServerError(format!("Lock poisoned: {e}")))?;
cache.retain(|_, (ts, _, _)| ts.elapsed().as_secs() < TOKEN_EXPIRY_SECS);
if let Some((_, cached_hash, resp)) = cache.get(token) {
if *cached_hash != request_hash {
return Err(DynoxideError::IdempotentParameterMismatchException(
"An error occurred (IdempotentParameterMismatchException)".to_string(),
));
}
return Ok(resp.clone());
}
}
let resp = self.with_storage(|s| {
pollster::block_on(actions::transact_write_items::execute(s, request.clone()))
})?;
if let Some(ref token) = request.client_request_token {
if let Ok(mut cache) = self.idempotency_tokens.lock() {
cache.insert(token.clone(), (Instant::now(), request_hash, resp.clone()));
}
}
Ok(resp)
}
pub fn transact_get_items(
&self,
request: actions::transact_get_items::TransactGetItemsRequest,
) -> Result<actions::transact_get_items::TransactGetItemsResponse> {
self.with_storage(|s| pollster::block_on(actions::transact_get_items::execute(s, request)))
}
pub fn list_streams(
&self,
request: actions::list_streams::ListStreamsRequest,
) -> Result<actions::list_streams::ListStreamsResponse> {
self.with_storage(|s| pollster::block_on(actions::list_streams::execute(s, request)))
}
pub fn describe_stream(
&self,
request: actions::describe_stream::DescribeStreamRequest,
) -> Result<actions::describe_stream::DescribeStreamResponse> {
self.with_storage(|s| pollster::block_on(actions::describe_stream::execute(s, request)))
}
pub fn get_shard_iterator(
&self,
request: actions::get_shard_iterator::GetShardIteratorRequest,
) -> Result<actions::get_shard_iterator::GetShardIteratorResponse> {
self.with_storage(|s| pollster::block_on(actions::get_shard_iterator::execute(s, request)))
}
pub fn get_records(
&self,
request: actions::get_records::GetRecordsRequest,
) -> Result<actions::get_records::GetRecordsResponse> {
self.with_storage(|s| pollster::block_on(actions::get_records::execute(s, request)))
}
pub fn update_time_to_live(
&self,
request: actions::update_time_to_live::UpdateTimeToLiveRequest,
) -> Result<actions::update_time_to_live::UpdateTimeToLiveResponse> {
self.with_storage(|s| pollster::block_on(actions::update_time_to_live::execute(s, request)))
}
pub fn describe_time_to_live(
&self,
request: actions::describe_time_to_live::DescribeTimeToLiveRequest,
) -> Result<actions::describe_time_to_live::DescribeTimeToLiveResponse> {
self.with_storage(|s| {
pollster::block_on(actions::describe_time_to_live::execute(s, request))
})
}
pub fn sweep_ttl(&self) -> Result<usize> {
self.with_storage(|s| pollster::block_on(ttl::sweep_expired_items(s)))
}
pub fn execute_statement(
&self,
request: actions::execute_statement::ExecuteStatementRequest,
) -> Result<actions::execute_statement::ExecuteStatementResponse> {
self.with_storage(|s| pollster::block_on(actions::execute_statement::execute(s, request)))
}
pub fn execute_transaction(
&self,
request: actions::execute_transaction::ExecuteTransactionRequest,
) -> Result<actions::execute_transaction::ExecuteTransactionResponse> {
self.with_storage(|s| pollster::block_on(actions::execute_transaction::execute(s, request)))
}
pub fn batch_execute_statement(
&self,
request: actions::batch_execute_statement::BatchExecuteStatementRequest,
) -> Result<actions::batch_execute_statement::BatchExecuteStatementResponse> {
self.with_storage(|s| {
pollster::block_on(actions::batch_execute_statement::execute(s, request))
})
}
pub fn touch_cached_at(
&self,
table_name: &str,
pk: &str,
sk: &str,
timestamp: f64,
) -> Result<()> {
self.with_storage(|s| s.touch_cached_at(table_name, pk, sk, timestamp))
}
pub fn get_lru_items(
&self,
table_name: &str,
limit: usize,
) -> Result<Vec<(String, String, i64)>> {
self.with_storage(|s| s.get_lru_items(table_name, limit))
}
pub fn db_path(&self) -> Result<Option<String>> {
self.with_storage(|s| Ok(s.db_path()))
}
pub fn db_size_bytes(&self) -> Result<u64> {
self.with_storage(|s| s.db_size_bytes())
}
pub fn table_count(&self) -> Result<usize> {
self.with_storage(|s| s.table_count())
}
pub fn table_stats(&self) -> Result<Vec<TableStats>> {
self.with_storage(|s| s.table_stats())
}
pub fn get_table_metadata(&self, table_name: &str) -> Result<Option<storage::TableMetadata>> {
self.with_storage(|s| s.get_table_metadata(table_name))
}
pub fn database_info(&self) -> Result<DatabaseInfo> {
self.with_storage(|s| s.database_info())
}
pub fn vacuum(&self) -> Result<()> {
self.with_storage(|s| s.vacuum())
}
pub fn vacuum_into(&self, path: &str) -> Result<()> {
self.with_storage(|s| s.vacuum_into(path))
}
pub fn restore_from(&self, path: &str) -> Result<()> {
self.with_storage_mut(|s| s.restore_from(path))
}
#[cfg(feature = "mcp-server")]
pub(crate) fn backup_to_memory(&self) -> Result<rusqlite::Connection> {
self.with_storage(|s| s.backup_to_memory())
}
#[cfg(feature = "mcp-server")]
pub(crate) fn restore_from_connection(&self, source: &rusqlite::Connection) -> Result<()> {
self.with_storage_mut(|s| s.restore_from_connection(source))
}
}
#[cfg(feature = "wasm-sqlite")]
impl Database<WasmBridgeBackend> {
pub async fn open(name: &str) -> Result<Self> {
Self::open_with(name, false).await
}
pub async fn open_with(name: &str, ephemeral: bool) -> Result<Self> {
let backend = WasmBridgeBackend::open_with(name, ephemeral)
.await
.map_err(DynoxideError::from)?;
Ok(Self {
inner: Arc::new(BackendMutex::new(backend)),
idempotency_tokens: Arc::new(Mutex::new(HashMap::new())),
})
}
pub async fn persistence_mode(&self) -> String {
self.backend().await.persistence_mode().to_string()
}
pub async fn close(&self) -> Result<()> {
self.backend()
.await
.close()
.await
.map_err(DynoxideError::from)
}
pub(crate) async fn backend(&self) -> async_lock::MutexGuard<'_, WasmBridgeBackend> {
self.inner.lock().await
}
pub async fn create_table(
&self,
request: actions::create_table::CreateTableRequest,
) -> Result<actions::create_table::CreateTableResponse> {
let backend = self.backend().await;
actions::create_table::execute(&*backend, request).await
}
pub async fn delete_table(
&self,
request: actions::delete_table::DeleteTableRequest,
) -> Result<actions::delete_table::DeleteTableResponse> {
let backend = self.backend().await;
actions::delete_table::execute(&*backend, request).await
}
pub async fn describe_table(
&self,
request: actions::describe_table::DescribeTableRequest,
) -> Result<actions::describe_table::DescribeTableResponse> {
let backend = self.backend().await;
actions::describe_table::execute(&*backend, request).await
}
pub async fn list_tables(
&self,
request: actions::list_tables::ListTablesRequest,
) -> Result<actions::list_tables::ListTablesResponse> {
let backend = self.backend().await;
actions::list_tables::execute(&*backend, request).await
}
pub async fn put_item(
&self,
request: actions::put_item::PutItemRequest,
) -> Result<actions::put_item::PutItemResponse> {
let backend = self.backend().await;
actions::put_item::execute(&*backend, request).await
}
pub async fn get_item(
&self,
request: actions::get_item::GetItemRequest,
) -> Result<actions::get_item::GetItemResponse> {
let backend = self.backend().await;
actions::get_item::execute(&*backend, request).await
}
pub async fn delete_item(
&self,
request: actions::delete_item::DeleteItemRequest,
) -> Result<actions::delete_item::DeleteItemResponse> {
let backend = self.backend().await;
actions::delete_item::execute(&*backend, request).await
}
pub async fn query(
&self,
request: actions::query::QueryRequest,
) -> Result<actions::query::QueryResponse> {
let backend = self.backend().await;
actions::query::execute(&*backend, request).await
}
pub async fn scan(
&self,
request: actions::scan::ScanRequest,
) -> Result<actions::scan::ScanResponse> {
let backend = self.backend().await;
actions::scan::execute(&*backend, request).await
}
}
#[cfg(all(test, any(feature = "native-sqlite", feature = "_has-encryption")))]
mod tests {
use super::*;
#[test]
fn test_database_memory() {
let db = Database::memory().unwrap();
let _db2 = db.clone();
}
#[test]
fn test_database_with_storage() {
let db = Database::memory().unwrap();
let tables = db.with_storage(|s| s.list_table_names()).unwrap();
assert!(tables.is_empty());
}
#[test]
fn test_database_thread_safe() {
let db = Database::memory().unwrap();
let db2 = db.clone();
let handle =
std::thread::spawn(move || db2.with_storage(|s| s.list_table_names()).unwrap());
let tables = handle.join().unwrap();
assert!(tables.is_empty());
}
#[test]
fn test_native_database_alias_round_trips() {
let db: NativeDatabase = Database::memory().unwrap();
db.create_table(actions::create_table::CreateTableRequest {
table_name: "tbl".to_string(),
key_schema: vec![types::KeySchemaElement {
attribute_name: "pk".to_string(),
key_type: types::KeyType::HASH,
}],
attribute_definitions: vec![types::AttributeDefinition {
attribute_name: "pk".to_string(),
attribute_type: types::ScalarAttributeType::S,
}],
..Default::default()
})
.unwrap();
let mut item = HashMap::new();
item.insert("pk".to_string(), AttributeValue::S("a".to_string()));
db.put_item(actions::put_item::PutItemRequest {
table_name: "tbl".to_string(),
item,
..Default::default()
})
.unwrap();
let mut key = HashMap::new();
key.insert("pk".to_string(), AttributeValue::S("a".to_string()));
let got = db
.get_item(actions::get_item::GetItemRequest {
table_name: "tbl".to_string(),
key,
..Default::default()
})
.unwrap();
assert_eq!(
got.item.unwrap().get("pk"),
Some(&AttributeValue::S("a".to_string()))
);
}
}