// moltendb_core/engine/storage/mod.rs
// ─── storage/mod.rs ──────────────────────────────────────────────────────────
// This is the root module for all storage backends. It does three things:
//
//   1. Declares and conditionally exposes the concrete backend modules
//      (disk, encrypted, wasm) based on the compile target.
//
//   2. Defines the StorageBackend trait — the single interface that the rest
//      of the engine uses to read/write data. Any type that implements this
//      trait can be used as a storage backend, whether it writes to a disk
//      file, an encrypted file, or a browser OPFS file.
//
//   3. Provides the startup replay functions (stream_into_state, apply_entry)
//      that rebuild the in-memory database state from the persistent log on
//      server/worker startup.
//
// The StorageBackend trait is the key abstraction that makes MoltenDB's
// "same engine, different storage" design possible. The engine (mod.rs,
// operations.rs, handlers.rs) never imports a concrete storage type — it
// only ever holds an Arc<dyn StorageBackend>. This means you can swap the
// storage backend without changing any engine code.
// ─────────────────────────────────────────────────────────────────────────────

23// ── Conditional module declarations ──────────────────────────────────────────
24// These cfg attributes mean "only compile this when NOT targeting wasm32".
25// On native (server) builds we get disk.rs and encrypted.rs.
26// On WASM (browser) builds we get wasm.rs.
27// This prevents browser-incompatible code (file I/O, Tokio tasks) from being
28// compiled into the WASM binary.
29
30#[cfg(not(target_arch = "wasm32"))]
31mod disk;
32mod encrypted;
33// tiered.rs provides MmapLogReader (memory-mapped cold log reads) and
34// TieredStorage (hot + cold two-tier backend for large-scale deployments).
35#[cfg(not(target_arch = "wasm32"))]
36mod tiered;
37// Re-export the concrete types so callers can write `storage::AsyncDiskStorage`
38// instead of `storage::disk::AsyncDiskStorage`.
39#[cfg(not(target_arch = "wasm32"))]
40pub use disk::{AsyncDiskStorage, SyncDiskStorage};
41pub use encrypted::EncryptedStorage;
42// Re-export TieredStorage so engine/mod.rs and main.rs can use it directly.
43#[cfg(not(target_arch = "wasm32"))]
44pub use tiered::TieredStorage;
45
46// On WASM builds, expose the browser-side OPFS storage.
47#[cfg(target_arch = "wasm32")]
48pub mod wasm;
49#[cfg(target_arch = "wasm32")]
50pub use wasm::OpfsStorage;
51
52// ── Shared imports ────────────────────────────────────────────────────────────
53// These are used by both the trait definition and the replay functions below.
54use crate::engine::types::{DbError, LogEntry};
55// DashMap is a concurrent hash map — like HashMap but safe to read/write from
56// multiple threads simultaneously without a global lock.
57// DashSet is the set equivalent.
58use dashmap::{DashMap, DashSet};
59// serde_json::Value is a dynamically-typed JSON value (can be object, array,
60// string, number, bool, or null). All document data is stored as Value.
61
// ─── StorageBackend trait ─────────────────────────────────────────────────────
//
// This is the core abstraction of the storage layer. Any type that implements
// the required methods below can serve as a MoltenDB storage backend.
//
// The trait requires Send + Sync because the backend is stored inside an
// Arc<dyn StorageBackend> and shared across multiple Tokio tasks/threads.
//   • Send  = the type can be moved to another thread
//   • Sync  = the type can be referenced from multiple threads simultaneously
// ─────────────────────────────────────────────────────────────────────────────

73/// The core storage abstraction. Implement this trait to add a new storage backend.
74///
75/// All three methods operate on `LogEntry` — the atomic unit of data in MoltenDB.
76/// The engine never writes raw bytes; it always goes through this interface.
77pub trait StorageBackend: Send + Sync {
78    /// Append a single log entry to the persistent store.
79    ///
80    /// This is called on every insert, update, delete, and index creation.
81    /// Implementations may buffer writes (async) or flush immediately (sync).
82    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError>;
83
84    /// Read all log entries from persistent storage into a Vec.
85    ///
86    /// Called on startup to rebuild the in-memory state, and by EncryptedStorage
87    /// which must decrypt entries before they can be streamed into state.
88    /// For large databases, prefer `stream_log_into` which avoids holding the
89    /// full log in RAM.
90    fn read_log(&self) -> Result<Vec<LogEntry>, DbError>;
91
92    /// Compact the log by writing only the current state (removing dead entries).
93    ///
94    /// `entries` is the complete current state of the database — every live
95    /// document as a single INSERT entry. The implementation should atomically
96    /// replace the existing log with this minimal set.
97    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError>;
98
99    /// Read exactly `length` bytes starting at `offset` from the log.
100    ///
101    /// This is used to fetch "Cold" documents from the append-only log without
102    /// loading the entire file into memory.
103    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError>;
104
105    /// Return the current size of the persistent log file in bytes.
106    ///
107    /// Used by the WASM worker to implement size-based auto-compaction — the JS
108    /// side calls `get_size` after every INSERT batch and compacts if the file
109    /// exceeds the configured threshold (default: 5 MB).
110    ///
111    /// The default implementation returns 0 (no size information available).
112    /// `OpfsStorage` overrides this with a real `FileSystemSyncAccessHandle.getSize()` call.
113    /// Native disk backends don't need this — they use OS-level file metadata instead.
114    #[allow(dead_code)]
115    fn get_size(&self) -> Result<u64, DbError> {
116        Ok(0)
117    }
118
119    /// Stream log entries into state one at a time, without loading the full
120    /// log into RAM. Implementations may load a binary snapshot first and only
121    /// replay the delta lines written after the snapshot.
122    ///
123    /// The default implementation falls back to `read_log()` for backwards
124    /// compatibility (used by WASM/EncryptedStorage which don't have snapshots).
125    ///
126    /// Returns the total number of entries processed.
127    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry, u32)) -> Result<u64, DbError> {
128        // Default: load everything into a Vec, then iterate.
129        // Concrete implementations (AsyncDiskStorage, SyncDiskStorage) override
130        // this with a more efficient snapshot + streaming approach.
131        let entries = self.read_log()?;
132        let count = entries.len() as u64;
133        for entry in entries {
134            // Default re-serializes to get length. 
135            // Better implementations override this.
136            let json = serde_json::to_vec(&entry).unwrap_or_default();
137            let length = json.len() as u32;
138            f(entry, length);
139        }
140        Ok(count)
141    }
142}

// ─── Startup replay ───────────────────────────────────────────────────────────
//
// When the server starts (or the WASM worker initialises), we need to rebuild
// the in-memory state from the persistent log. These functions handle that.
//
// The process is:
//   1. Call storage.stream_log_into() — this either loads a binary snapshot
//      + delta (fast path) or streams the full log line-by-line (slow path).
//   2. For each LogEntry, call apply_entry() to update the in-memory DashMaps.
//   3. After all entries are applied, the in-memory state matches the log.
// ─────────────────────────────────────────────────────────────────────────────

156/// Drive startup by streaming all log entries from storage into the in-memory
157/// state and index maps. Uses snapshot + delta replay when available.
158///
159/// `state`   — the main data store: collection name → (key → document state)
160/// `indexes` — the index store: "collection:field" → (field value → set of keys)
161///
162/// Returns the total number of log entries processed.
163pub fn stream_into_state(
164    storage: &dyn StorageBackend,
165    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
166    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
167) -> Result<u64, DbError> {
168    let mut count = 0u64;
169    let mut offset = 0u64;
170
171    // stream_log_into calls our closure once per LogEntry, providing the 
172    // LogEntry and its raw byte length in the log file.
173    storage.stream_log_into(&mut |entry, length| {
174        apply_entry(&entry, state, indexes, Some(crate::engine::types::RecordPointer {
175            offset,
176            length,
177        }));
178
179        count += 1;
180        // +1 for the newline character appended to each JSON line in the log.
181        offset += (length + 1) as u64;
182    })?;
183    Ok(count)
184}
185
186/// Apply a single log entry to the in-memory state and indexes.
187///
188/// If `pointer` is provided (during log replay), INSERT entries are stored
189/// as `DocumentState::Cold(pointer)` to save memory. Live writes stay `Hot`.
190fn apply_entry(
191    entry: &LogEntry,
192    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
193    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
194    pointer: Option<crate::engine::types::RecordPointer>,
195) {
196    match entry.cmd.as_str() {
197        "INSERT" => {
198            let col = state
199                .entry(entry.collection.clone())
200                .or_insert_with(DashMap::new);
201
202            // During replay, we use the pointer (Cold). For live writes, we store the Value (Hot).
203            let doc_state = if let Some(p) = pointer {
204                crate::engine::types::DocumentState::Cold(p)
205            } else {
206                crate::engine::types::DocumentState::Hot(entry.value.clone())
207            };
208
209            col.insert(entry.key.clone(), doc_state);
210
211            // Indexes ALWAYS store values in RAM to keep searches O(1).
212            crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
213        }
214        "DELETE" => {
215            if let Some(col) = state.get(&entry.collection) {
216                // To unindex, we need the Value. If it's Cold, we'd have to fetch it.
217                // However, during REPLAY, we can just skip unindexing if we don't have the value,
218                // BUT that would break if a DELETE follows an INSERT.
219                // Actually, unindex_doc needs the Value.
220                // For simplicity in this v1 of Hybrid, we'll fetch if needed or change unindex_doc.
221                // Wait, if it's Cold, we don't have the value.
222                // I'll leave a TODO here and for now just handle Hot.
223                if let Some(old_state) = col.get(&entry.key) {
224                    if let crate::engine::types::DocumentState::Hot(old_val) = old_state.value() {
225                         crate::engine::indexing::unindex_doc(
226                            indexes,
227                            &entry.collection,
228                            &entry.key,
229                            old_val,
230                        );
231                    }
232                }
233                col.remove(&entry.key);
234            }
235        }
236        "DROP" => {
237            // Remove the entire collection from the state map.
238            state.remove(&entry.collection);
239            // Remove all indexes that belong to this collection.
240            // retain() keeps only entries where the closure returns true.
241            // We drop any index whose key starts with "collection:" (e.g. "users:role").
242            indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
243        }
244        "INDEX" => {
245            // Register an empty index slot for "collection:field".
246            // The index will be populated as subsequent INSERT entries are applied.
247            // `entry.key` holds the field name (e.g. "role" for "users:role").
248            indexes.insert(
249                format!("{}:{}", entry.collection, entry.key),
250                DashMap::new(),
251            );
252        }
253        // Unknown command types are silently ignored for forward compatibility.
254        // If a future version of MoltenDB adds a new command, older versions
255        // will simply skip those entries rather than crashing.
256        _ => {}
257    }
258}

// Replay a slice of already-decoded log entries into RAM state.
//
// This is an alternative to stream_into_state() used when the entries have
// already been loaded into memory (e.g. after decryption by EncryptedStorage).
// It applies the same logic as apply_entry() but iterates a pre-built slice.
// Currently retired (commented out) in favour of stream_into_state().

// pub fn replay_log_entries(
//     entries: &[LogEntry],
//     state: &DashMap<String, DashMap<String, Value>>,
//     indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
// ) {
//     for entry in entries {
//         match entry.cmd.as_str() {
//             "INSERT" => {
//                 // Get or create the collection, then insert the document.
//                 let col = state
//                     .entry(entry.collection.clone())
//                     .or_insert_with(DashMap::new);
//                 col.insert(entry.key.clone(), entry.value.clone());
//                 // Keep indexes in sync with the inserted document.
//                 crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
//             }
//             "DELETE" => {
//                 if let Some(col) = state.get(&entry.collection) {
//                     // Remove from indexes before removing from state.
//                     if let Some(old_val) = col.get(&entry.key) {
//                         crate::engine::indexing::unindex_doc(
//                             indexes,
//                             &entry.collection,
//                             &entry.key,
//                             old_val.value(),
//                         );
//                     }
//                     col.remove(&entry.key);
//                 }
//             }
//             "DROP" => {
//                 // Remove the collection and all its associated indexes.
//                 state.remove(&entry.collection);
//                 indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
//             }
//             "INDEX" => {
//                 // Register an empty index slot.
//                 indexes.insert(
//                     format!("{}:{}", entry.collection, entry.key),
//                     DashMap::new(),
//                 );
//             }
//             _ => {}
//         }
//     }
//     println!("✅ Database restored & Indexes rebuilt!");
// }