mod types; mod indexing; mod storage; mod config; #[cfg(feature = "schema")]
mod schema; mod operations; mod open; mod open_wasm;
pub use types::{DbError, LogEntry};
pub use config::DbConfig;
pub use storage::{StorageBackend, EncryptedStorage};
#[cfg(not(target_arch = "wasm32"))]
pub use storage::{AsyncDiskStorage, SyncDiskStorage};
use dashmap::{DashMap, DashSet};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Central database handle. Cheap to clone: all heavy state is behind `Arc`s,
/// so clones share the same in-memory caches, indexes, and storage backend.
#[derive(Clone)]
pub struct Db {
/// In-memory document cache: collection name -> (key -> document state).
/// Entries may be `Hot` (value resident) or `Cold` (demoted; see `compact`,
/// which rehydrates Cold entries back to Hot). -- inferred from the Cold/Hot
/// handling in `compact`; confirm against `engine::types::DocumentState`.
state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
/// Durable storage backend (disk, encrypted, or wasm-appropriate impl).
pub storage: Arc<dyn StorageBackend>,
/// Broadcast channel for change notifications; see `subscribe()`.
pub tx: broadcast::Sender<String>,
/// Secondary indexes: collection -> (field value -> set of matching keys).
/// NOTE(review): exact key/value semantics live in `indexing` — verify there.
pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
/// Per-field query counters used by `track_query` to decide when to index.
pub query_heatmap: Arc<DashMap<String, u32>>,
/// Max hot documents kept per collection before eviction kicks in.
pub hot_threshold: usize,
/// Rate limiting: allowed requests per window (enforced by callers).
pub rate_limit_requests: u32,
/// Rate limiting: window length (units defined by the enforcing layer).
pub rate_limit_window: u64,
/// Maximum accepted request body size, in bytes (enforced by callers).
pub max_body_size: usize,
/// Maximum number of keys accepted in a single request (enforced by callers).
pub max_keys_per_request: usize,
/// Compiled JSON Schemas per collection: raw schema + prepared validator.
#[cfg(feature = "schema")]
pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
/// Optional shell command to run after a backup/compaction completes.
pub post_backup_script: Option<String>,
/// When true, documents may be demoted to `Cold` instead of fully evicted.
/// NOTE(review): inferred from the name and Cold handling — confirm in `operations`.
pub tiered_mode: bool,
/// Process start time, for uptime reporting (no monotonic clock on wasm32).
#[cfg(not(target_arch = "wasm32"))]
pub started_at: std::time::Instant,
}
impl Db {
pub fn hot_keys_count(&self) -> usize {
self.state.iter().map(|c| c.value().len()).sum()
}
pub fn subscribe(&self) -> broadcast::Receiver<String> {
self.tx.subscribe()
}
pub fn get(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
operations::get(&self.state, &self.storage, collection, keys)
}
pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
operations::get_all(&self.state, &self.storage, collection)
}
pub fn insert(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
operations::insert(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
#[cfg(feature = "schema")] &self.schemas,
collection,
items,
)?;
let _ = self.evict_collection(collection, self.hot_threshold);
Ok(())
}
pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
let updated = operations::update(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
#[cfg(feature = "schema")] &self.schemas,
collection,
key,
updates,
)?;
if updated {
let _ = self.evict_collection(collection, self.hot_threshold);
}
Ok(updated)
}
pub fn delete(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
operations::delete(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
keys,
)
}
pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
operations::delete_collection(
&self.state,
&self.indexes,
&self.storage,
&self.tx,
collection,
)
}
pub fn track_query(&self, collection: &str, field: &str) {
let _ = indexing::track_query(
&self.indexes,
&self.query_heatmap,
collection,
field,
&self.storage,
&self.state,
);
}
#[cfg(feature = "schema")]
pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
schema::set_schema(
&self.schemas,
&self.storage,
&self.tx,
collection,
schema
)
}
pub fn clear_all(&self) {
self.state.clear();
self.indexes.clear();
self.query_heatmap.clear();
#[cfg(feature = "schema")]
self.schemas.clear();
}
pub fn compact(&self) -> Result<(), DbError> {
let entries = operations::compact(
&self.state,
#[cfg(feature = "schema")] &self.schemas,
&self.indexes,
&*self.storage,
self.post_backup_script.clone(),
)?;
for entry in &entries {
if entry.cmd == "INSERT" {
if let Some(col) = self.state.get(&entry.collection) {
if let Some(mut doc) = col.get_mut(&entry.key) {
if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
*doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
}
}
}
}
}
Ok(())
}
pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
operations::evict_collection(&self.state, &*self.storage, collection, limit)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn recover_to(
storage: &dyn StorageBackend,
to_time: Option<u64>,
to_seq: Option<u64>,
) -> Result<Vec<LogEntry>, DbError> {
operations::recover_to(storage, to_time, to_seq)
}
}