moltendb_core/engine/storage/mod.rs
1// ─── storage/mod.rs ──────────────────────────────────────────────────────────
2// This is the root module for all storage backends. It does three things:
3//
4// 1. Declares and conditionally exposes the concrete backend modules
5// (disk, encrypted, wasm) based on the compile target.
6//
7// 2. Defines the StorageBackend trait — the single interface that the rest
8// of the engine uses to read/write data. Any type that implements this
9// trait can be used as a storage backend, whether it writes to a disk
10// file, an encrypted file, or a browser OPFS file.
11//
12// 3. Provides the startup replay functions (stream_into_state, apply_entry,
13// replay_log_entries) that rebuild the in-memory database state from the
14// persistent log on server/worker startup.
15//
16// The StorageBackend trait is the key abstraction that makes MoltenDB's
17// "same engine, different storage" design possible. The engine (mod.rs,
18// operations.rs, handlers.rs) never imports a concrete storage type — it
19// only ever holds an Arc<dyn StorageBackend>. This means you can swap the
20// storage backend without changing any engine code.
21// ─────────────────────────────────────────────────────────────────────────────
22
23// ── Conditional module declarations ──────────────────────────────────────────
24// These cfg attributes mean "only compile this when NOT targeting wasm32".
25// On native (server) builds we get disk.rs and encrypted.rs.
26// On WASM (browser) builds we get wasm.rs.
27// This prevents browser-incompatible code (file I/O, Tokio tasks) from being
28// compiled into the WASM binary.
29
30#[cfg(not(target_arch = "wasm32"))]
31mod disk;
32mod encrypted;
33// tiered.rs provides MmapLogReader (memory-mapped cold log reads) and
34// TieredStorage (hot + cold two-tier backend for large-scale deployments).
35#[cfg(not(target_arch = "wasm32"))]
36mod tiered;
37// Re-export the concrete types so callers can write `storage::AsyncDiskStorage`
38// instead of `storage::disk::AsyncDiskStorage`.
39#[cfg(not(target_arch = "wasm32"))]
40pub use disk::{AsyncDiskStorage, SyncDiskStorage};
41pub use encrypted::EncryptedStorage;
42// Re-export TieredStorage so engine/mod.rs and main.rs can use it directly.
43#[cfg(not(target_arch = "wasm32"))]
44pub use tiered::TieredStorage;
45
46// On WASM builds, expose the browser-side OPFS storage.
47#[cfg(target_arch = "wasm32")]
48pub mod wasm;
49#[cfg(target_arch = "wasm32")]
50pub use wasm::OpfsStorage;
51
52// ── Shared imports ────────────────────────────────────────────────────────────
53// These are used by both the trait definition and the replay functions below.
54use crate::engine::types::{DbError, LogEntry};
55#[cfg(feature = "schema")]
56use serde_json::Value;
57use std::ops::ControlFlow;
58// DashMap is a concurrent hash map — like HashMap but safe to read/write from
59// multiple threads simultaneously without a global lock.
60// DashSet is the set equivalent.
61use dashmap::{DashMap, DashSet};
62// serde_json::Value is a dynamically-typed JSON value (can be object, array,
63// string, number, bool, or null). All document data is stored as Value.
64
65// ─── StorageBackend trait ─────────────────────────────────────────────────────
66//
67// This is the core abstraction of the storage layer. Any type that implements
68// these three methods can serve as a MoltenDB storage backend.
69//
70// The trait requires Send + Sync because the backend is stored inside an
71// Arc<dyn StorageBackend> and shared across multiple Tokio tasks/threads.
72// • Send = the type can be moved to another thread
73// • Sync = the type can be referenced from multiple threads simultaneously
74// ─────────────────────────────────────────────────────────────────────────────
75
76/// The core storage abstraction. Implement this trait to add a new storage backend.
77///
78/// All three methods operate on `LogEntry` — the atomic unit of data in MoltenDB.
79/// The engine never writes raw bytes; it always goes through this interface.
80pub trait StorageBackend: Send + Sync {
81 /// Append a single log entry to the persistent store.
82 ///
83 /// This is called on every insert, update, delete, and index creation.
84 /// Implementations may buffer writes (async) or flush immediately (sync).
85 fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError>;
86
87 /// Read all log entries from persistent storage into a Vec.
88 ///
89 /// Called on startup to rebuild the in-memory state, and by EncryptedStorage
90 /// which must decrypt entries before they can be streamed into state.
91 /// For large databases, prefer `stream_log_into` which avoids holding the
92 /// full log in RAM.
93 fn read_log(&self) -> Result<Vec<LogEntry>, DbError>;
94
95 /// Compact the log by writing only the current state (removing dead entries).
96 ///
97 /// `entries` is the complete current state of the database — every live
98 /// document as a single INSERT entry. The implementation should atomically
99 /// replace the existing log with this minimal set.
100 fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError>;
101
102 /// Compact the log and execute an optional post-backup shell hook.
103 ///
104 /// The default implementation calls `compact()` and ignores the hook.
105 /// Backends that support shell hooks should override this.
106 fn compact_with_hook(&self, entries: Vec<LogEntry>, _hook: Option<String>) -> Result<(), DbError> {
107 self.compact(entries)
108 }
109
110 /// Read exactly `length` bytes starting at `offset` from the log.
111 ///
112 /// This is used to fetch "Cold" documents from the append-only log without
113 /// loading the entire file into memory.
114 fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError>;
115
116 /// Return the current size of the persistent log file in bytes.
117 ///
118 /// Used by the WASM worker to implement size-based auto-compaction — the JS
119 /// side calls `get_size` after every INSERT batch and compacts if the file
120 /// exceeds the configured threshold (default: 5 MB).
121 ///
122 /// The default implementation returns 0 (no size information available).
123 /// `OpfsStorage` overrides this with a real `FileSystemSyncAccessHandle.getSize()` call.
124 /// Native disk backends don't need this — they use OS-level file metadata instead.
125 #[allow(dead_code)]
126 fn get_size(&self) -> Result<u64, DbError> {
127 Ok(0)
128 }
129
130 /// Stream log entries into state one at a time, without loading the full
131 /// log into RAM. Implementations may load a binary snapshot first and only
132 /// replay the delta lines written after the snapshot.
133 ///
134 /// The default implementation falls back to `read_log()` for backwards
135 /// compatibility (used by WASM/EncryptedStorage which don't have snapshots).
136 ///
137 /// Returns the total number of entries processed.
138 fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>) -> Result<u64, DbError> {
139 // Default: load everything into a Vec, then iterate.
140 // Concrete implementations (AsyncDiskStorage, SyncDiskStorage) override
141 // this with a more efficient snapshot + streaming approach.
142 let entries = self.read_log()?;
143 let mut count = 0u64;
144 for entry in entries {
145 // Default re-serializes to get length.
146 // Better implementations override this.
147 let json = serde_json::to_vec(&entry).unwrap_or_default();
148 let length = json.len() as u32;
149 if let ControlFlow::Break(_) = f(entry, length) {
150 return Ok(count);
151 }
152 count += 1;
153 }
154 Ok(count)
155 }
156}
157
158// ─── Startup replay ───────────────────────────────────────────────────────────
159//
160// When the server starts (or the WASM worker initialises), we need to rebuild
161// the in-memory state from the persistent log. These functions handle that.
162//
163// The process is:
164// 1. Call storage.stream_log_into() — this either loads a binary snapshot
165// + delta (fast path) or streams the full log line-by-line (slow path).
166// 2. For each LogEntry, call apply_entry() to update the in-memory DashMaps.
167// 3. After all entries are applied, the in-memory state matches the log.
168// ─────────────────────────────────────────────────────────────────────────────
169
170/// Drive startup by streaming all log entries from storage into the in-memory
171/// state and index maps. Uses snapshot + delta replay when available.
172///
173/// `state` — the main data store: collection name → (key → document state)
174/// `indexes` — the index store: "collection:field" → (field value → set of keys)
175///
176/// Returns the total number of log entries processed.
177pub fn stream_into_state(
178 storage: &dyn StorageBackend,
179 state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
180 indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
181 #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
182) -> Result<u64, DbError> {
183 let mut count = 0u64;
184 let mut offset = 0u64;
185 let mut tx_buffer: Vec<(LogEntry, crate::engine::types::RecordPointer)> = Vec::new();
186 let mut active_tx: Option<String> = None;
187
188 // stream_log_into calls our closure once per LogEntry, providing the
189 // LogEntry and its raw byte length in the log file.
190 storage.stream_log_into(&mut |entry, length| {
191 let pointer = crate::engine::types::RecordPointer {
192 offset,
193 length,
194 };
195
196 match entry.cmd.as_str() {
197 "TX_BEGIN" => {
198 active_tx = Some(entry.key.clone());
199 tx_buffer.clear();
200 }
201 "TX_COMMIT" => {
202 if active_tx.as_ref() == Some(&entry.key) {
203 // Flush buffer to DashMap
204 for (e, p) in tx_buffer.drain(..) {
205 // If length was 0, p.length will be 0 (from the snapshot replay)
206 let pointer = if p.length == 0 { None } else { Some(p) };
207 apply_entry(
208 &e,
209 state,
210 indexes,
211 #[cfg(feature = "schema")] schemas,
212 pointer,
213 );
214 }
215 active_tx = None;
216 } else {
217 tracing::warn!("⚠️ TX_COMMIT seen for unknown or inactive transaction ID: {}. Ignoring.", entry.key);
218 }
219 }
220 _ => {
221 if active_tx.is_some() {
222 // Hold in RAM until commit
223 tx_buffer.push((entry, pointer));
224 } else {
225 // Standard non-transactional entry
226 // If length is 0, it means it's from a snapshot, so we want it Hot (pointer=None).
227 let p = if length == 0 { None } else { Some(pointer) };
228 apply_entry(
229 &entry,
230 state,
231 indexes,
232 #[cfg(feature = "schema")] schemas,
233 p,
234 );
235 }
236 }
237 }
238
239 count += 1;
240 // +1 for the newline character appended to each JSON line in the log.
241 // length=0 means this entry came from the snapshot (not the log file),
242 // so we must NOT advance the file offset for it.
243 if length > 0 {
244 offset += (length + 1) as u64;
245 }
246 ControlFlow::Continue(())
247 })?;
248
249 // If active_tx is still Some, the file ended prematurely (crash).
250 // In this case, we DISCARD the buffer to ensure atomicity of the last operation.
251 Ok(count)
252}
253
254/// Apply a single log entry to the in-memory state and indexes.
255///
256/// If `pointer` is provided (during log replay), INSERT entries are stored
257/// as `DocumentState::Cold(pointer)` to save memory. Live writes stay `Hot`.
258pub fn apply_entry(
259 entry: &LogEntry,
260 state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
261 indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
262 #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
263 pointer: Option<crate::engine::types::RecordPointer>,
264) {
265 match entry.cmd.as_str() {
266 "INSERT" => {
267 let col = state
268 .entry(entry.collection.clone())
269 .or_insert_with(DashMap::new);
270
271 // During replay, we use the pointer (Cold). For live writes, we store the Value (Hot).
272 let doc_state = if let Some(p) = pointer {
273 crate::engine::types::DocumentState::Cold(p)
274 } else {
275 crate::engine::types::DocumentState::Hot(entry.value.clone())
276 };
277
278 col.insert(entry.key.clone(), doc_state);
279
280 // Indexes ALWAYS store values in RAM to keep searches O(1).
281 crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
282 }
283 "DELETE" => {
284 if let Some(col) = state.get(&entry.collection) {
285 // To unindex, we need the Value. If it's Cold, we'd have to fetch it.
286 // However, during REPLAY, we can just skip unindexing if we don't have the value,
287 // BUT that would break if a DELETE follows an INSERT.
288 // Actually, unindex_doc needs the Value.
289 // For simplicity in this v1 of Hybrid, we'll fetch if needed or change unindex_doc.
290 // Wait, if it's Cold, we don't have the value.
291 // I'll leave a TODO here and for now just handle Hot.
292 if let Some(old_state) = col.get(&entry.key) {
293 if let crate::engine::types::DocumentState::Hot(old_val) = old_state.value() {
294 crate::engine::indexing::unindex_doc(
295 indexes,
296 &entry.collection,
297 &entry.key,
298 old_val,
299 );
300 }
301 }
302 col.remove(&entry.key);
303 }
304 }
305 "DROP" => {
306 // Remove the entire collection from the state map.
307 state.remove(&entry.collection);
308 // Remove all indexes that belong to this collection.
309 // retain() keeps only entries where the closure returns true.
310 // We drop any index whose key starts with "collection:" (e.g. "users:role").
311 indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
312 }
313 "INDEX" => {
314 // Register an empty index slot for "collection:field".
315 // The index will be populated as subsequent INSERT entries are applied.
316 // `entry.key` holds the field name (e.g. "role" for "users:role").
317 indexes.insert(
318 format!("{}:{}", entry.collection, entry.key),
319 DashMap::new(),
320 );
321 }
322 #[cfg(feature = "schema")]
323 "SCHEMA" => {
324 // Re-compile and register the schema during replay.
325 if let Ok(validator) = jsonschema::validator_for(&entry.value) {
326 schemas.insert(entry.collection.clone(), std::sync::Arc::new((entry.value.clone(), validator)));
327 }
328 }
329 // Unknown command types are silently ignored for forward compatibility.
330 // If a future version of MoltenDB adds a new command, older versions
331 // will simply skip those entries rather than crashing.
332 _ => {}
333 }
334}
335
336// Replay a slice of already-decoded log entries into RAM state.
337//
338// This is an alternative to stream_into_state() used when the entries have
339// already been loaded into memory (e.g. after decryption by EncryptedStorage).
340// It applies the same logic as apply_entry() but iterates a pre-built slice.
341
342// pub fn replay_log_entries(
343// entries: &[LogEntry],
344// state: &DashMap<String, DashMap<String, Value>>,
345// indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
346// ) {
347// for entry in entries {
348// match entry.cmd.as_str() {
349// "INSERT" => {
350// // Get or create the collection, then insert the document.
351// let col = state
352// .entry(entry.collection.clone())
353// .or_insert_with(DashMap::new);
354// col.insert(entry.key.clone(), entry.value.clone());
355// // Keep indexes in sync with the inserted document.
356// crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
357// }
358// "DELETE" => {
359// if let Some(col) = state.get(&entry.collection) {
360// // Remove from indexes before removing from state.
361// if let Some(old_val) = col.get(&entry.key) {
362// crate::engine::indexing::unindex_doc(
363// indexes,
364// &entry.collection,
365// &entry.key,
366// old_val.value(),
367// );
368// }
369// col.remove(&entry.key);
370// }
371// }
372// "DROP" => {
373// // Remove the collection and all its associated indexes.
374// state.remove(&entry.collection);
375// indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
376// }
377// "INDEX" => {
378// // Register an empty index slot.
379// indexes.insert(
380// format!("{}:{}", entry.collection, entry.key),
381// DashMap::new(),
382// );
383// }
384// _ => {}
385// }
386// }
387// println!("✅ Database restored & Indexes rebuilt!");
388// }