mod types;
mod indexing;
mod storage;
#[cfg(feature = "schema")]
mod schema;
mod operations;
pub use types::{DbError, LogEntry};
pub use storage::{StorageBackend, EncryptedStorage};
#[cfg(not(target_arch = "wasm32"))]
pub use storage::{AsyncDiskStorage, SyncDiskStorage};
use dashmap::{DashMap, DashSet};
use tracing::info;
use serde_json::Value;
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio::sync::broadcast;
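/// A clonable handle to the document store: an in-memory map of collections
/// layered over an append-only [`StorageBackend`] log. Each document is held
/// either `Hot` (deserialized in memory) or `Cold` (a pointer into the log),
/// and a broadcast channel (`tx`) lets callers subscribe to change events.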
#[derive(Clone)]
pub struct Db {
state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
pub storage: Arc<dyn StorageBackend>,
pub tx: broadcast::Sender<String>,
pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
pub query_heatmap: Arc<DashMap<String, u32>>,
pub hot_threshold: usize,
pub rate_limit_requests: u32,
pub rate_limit_window: u64,
pub max_body_size: usize,
#[cfg(feature = "schema")]
pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
}
impl Db {
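/// Opens (or creates) a log-backed database at `path`, creating parent
/// directories as needed. `tiered_mode` selects `TieredStorage`; otherwise
/// `sync_mode` chooses `SyncDiskStorage` over `AsyncDiskStorage`. If an
/// `encryption_key` is given, the chosen backend is wrapped in
/// `EncryptedStorage`. The existing log is streamed into memory before the
/// handle is returned.
///
/// Illustrative call (the path and values are placeholders, not recommended defaults):
/// ```ignore
/// let db = Db::open(
///     "data/app.log", // log file path
///     false,          // sync_mode
///     false,          // tiered_mode
///     10_000,         // hot_threshold: hot documents kept per collection
///     100,            // rate_limit_requests
///     60,             // rate_limit_window
///     1_048_576,      // max_body_size
///     None,           // encryption_key
/// )?;
/// ```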
#[cfg(not(target_arch = "wasm32"))]
pub fn open(
path: &str,
sync_mode: bool,
tiered_mode: bool,
hot_threshold: usize,
rate_limit_requests: u32,
rate_limit_window: u64,
max_body_size: usize,
encryption_key: Option<&[u8; 32]>,
) -> Result<Self, DbError> {
let state = Arc::new(DashMap::new());
let (tx, _rx) = broadcast::channel(100);
let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
Arc::new(Default::default());
let query_heatmap = Arc::new(Default::default());
#[cfg(feature = "schema")]
let schemas = Arc::new(DashMap::new());
if let Some(parent) = std::path::Path::new(path).parent() {
std::fs::create_dir_all(parent)?;
}
let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
Arc::new(storage::TieredStorage::new(path)?)
} else if sync_mode {
Arc::new(storage::SyncDiskStorage::new(path)?)
} else {
Arc::new(storage::AsyncDiskStorage::new(path)?)
};
let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
Arc::new(storage::EncryptedStorage::new(base_storage, key))
} else {
base_storage
};
storage::stream_into_state(
&*storage,
&state,
&indexes,
#[cfg(feature = "schema")] &schemas,
)?;
Ok(Self {
state,
storage,
tx,
indexes,
query_heatmap,
hot_threshold,
rate_limit_requests,
rate_limit_window,
max_body_size,
#[cfg(feature = "schema")]
schemas,
})
}
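/// WASM counterpart of `open`: the log is kept in OPFS under `db_name` via
/// `OpfsStorage`, optionally wrapped in `EncryptedStorage`, and streamed into
/// memory before the handle is returned.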
#[cfg(target_arch = "wasm32")]
pub async fn open_wasm(
db_name: &str,
hot_threshold: usize,
rate_limit_requests: u32,
rate_limit_window: u64,
max_body_size: usize,
encryption_key: Option<&[u8; 32]>,
sync_mode: bool,
) -> Result<Self, DbError> {
let state = Arc::new(DashMap::new());
let (tx, _rx) = broadcast::channel(100);
let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
Arc::new(Default::default());
let query_heatmap = Arc::new(Default::default());
#[cfg(feature = "schema")]
let schemas = Arc::new(DashMap::new());
let mut storage: Arc<dyn StorageBackend> =
Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);
if let Some(key) = encryption_key {
storage = Arc::new(storage::EncryptedStorage::new(storage, key));
}
storage::stream_into_state(
&*storage,
&state,
&indexes,
#[cfg(feature = "schema")] &schemas,
)?;
Ok(Self {
state,
storage,
tx,
indexes,
query_heatmap,
hot_threshold,
rate_limit_requests,
rate_limit_window,
max_body_size,
#[cfg(feature = "schema")]
schemas,
})
}
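/// Subscribes to the broadcast channel on which writes are announced.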
pub fn subscribe(&self) -> broadcast::Receiver<String> {
self.tx.subscribe()
}
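/// Fetches a single document from `collection` by `key`.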
pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
operations::get(&self.state, &self.storage, collection, key)
}
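/// Returns every document in `collection`, keyed by document id.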
pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
operations::get_all(&self.state, &self.storage, collection)
}
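/// Fetches the given keys from `collection` in one call.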
pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
operations::get_batch(&self.state, &self.storage, collection, keys)
}
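/// Inserts a batch of documents, then evicts the collection back toward `hot_threshold`.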
pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
operations::insert_batch(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
#[cfg(feature = "schema")] &self.schemas,
collection,
items,
)?;
let _ = self.evict_collection(collection, self.hot_threshold);
Ok(())
}
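/// Updates a single document and returns whether anything was updated; on
/// success the collection is re-evicted against `hot_threshold`.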
pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
let updated = operations::update(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
#[cfg(feature = "schema")] &self.schemas,
collection,
key,
updates,
)?;
if updated {
let _ = self.evict_collection(collection, self.hot_threshold);
}
Ok(updated)
}
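/// Deletes a single document from `collection`.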
pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
operations::delete(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
key,
)
}
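/// Deletes the given keys from `collection` in one call.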
pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
operations::delete_batch(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
keys,
)
}
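/// Drops an entire collection.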
pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
operations::delete_collection(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
)
}
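/// Records a lookup on `collection`/`field` in the query heatmap; the
/// indexing module can use these counts to build indexes for hot fields.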
pub fn track_query(&self, collection: &str, field: &str) {
let _ = indexing::track_query(
&self.indexes,
&self.query_heatmap,
collection,
field,
&self.storage,
&self.state,
);
}
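/// Registers a JSON Schema for `collection`, compiled into a `jsonschema::Validator`.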
#[cfg(feature = "schema")]
pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
schema::set_schema(
&self.schemas,
&self.storage,
&self.tx,
collection,
schema
)
}
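/// Rewrites the log so it contains only the current state: a snapshot entry
/// per live document plus `SCHEMA` and `INDEX` entries. Cold documents are
/// promoted back to hot afterwards, since their old record pointers no longer
/// point into the rewritten log.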
pub fn compact(&self) -> Result<(), DbError> {
info!("🔨 Starting Log Compaction...");
let mut entries = Vec::new();
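// Snapshot every live document: hot values come straight from memory, cold
// ones are re-read from the current log via their record pointers.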
for col_ref in self.state.iter() {
let col_name = col_ref.key();
for item_ref in col_ref.value().iter() {
let entry = match item_ref.value() {
crate::engine::types::DocumentState::Hot(v) => {
types::LogEntry::new(
"INSERT".to_string(),
col_name.clone(),
item_ref.key().clone(),
v.clone(),
)
}
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
serde_json::from_slice(&bytes)?
}
};
entries.push(entry);
}
}
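// Carry registered schemas over so they survive the rewrite.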
#[cfg(feature = "schema")]
for schema_ref in self.schemas.iter() {
let col_name = schema_ref.key();
let (schema_json, _) = &**schema_ref.value();
entries.push(types::LogEntry::new(
"SCHEMA".to_string(),
col_name.clone(),
"".to_string(),
schema_json.clone(),
));
}
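// Carry index definitions over; index keys are stored as "collection:field".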
for index_ref in self.indexes.iter() {
let parts: Vec<&str> = index_ref.key().split(':').collect();
if parts.len() == 2 {
entries.push(types::LogEntry::new(
"INDEX".to_string(),
parts[0].to_string(),
parts[1].to_string(),
serde_json::json!(null),
));
}
}
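// Rewrite the log with only the live entries.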
self.storage.compact(entries.clone())?;
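// The rewrite invalidates old record pointers, so promote cold documents
// back to hot using the values that were just written.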
for entry in &entries {
if entry.cmd == "INSERT" {
if let Some(col) = self.state.get(&entry.collection) {
if let Some(mut doc) = col.get_mut(&entry.key) {
if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
*doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
}
}
}
}
}
info!("✅ Log Compaction Finished!");
Ok(())
}
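/// If `collection` holds more than `limit` documents, walks the log and
/// demotes the excess hot documents to cold record pointers, returning how
/// many were demoted. Errors with `DbError::CollectionNotFound` if the
/// collection does not exist.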
pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
let col_len = if let Some(col) = self.state.get(collection) {
col.len()
} else {
return Err(DbError::CollectionNotFound);
};
if col_len <= limit {
return Ok(0);
}
let mut evicted_count = 0;
let mut offset = 0u64;
let to_evict = col_len - limit;
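// Walk the log once; for hot documents in this collection (up to the excess),
// swap the in-memory value for a pointer to the document's on-disk record.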
self.storage.stream_log_into(&mut |entry, length| {
if entry.collection == collection {
if evicted_count < to_evict {
if let Some(col) = self.state.get(collection) {
if let Some(mut doc_state) = col.get_mut(&entry.key) {
if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
*doc_state = crate::engine::types::DocumentState::Cold(crate::engine::types::RecordPointer {
offset,
length,
});
evicted_count += 1;
}
}
}
}
}
offset += (length + 1) as u64;
ControlFlow::Continue(())
})?;
Ok(evicted_count)
}
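/// Replays the log up to an optional timestamp (`to_time`) and/or entry count
/// (`to_seq`) and returns the surviving state as `INSERT`, `SCHEMA` and
/// `INDEX` entries, suitable for writing out a recovered log. Entries inside
/// a transaction take effect only once the matching `TX_COMMIT` is seen.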
#[cfg(not(target_arch = "wasm32"))]
pub fn recover_to(
storage: &dyn StorageBackend,
to_time: Option<u64>,
to_seq: Option<u64>,
) -> Result<Vec<LogEntry>, DbError> {
let state: DashMap<String, DashMap<String, crate::engine::types::DocumentState>> = DashMap::new();
let indexes: DashMap<String, DashMap<String, DashSet<String>>> = DashMap::new();
#[cfg(feature = "schema")]
let schemas: DashMap<String, Arc<(serde_json::Value, jsonschema::Validator)>> = DashMap::new();
let mut offset = 0u64;
let mut count = 0u64;
let mut current_tx_entries = Vec::new();
let mut current_tx_id = None;
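// Replay the log, buffering transactional entries until their TX_COMMIT is
// seen; entries from transactions that never commit are discarded.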
storage.stream_log_into(&mut |entry, length| {
if let Some(t) = to_time {
if entry._t > t {
return ControlFlow::Break(());
}
}
if let Some(s) = to_seq {
if count >= s {
return ControlFlow::Break(());
}
}
let pointer = crate::engine::types::RecordPointer {
offset,
length,
};
match entry.cmd.as_str() {
"TX_BEGIN" => {
current_tx_id = Some(entry.key.clone());
current_tx_entries.clear();
}
"TX_COMMIT" => {
if current_tx_id.as_ref() == Some(&entry.key) {
for (e, p) in current_tx_entries.drain(..) {
crate::engine::storage::apply_entry(
&e,
&state,
&indexes,
#[cfg(feature = "schema")] &schemas,
Some(p),
);
}
current_tx_id = None;
}
}
_ => {
if current_tx_id.is_some() {
current_tx_entries.push((entry, pointer));
} else {
crate::engine::storage::apply_entry(
&entry,
&state,
&indexes,
#[cfg(feature = "schema")] &schemas,
Some(pointer),
);
}
}
}
count += 1;
offset += (length + 1) as u64;
ControlFlow::Continue(())
})?;
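// Materialize the reconstructed state as log entries (documents here, schemas
// and indexes below) so the caller can write out a recovered log.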
let mut entries = Vec::new();
for col_ref in state.iter() {
let col_name = col_ref.key();
for item_ref in col_ref.value().iter() {
let entry = match item_ref.value() {
crate::engine::types::DocumentState::Hot(v) => {
LogEntry::new(
"INSERT".to_string(),
col_name.clone(),
item_ref.key().clone(),
v.clone(),
)
}
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = storage.read_at(ptr.offset, ptr.length).unwrap_or_default();
serde_json::from_slice(&bytes).unwrap_or_else(|_| {
LogEntry::new("INSERT".to_string(), col_name.clone(), item_ref.key().clone(), serde_json::Value::Null)
})
}
};
entries.push(entry);
}
}
#[cfg(feature = "schema")]
for schema_ref in schemas.iter() {
let col_name = schema_ref.key();
let (schema_json, _) = &**schema_ref.value();
entries.push(LogEntry::new(
"SCHEMA".to_string(),
col_name.clone(),
"".to_string(),
schema_json.clone(),
));
}
for index_ref in indexes.iter() {
let parts: Vec<&str> = index_ref.key().split(':').collect();
if parts.len() == 2 {
entries.push(LogEntry::new(
"INDEX".to_string(),
parts[0].to_string(),
parts[1].to_string(),
serde_json::json!(null),
));
}
}
Ok(entries)
}
}