mod types; mod indexing; mod storage; mod operations;
pub use types::DbError;
pub use storage::StorageBackend;
#[cfg(not(target_arch = "wasm32"))]
pub use storage::{EncryptedStorage};
#[cfg(target_arch = "wasm32")]
pub use storage::OpfsStorage;
use dashmap::{DashMap, DashSet};
use tracing::{info};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Cloneable handle to the database.
///
/// Every field is behind an `Arc` (or a `broadcast::Sender`, which is itself
/// cheaply cloneable), so `Clone` produces a second handle to the *same*
/// state, storage backend, indexes, and notification channel.
#[derive(Clone)]
pub struct Db {
/// In-memory data: collection name -> (record key -> JSON value).
state: Arc<DashMap<String, DashMap<String, Value>>>,
/// Persistence backend chosen at open time (sync/async/tiered disk,
/// optionally wrapped in encryption; OPFS on wasm32).
pub storage: Arc<dyn StorageBackend>,
/// Broadcast sender for change notifications; `subscribe()` hands out
/// receivers. Payload is a `String` message.
pub tx: broadcast::Sender<String>,
/// Secondary indexes, keyed by a "collection:field" string.
// NOTE(review): the "collection:field" key format is inferred from
// `compact()`, which splits these keys on ':', and from
// `track_query(collection, field)`. The inner map presumably maps a field
// value to the set of record keys holding it — confirm in `indexing`.
pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
/// Query counters fed by `track_query`.
// NOTE(review): counter semantics (e.g. auto-index threshold) live in
// `indexing::track_query` — verify there before relying on this.
pub query_heatmap: Arc<DashMap<String, u32>>,
}
impl Db {
    /// Capacity of the change-notification broadcast channel.
    const CHANNEL_CAPACITY: usize = 100;

    /// Builds a `Db` around an already-constructed storage backend:
    /// allocates the in-memory maps, replays the persisted log into them,
    /// and wires up the broadcast channel. Shared by all `open*` entry
    /// points so the construction logic lives in one place.
    ///
    /// # Errors
    /// Returns `DbError` if replaying the log via `stream_into_state` fails.
    fn from_storage(storage: Arc<dyn StorageBackend>) -> Result<Self, DbError> {
        let state = Arc::new(DashMap::new());
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        // Replay persisted entries so `state`/`indexes` reflect disk contents.
        storage::stream_into_state(&*storage, &state, &indexes)?;
        let (tx, _rx) = broadcast::channel(Self::CHANNEL_CAPACITY);
        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap: Arc::new(Default::default()),
        })
    }

    /// Opens (or creates) a database at `path` (native targets only).
    ///
    /// * `sync_mode` — write synchronously to disk instead of asynchronously.
    /// * `tiered_mode` — use tiered storage; takes precedence over `sync_mode`.
    /// * `encryption_key` — when present, wraps the chosen backend in
    ///   `EncryptedStorage` with the given 32-byte key.
    ///
    /// # Errors
    /// Returns `DbError` if the backend cannot be created or log replay fails.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(path: &str, sync_mode: bool, tiered_mode: bool, encryption_key: Option<&[u8; 32]>) -> Result<Self, DbError> {
        // Pick the base backend; tiered takes priority over sync.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };
        // Optionally layer encryption over the chosen backend.
        let storage: Arc<dyn StorageBackend> = match encryption_key {
            Some(key) => Arc::new(storage::EncryptedStorage::new(base_storage, key)),
            None => base_storage,
        };
        Self::from_storage(storage)
    }

    /// Opens (or creates) a database named `db_name` backed by OPFS
    /// (wasm32 targets only).
    ///
    /// # Errors
    /// Returns `DbError` if the OPFS backend cannot be created or log
    /// replay fails.
    #[cfg(target_arch = "wasm32")]
    pub async fn open_wasm(db_name: &str) -> Result<Self, DbError> {
        let storage: Arc<dyn StorageBackend> = Arc::new(storage::OpfsStorage::new(db_name).await?);
        Self::from_storage(storage)
    }

    /// Returns a new receiver on the change-notification channel.
    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }

    /// Looks up a single value by collection and key.
    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
        operations::get(&self.state, collection, key)
    }

    /// Returns every key/value pair in `collection`.
    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
        operations::get_all(&self.state, collection)
    }

    /// Returns the subset of `keys` present in `collection`, with values.
    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
        operations::get_batch(&self.state, collection, keys)
    }

    /// Inserts a batch of key/value pairs into `collection`, updating
    /// indexes, persisting to storage, and notifying subscribers.
    ///
    /// # Errors
    /// Propagates `DbError` from the underlying operation (e.g. storage I/O).
    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
        operations::insert_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            items,
        )
    }

    /// Applies `updates` to the record at `key`; returns whether a record
    /// was updated.
    ///
    /// # Errors
    /// Propagates `DbError` from the underlying operation.
    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
        operations::update(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
            updates,
        )
    }

    /// Deletes a single record, updating indexes, storage, and subscribers.
    ///
    /// # Errors
    /// Propagates `DbError` from the underlying operation.
    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
        operations::delete(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
        )
    }

    /// Deletes a batch of records from `collection`.
    ///
    /// # Errors
    /// Propagates `DbError` from the underlying operation.
    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
        operations::delete_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            keys,
        )
    }

    /// Drops an entire collection and its index entries.
    ///
    /// # Errors
    /// Propagates `DbError` from the underlying operation.
    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
        operations::delete_collection(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
        )
    }

    /// Records a query against `collection`.`field` in the heatmap.
    /// Errors from the tracker are intentionally ignored: query tracking
    /// is best-effort and must never fail the query itself.
    pub fn track_query(&self, collection: &str, field: &str) {
        let _ = indexing::track_query(
            &self.indexes,
            &self.query_heatmap,
            collection,
            field,
            &self.storage,
            &self.state,
        );
    }

    /// Rewrites the storage log as a minimal snapshot: one INSERT entry per
    /// live record plus one INDEX entry per secondary index.
    ///
    /// # Errors
    /// Propagates `DbError` from the storage backend's `compact`.
    pub fn compact(&self) -> Result<(), DbError> {
        info!("🔨 Starting Log Compaction...");
        // The snapshot size is computable up front; reserve once instead of
        // repeatedly growing the Vec. (Concurrent writers may shift the
        // exact count — this is only a capacity hint.)
        let capacity = self
            .state
            .iter()
            .map(|col| col.value().len())
            .sum::<usize>()
            + self.indexes.len();
        let mut entries = Vec::with_capacity(capacity);
        // One INSERT entry per live key/value pair.
        for col_ref in self.state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                entries.push(types::LogEntry {
                    cmd: "INSERT".to_string(),
                    collection: col_name.clone(),
                    key: item_ref.key().clone(),
                    value: item_ref.value().clone(),
                });
            }
        }
        // One INDEX entry per "collection:field" index key. Keys without
        // exactly one ':' are skipped, as before — the triple-next() match
        // enforces "exactly two parts" without allocating a Vec.
        for index_ref in self.indexes.iter() {
            let mut parts = index_ref.key().split(':');
            if let (Some(collection), Some(field), None) =
                (parts.next(), parts.next(), parts.next())
            {
                entries.push(types::LogEntry {
                    cmd: "INDEX".to_string(),
                    collection: collection.to_string(),
                    key: field.to_string(),
                    value: serde_json::json!(null),
                });
            }
        }
        self.storage.compact(entries)?;
        info!("✅ Log Compaction Finished!");
        Ok(())
    }
}