mod types; mod indexing; mod storage; mod schema; mod operations;
pub use types::{DbError, LogEntry};
pub use storage::{StorageBackend, EncryptedStorage};
use dashmap::{DashMap, DashSet};
use tracing::{info};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Shared, cheaply clonable handle to the database engine.
///
/// Every stateful field is wrapped in an `Arc`, and the remaining fields are
/// `Copy` configuration values, so cloning a `Db` yields another handle to
/// the same underlying state rather than a deep copy.
#[derive(Clone)]
pub struct Db {
// In-memory documents: collection name -> document key -> hot (in-memory
// value) or cold (on-disk record pointer) state.
state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
// Append-log persistence backend; may be wrapped in `EncryptedStorage`.
pub storage: Arc<dyn StorageBackend>,
// Broadcast sender for change notifications (String payloads); new
// listeners attach via `subscribe()`.
pub tx: broadcast::Sender<String>,
// Secondary indexes keyed by "collection:field" (see `compact()`), mapping
// an indexed value to the set of document keys holding it — presumably;
// confirm against the `indexing` module.
pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
// Query counters fed by `track_query`; presumably used to auto-create
// indexes for hot query patterns — verify in `indexing::track_query`.
pub query_heatmap: Arc<DashMap<String, u32>>,
// Maximum number of hot documents kept per collection before
// `evict_collection` demotes the excess to cold pointers.
pub hot_threshold: usize,
// Rate-limiting / request-size configuration; not consumed in this module —
// presumably read by a server layer elsewhere.
pub rate_limit_requests: u32,
pub rate_limit_window: u64,
pub max_body_size: usize,
// Per-collection JSON schema: raw schema document plus its compiled validator.
pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
}
impl Db {
#[cfg(not(target_arch = "wasm32"))]
pub fn open(
path: &str,
sync_mode: bool,
tiered_mode: bool,
hot_threshold: usize,
rate_limit_requests: u32,
rate_limit_window: u64,
max_body_size: usize,
encryption_key: Option<&[u8; 32]>,
) -> Result<Self, DbError> {
let state = Arc::new(DashMap::new());
let (tx, _rx) = broadcast::channel(100);
let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
Arc::new(Default::default());
let query_heatmap = Arc::new(Default::default());
let schemas = Arc::new(DashMap::new());
let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
Arc::new(storage::TieredStorage::new(path)?)
} else if sync_mode {
Arc::new(storage::SyncDiskStorage::new(path)?)
} else {
Arc::new(storage::AsyncDiskStorage::new(path)?)
};
let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
Arc::new(storage::EncryptedStorage::new(base_storage, key))
} else {
base_storage
};
storage::stream_into_state(&*storage, &state, &indexes, &schemas)?;
Ok(Self {
state,
storage,
tx,
indexes,
query_heatmap,
hot_threshold,
rate_limit_requests,
rate_limit_window,
max_body_size,
schemas,
})
}
#[cfg(target_arch = "wasm32")]
pub async fn open_wasm(
db_name: &str,
hot_threshold: usize,
rate_limit_requests: u32,
rate_limit_window: u64,
max_body_size: usize,
encryption_key: Option<&[u8; 32]>,
sync_mode: bool,
) -> Result<Self, DbError> {
let state = Arc::new(DashMap::new());
let (tx, _rx) = broadcast::channel(100);
let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
Arc::new(Default::default());
let query_heatmap = Arc::new(Default::default());
let schemas = Arc::new(DashMap::new());
let mut storage: Arc<dyn StorageBackend> =
Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);
if let Some(key) = encryption_key {
storage = Arc::new(storage::EncryptedStorage::new(storage, key));
}
storage::stream_into_state(&*storage, &state, &indexes, &schemas)?;
Ok(Self {
state,
storage,
tx,
indexes,
query_heatmap,
hot_threshold,
rate_limit_requests,
rate_limit_window,
max_body_size,
schemas,
})
}
pub fn subscribe(&self) -> broadcast::Receiver<String> {
self.tx.subscribe()
}
pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
operations::get(&self.state, &self.storage, collection, key)
}
pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
operations::get_all(&self.state, &self.storage, collection)
}
pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
operations::get_batch(&self.state, &self.storage, collection, keys)
}
pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
operations::insert_batch(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
&self.schemas,
collection,
items,
)?;
let _ = self.evict_collection(collection, self.hot_threshold);
Ok(())
}
pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
let updated = operations::update(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
&self.schemas,
collection,
key,
updates,
)?;
if updated {
let _ = self.evict_collection(collection, self.hot_threshold);
}
Ok(updated)
}
pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
operations::delete(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
key,
)
}
pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
operations::delete_batch(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
keys,
)
}
pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
operations::delete_collection(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
)
}
pub fn track_query(&self, collection: &str, field: &str) {
let _ = indexing::track_query(
&self.indexes,
&self.query_heatmap,
collection,
field,
&self.storage,
&self.state,
);
}
pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
schema::set_schema(
&self.schemas,
&self.storage,
&self.tx,
collection,
schema
)
}
pub fn compact(&self) -> Result<(), DbError> {
info!("🔨 Starting Log Compaction...");
let mut entries = Vec::new();
for col_ref in self.state.iter() {
let col_name = col_ref.key();
for item_ref in col_ref.value().iter() {
let value = match item_ref.value() {
crate::engine::types::DocumentState::Hot(v) => v.clone(),
crate::engine::types::DocumentState::Cold(ptr) => {
let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
let log_entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
log_entry.value
}
};
entries.push(types::LogEntry {
cmd: "INSERT".to_string(),
collection: col_name.clone(),
key: item_ref.key().clone(),
value,
});
}
}
for schema_ref in self.schemas.iter() {
let col_name = schema_ref.key();
let (schema_json, _) = &**schema_ref.value();
entries.push(types::LogEntry {
cmd: "SCHEMA".to_string(),
collection: col_name.clone(),
key: "".to_string(),
value: schema_json.clone(),
});
}
for index_ref in self.indexes.iter() {
let parts: Vec<&str> = index_ref.key().split(':').collect();
if parts.len() == 2 {
entries.push(types::LogEntry {
cmd: "INDEX".to_string(),
collection: parts[0].to_string(),
key: parts[1].to_string(), value: serde_json::json!(null),
});
}
}
self.storage.compact(entries)?;
info!("✅ Log Compaction Finished!");
Ok(())
}
pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
let col_len = if let Some(col) = self.state.get(collection) {
col.len()
} else {
return Err(DbError::CollectionNotFound);
};
if col_len <= limit {
return Ok(0);
}
let mut evicted_count = 0;
let mut offset = 0u64;
let to_evict = col_len - limit;
self.storage.stream_log_into(&mut |entry, length| {
if entry.collection == collection {
if evicted_count < to_evict {
if let Some(col) = self.state.get(collection) {
if let Some(mut doc_state) = col.get_mut(&entry.key) {
if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
*doc_state = crate::engine::types::DocumentState::Cold(crate::engine::types::RecordPointer {
offset,
length,
});
evicted_count += 1;
}
}
}
}
offset += (length + 1) as u64;
} else {
offset += (length + 1) as u64;
}
})?;
Ok(evicted_count)
}
}