#[cfg(not(target_arch = "wasm32"))]
mod disk;
mod encrypted;
mod memory;
#[cfg(not(target_arch = "wasm32"))]
mod tiered;
#[cfg(not(target_arch = "wasm32"))]
pub use disk::{AsyncDiskStorage, SyncDiskStorage};
pub use encrypted::EncryptedStorage;
pub use memory::InMemoryStorage;
#[cfg(not(target_arch = "wasm32"))]
pub use tiered::TieredStorage;
#[cfg(target_arch = "wasm32")]
pub mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::OpfsStorage;
use crate::engine::types::{DbError, LogEntry};
#[cfg(feature = "schema")]
use serde_json::Value;
use std::ops::ControlFlow;
use dashmap::{DashMap, DashSet};
/// Abstraction over the physical log store (disk, in-memory, OPFS, ...).
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait StorageBackend: Send + Sync {
    /// Appends a single entry to the log.
    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError>;
    /// Reads the entire log back into memory.
    fn read_log(&self) -> Result<Vec<LogEntry>, DbError>;
    /// Rewrites the log so that it contains exactly `entries`.
    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError>;
    /// Like [`compact`](Self::compact); backends that understand `_hook`
    /// may honor it, the default implementation simply ignores it.
    fn compact_with_hook(&self, entries: Vec<LogEntry>, _hook: Option<String>) -> Result<(), DbError> {
        self.compact(entries)
    }
    /// Reads `length` raw bytes starting at byte `offset` of the log.
    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError>;
    /// Size of the backing store in bytes; backends without a meaningful
    /// size report 0 via this default.
    #[allow(dead_code)]
    fn get_size(&self) -> Result<u64, DbError> {
        Ok(0)
    }
    /// Streams every log entry through `f` together with its serialized
    /// length in bytes, and returns how many entries `f` accepted with
    /// `Continue`. The entry on which `f` returns `Break` is NOT counted.
    ///
    /// The default implementation materializes the whole log via
    /// [`read_log`](Self::read_log) and re-serializes each entry just to
    /// learn its length; backends with cheap access to record sizes should
    /// override this.
    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>) -> Result<u64, DbError> {
        let mut delivered = 0u64;
        for entry in self.read_log()? {
            // A length of 0 is reported when the entry cannot be serialized.
            let encoded_len = serde_json::to_vec(&entry).map_or(0, |bytes| bytes.len()) as u32;
            match f(entry, encoded_len) {
                ControlFlow::Break(()) => return Ok(delivered),
                ControlFlow::Continue(()) => delivered += 1,
            }
        }
        Ok(delivered)
    }
}
/// Replays the log from `storage` into the in-memory `state` and `indexes`
/// maps (and, with the `schema` feature enabled, into `schemas`).
///
/// Entries between a `TX_BEGIN` and its matching `TX_COMMIT` are buffered
/// and applied together on commit; a transaction that never commits is
/// discarded (with a warning). All other entries are applied immediately.
/// Entries with a known serialized length (> 0) are applied with a
/// `RecordPointer` so their values can stay cold; zero-length entries are
/// applied hot.
///
/// Returns the number of log entries seen, including transaction markers.
pub fn stream_into_state(
    storage: &dyn StorageBackend,
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
) -> Result<u64, DbError> {
    let mut count = 0u64;
    // Byte offset of the current record within the log.
    let mut offset = 0u64;
    // Entries buffered for the currently open transaction, if any.
    let mut tx_buffer: Vec<(LogEntry, crate::engine::types::RecordPointer)> = Vec::new();
    let mut active_tx: Option<String> = None;
    storage.stream_log_into(&mut |entry, length| {
        let pointer = crate::engine::types::RecordPointer {
            offset,
            length,
        };
        match entry.cmd.as_str() {
            "TX_BEGIN" => {
                // A new TX_BEGIN implicitly abandons any unfinished transaction.
                active_tx = Some(entry.key.clone());
                tx_buffer.clear();
            }
            "TX_COMMIT" => {
                if active_tx.as_ref() == Some(&entry.key) {
                    for (e, p) in tx_buffer.drain(..) {
                        let pointer = if p.length == 0 { None } else { Some(p) };
                        apply_entry(
                            &e,
                            state,
                            indexes,
                            #[cfg(feature = "schema")] schemas,
                            pointer,
                        );
                    }
                    active_tx = None;
                } else {
                    tracing::warn!("⚠️ TX_COMMIT seen for unknown or inactive transaction ID: {}. Ignoring.", entry.key);
                }
            }
            _ => {
                if active_tx.is_some() {
                    tx_buffer.push((entry, pointer));
                } else {
                    let p = if length == 0 { None } else { Some(pointer) };
                    apply_entry(
                        &entry,
                        state,
                        indexes,
                        #[cfg(feature = "schema")] schemas,
                        p,
                    );
                }
            }
        }
        count += 1;
        if length > 0 {
            // +1 presumably accounts for a one-byte record separator — confirm
            // against the writer. Widen BEFORE adding: the previous
            // `(length + 1) as u64` computed the sum in u32, which overflows
            // for length == u32::MAX.
            offset += u64::from(length) + 1;
        }
        ControlFlow::Continue(())
    })?;
    // Surface truncated logs: a TX_BEGIN whose commit never arrived would
    // otherwise be dropped completely silently.
    if let Some(tx_id) = active_tx {
        tracing::warn!(
            "⚠️ Transaction {} was never committed; {} buffered entries discarded.",
            tx_id,
            tx_buffer.len()
        );
    }
    Ok(count)
}
/// Applies a single replayed log entry to the in-memory maps.
///
/// `pointer` selects the storage mode for a document: `Some` stores it
/// cold (a pointer into the log), `None` stores the value hot in memory.
/// Unknown commands are ignored.
pub fn apply_entry(
    entry: &LogEntry,
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
    pointer: Option<crate::engine::types::RecordPointer>,
) {
    match entry.cmd.as_str() {
        "INSERT" => {
            let col = state
                .entry(entry.collection.clone())
                .or_insert_with(DashMap::new);
            let doc_state = match pointer {
                Some(p) => crate::engine::types::DocumentState::Cold(p),
                None => crate::engine::types::DocumentState::Hot(entry.value.clone()),
            };
            col.insert(entry.key.clone(), doc_state);
            crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
        }
        "DELETE" => {
            if let Some(col) = state.get(&entry.collection) {
                // Remove first (single lookup) and unindex from the removed
                // value; the previous get-then-remove looked the key up twice
                // and held a read guard across the unindex call.
                if let Some((_, old_state)) = col.remove(&entry.key) {
                    if let crate::engine::types::DocumentState::Hot(old_val) = &old_state {
                        crate::engine::indexing::unindex_doc(
                            indexes,
                            &entry.collection,
                            &entry.key,
                            old_val,
                        );
                    }
                    // NOTE(review): Cold documents are removed without
                    // unindexing — presumably their index entries are cleaned
                    // up elsewhere (e.g. compaction); confirm.
                }
            }
        }
        "DROP" => {
            state.remove(&entry.collection);
            // Hoisted out of the closure: previously the "{collection}:"
            // prefix string was re-allocated for every key `retain` visited.
            let prefix = format!("{}:", entry.collection);
            indexes.retain(|k, _| !k.starts_with(&prefix));
        }
        "INDEX" => {
            indexes.insert(
                format!("{}:{}", entry.collection, entry.key),
                DashMap::new(),
            );
        }
        #[cfg(feature = "schema")]
        "SCHEMA" => {
            // Invalid schemas are skipped silently: the entry was already
            // accepted into the log, so replay must not fail here.
            if let Ok(validator) = jsonschema::validator_for(&entry.value) {
                schemas.insert(entry.collection.clone(), std::sync::Arc::new((entry.value.clone(), validator)));
            }
        }
        _ => {}
    }
}