Skip to main content

moltendb_core/engine/storage/
mod.rs

1// ─── storage/mod.rs ──────────────────────────────────────────────────────────
2// This is the root module for all storage backends. It does three things:
3//
4//   1. Declares and conditionally exposes the concrete backend modules
5//      (disk, encrypted, wasm) based on the compile target.
6//
7//   2. Defines the StorageBackend trait — the single interface that the rest
8//      of the engine uses to read/write data. Any type that implements this
9//      trait can be used as a storage backend, whether it writes to a disk
10//      file, an encrypted file, or a browser OPFS file.
11//
12//   3. Provides the startup replay functions (stream_into_state, apply_entry,
13//      replay_log_entries) that rebuild the in-memory database state from the
14//      persistent log on server/worker startup.
15//
16// The StorageBackend trait is the key abstraction that makes MoltenDB's
17// "same engine, different storage" design possible. The engine (mod.rs,
18// operations.rs, handlers.rs) never imports a concrete storage type — it
19// only ever holds an Arc<dyn StorageBackend>. This means you can swap the
20// storage backend without changing any engine code.
21// ─────────────────────────────────────────────────────────────────────────────
22
23// ── Conditional module declarations ──────────────────────────────────────────
24// The cfg attributes below mean "only compile this when NOT targeting wasm32".
25// On native (server) builds we additionally get disk.rs and tiered.rs;
26// encrypted.rs and memory.rs are unconditional and compile on every target.
27// On WASM (browser) builds we instead get wasm.rs.
27// This prevents browser-incompatible code (file I/O, Tokio tasks) from being
28// compiled into the WASM binary.
29
30#[cfg(not(target_arch = "wasm32"))]
31mod disk;
32mod encrypted;
33mod memory;
34// tiered.rs provides MmapLogReader (memory-mapped cold log reads) and
35// TieredStorage (hot + cold two-tier backend for large-scale deployments).
36#[cfg(not(target_arch = "wasm32"))]
37mod tiered;
38// Re-export the concrete types so callers can write `storage::AsyncDiskStorage`
39// instead of `storage::disk::AsyncDiskStorage`.
40#[cfg(not(target_arch = "wasm32"))]
41pub use disk::{AsyncDiskStorage, SyncDiskStorage};
42pub use encrypted::EncryptedStorage;
43pub use memory::InMemoryStorage;
44// Re-export TieredStorage so engine/mod.rs and main.rs can use it directly.
45#[cfg(not(target_arch = "wasm32"))]
46pub use tiered::TieredStorage;
47
48// On WASM builds, expose the browser-side OPFS storage.
49#[cfg(target_arch = "wasm32")]
50pub mod wasm;
51#[cfg(target_arch = "wasm32")]
52pub use wasm::OpfsStorage;
53
54// ── Shared imports ────────────────────────────────────────────────────────────
55// These are used by both the trait definition and the replay functions below.
56use crate::engine::types::{DbError, LogEntry};
57#[cfg(feature = "schema")]
58use serde_json::Value;
59use std::ops::ControlFlow;
60// DashMap is a concurrent hash map — like HashMap but safe to read/write from
61// multiple threads simultaneously without a global lock.
62// DashSet is the set equivalent.
63use dashmap::{DashMap, DashSet};
64// serde_json::Value is a dynamically-typed JSON value (can be object, array,
65// string, number, bool, or null). All document data is stored as Value.
66
67// ─── StorageBackend trait ─────────────────────────────────────────────────────
68//
69// This is the core abstraction of the storage layer. Any type that implements
70// these three methods can serve as a MoltenDB storage backend.
71//
72// The trait requires Send + Sync because the backend is stored inside an
73// Arc<dyn StorageBackend> and shared across multiple Tokio tasks/threads.
74//   • Send  = the type can be moved to another thread
75//   • Sync  = the type can be referenced from multiple threads simultaneously
76// ─────────────────────────────────────────────────────────────────────────────
77
78/// The core storage abstraction. Implement this trait to add a new storage backend.
79///
80/// All three methods operate on `LogEntry` — the atomic unit of data in MoltenDB.
81/// The engine never writes raw bytes; it always goes through this interface.
82pub trait StorageBackend: Send + Sync {
83    /// Append a single log entry to the persistent store.
84    ///
85    /// This is called on every insert, update, delete, and index creation.
86    /// Implementations may buffer writes (async) or flush immediately (sync).
87    fn write_entry(&self, entry: &LogEntry) -> Result<(), DbError>;
88
89    /// Read all log entries from persistent storage into a Vec.
90    ///
91    /// Called on startup to rebuild the in-memory state, and by EncryptedStorage
92    /// which must decrypt entries before they can be streamed into state.
93    /// For large databases, prefer `stream_log_into` which avoids holding the
94    /// full log in RAM.
95    fn read_log(&self) -> Result<Vec<LogEntry>, DbError>;
96
97    /// Compact the log by writing only the current state (removing dead entries).
98    ///
99    /// `entries` is the complete current state of the database — every live
100    /// document as a single INSERT entry. The implementation should atomically
101    /// replace the existing log with this minimal set.
102    fn compact(&self, entries: Vec<LogEntry>) -> Result<(), DbError>;
103
104    /// Compact the log and execute an optional post-backup shell hook.
105    ///
106    /// The default implementation calls `compact()` and ignores the hook.
107    /// Backends that support shell hooks should override this.
108    fn compact_with_hook(&self, entries: Vec<LogEntry>, _hook: Option<String>) -> Result<(), DbError> {
109        self.compact(entries)
110    }
111
112    /// Read exactly `length` bytes starting at `offset` from the log.
113    ///
114    /// This is used to fetch "Cold" documents from the append-only log without
115    /// loading the entire file into memory.
116    fn read_at(&self, offset: u64, length: u32) -> Result<Vec<u8>, DbError>;
117
118    /// Return the current size of the persistent log file in bytes.
119    ///
120    /// Used by the WASM worker to implement size-based auto-compaction — the JS
121    /// side calls `get_size` after every INSERT batch and compacts if the file
122    /// exceeds the configured threshold (default: 5 MB).
123    ///
124    /// The default implementation returns 0 (no size information available).
125    /// `OpfsStorage` overrides this with a real `FileSystemSyncAccessHandle.getSize()` call.
126    /// Native disk backends don't need this — they use OS-level file metadata instead.
127    #[allow(dead_code)]
128    fn get_size(&self) -> Result<u64, DbError> {
129        Ok(0)
130    }
131
132    /// Stream log entries into state one at a time, without loading the full
133    /// log into RAM. Implementations may load a binary snapshot first and only
134    /// replay the delta lines written after the snapshot.
135    ///
136    /// The default implementation falls back to `read_log()` for backwards
137    /// compatibility (used by WASM/EncryptedStorage which don't have snapshots).
138    ///
139    /// Returns the total number of entries processed.
140    fn stream_log_into(&self, f: &mut dyn FnMut(LogEntry, u32) -> ControlFlow<(), ()>) -> Result<u64, DbError> {
141        // Default: load everything into a Vec, then iterate.
142        // Concrete implementations (AsyncDiskStorage, SyncDiskStorage) override
143        // this with a more efficient snapshot + streaming approach.
144        let entries = self.read_log()?;
145        let mut count = 0u64;
146        for entry in entries {
147            // Default re-serializes to get length. 
148            // Better implementations override this.
149            let json = serde_json::to_vec(&entry).unwrap_or_default();
150            let length = json.len() as u32;
151            if let ControlFlow::Break(_) = f(entry, length) {
152                return Ok(count);
153            }
154            count += 1;
155        }
156        Ok(count)
157    }
158}
159
160// ─── Startup replay ───────────────────────────────────────────────────────────
161//
162// When the server starts (or the WASM worker initialises), we need to rebuild
163// the in-memory state from the persistent log. These functions handle that.
164//
165// The process is:
166//   1. Call storage.stream_log_into() — this either loads a binary snapshot
167//      + delta (fast path) or streams the full log line-by-line (slow path).
168//   2. For each LogEntry, call apply_entry() to update the in-memory DashMaps.
169//   3. After all entries are applied, the in-memory state matches the log.
170// ─────────────────────────────────────────────────────────────────────────────
171
172/// Drive startup by streaming all log entries from storage into the in-memory
173/// state and index maps. Uses snapshot + delta replay when available.
174///
175/// `state`   — the main data store: collection name → (key → document state)
176/// `indexes` — the index store: "collection:field" → (field value → set of keys)
177///
178/// Returns the total number of log entries processed.
179pub fn stream_into_state(
180    storage: &dyn StorageBackend,
181    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
182    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
183    #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
184) -> Result<u64, DbError> {
185    let mut count = 0u64;
186    let mut offset = 0u64;
187    let mut tx_buffer: Vec<(LogEntry, crate::engine::types::RecordPointer)> = Vec::new();
188    let mut active_tx: Option<String> = None;
189
190    // stream_log_into calls our closure once per LogEntry, providing the 
191    // LogEntry and its raw byte length in the log file.
192    storage.stream_log_into(&mut |entry, length| {
193        let pointer = crate::engine::types::RecordPointer {
194            offset,
195            length,
196        };
197
198        match entry.cmd.as_str() {
199            "TX_BEGIN" => {
200                active_tx = Some(entry.key.clone());
201                tx_buffer.clear();
202            }
203            "TX_COMMIT" => {
204                if active_tx.as_ref() == Some(&entry.key) {
205                    // Flush buffer to DashMap
206                    for (e, p) in tx_buffer.drain(..) {
207                        // If length was 0, p.length will be 0 (from the snapshot replay)
208                        let pointer = if p.length == 0 { None } else { Some(p) };
209                        apply_entry(
210                            &e,
211                            state,
212                            indexes,
213                            #[cfg(feature = "schema")] schemas,
214                            pointer,
215                        );
216                    }
217                    active_tx = None;
218                } else {
219                    tracing::warn!("⚠️  TX_COMMIT seen for unknown or inactive transaction ID: {}. Ignoring.", entry.key);
220                }
221            }
222            _ => {
223                if active_tx.is_some() {
224                    // Hold in RAM until commit
225                    tx_buffer.push((entry, pointer));
226                } else {
227                    // Standard non-transactional entry
228                    // If length is 0, it means it's from a snapshot, so we want it Hot (pointer=None).
229                    let p = if length == 0 { None } else { Some(pointer) };
230                    apply_entry(
231                        &entry,
232                        state,
233                        indexes,
234                        #[cfg(feature = "schema")] schemas,
235                        p,
236                    );
237                }
238            }
239        }
240
241        count += 1;
242        // +1 for the newline character appended to each JSON line in the log.
243        // length=0 means this entry came from the snapshot (not the log file),
244        // so we must NOT advance the file offset for it.
245        if length > 0 {
246            offset += (length + 1) as u64;
247        }
248        ControlFlow::Continue(())
249    })?;
250
251    // If active_tx is still Some, the file ended prematurely (crash).
252    // In this case, we DISCARD the buffer to ensure atomicity of the last operation.
253    Ok(count)
254}
255
/// Apply a single log entry to the in-memory state and indexes.
///
/// This is the single mutation point used by startup replay: every command
/// type (INSERT / DELETE / DROP / INDEX / SCHEMA) flows through this match.
///
/// `entry`   — the decoded log entry to apply.
/// `state`   — collection name → (key → document state) map.
/// `indexes` — "collection:field" → (field value → set of keys) map.
/// `pointer` — if provided (during log replay), INSERT entries are stored
///             as `DocumentState::Cold(pointer)` to save memory; live writes
///             pass `None` and stay `Hot`.
pub fn apply_entry(
    entry: &LogEntry,
    state: &DashMap<String, DashMap<String, crate::engine::types::DocumentState>>,
    indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
    #[cfg(feature = "schema")] schemas: &DashMap<String, std::sync::Arc<(Value, jsonschema::Validator)>>,
    pointer: Option<crate::engine::types::RecordPointer>,
) {
    match entry.cmd.as_str() {
        "INSERT" => {
            // Get (or lazily create) the collection map for this entry.
            let col = state
                .entry(entry.collection.clone())
                .or_insert_with(DashMap::new);

            // During replay, we use the pointer (Cold). For live writes, we store the Value (Hot).
            let doc_state = if let Some(p) = pointer {
                crate::engine::types::DocumentState::Cold(p)
            } else {
                crate::engine::types::DocumentState::Hot(entry.value.clone())
            };

            col.insert(entry.key.clone(), doc_state);

            // Indexes ALWAYS store values in RAM to keep searches O(1).
            crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
        }
        "DELETE" => {
            if let Some(col) = state.get(&entry.collection) {
                // unindex_doc needs the old document Value, which is only
                // available in RAM for Hot documents. Cold documents are
                // removed from `state` but their index entries are NOT cleaned
                // up here, because fetching the value would require a storage
                // read this function has no access to.
                // TODO(review): stale index entries can therefore remain after
                // deleting a Cold document during replay — confirm whether a
                // later compaction/re-index pass reconciles them.
                if let Some(old_state) = col.get(&entry.key) {
                    if let crate::engine::types::DocumentState::Hot(old_val) = old_state.value() {
                         crate::engine::indexing::unindex_doc(
                            indexes,
                            &entry.collection,
                            &entry.key,
                            old_val,
                        );
                    }
                }
                col.remove(&entry.key);
            }
        }
        "DROP" => {
            // Remove the entire collection from the state map.
            state.remove(&entry.collection);
            // Remove all indexes that belong to this collection.
            // retain() keeps only entries where the closure returns true, so
            // we drop any index whose key starts with "collection:"
            // (e.g. "users:role").
            indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
        }
        "INDEX" => {
            // Register an empty index slot for "collection:field".
            // The index is populated as subsequent INSERT entries are applied.
            // `entry.key` holds the field name (e.g. "role" for "users:role").
            indexes.insert(
                format!("{}:{}", entry.collection, entry.key),
                DashMap::new(),
            );
        }
        #[cfg(feature = "schema")]
        "SCHEMA" => {
            // Re-compile and register the schema during replay. Entries whose
            // schema fails to compile are skipped silently.
            if let Ok(validator) = jsonschema::validator_for(&entry.value) {
                schemas.insert(entry.collection.clone(), std::sync::Arc::new((entry.value.clone(), validator)));
            }
        }
        // Unknown command types are silently ignored for forward compatibility:
        // a log written by a newer MoltenDB version (with new commands) replays
        // on an older version without crashing.
        _ => {}
    }
}
337
338// NOTE: replay_log_entries() below is retained as commented-out reference
339// code. It was an eager alternative to stream_into_state() for entries
340// already decoded in memory (e.g. after decryption by EncryptedStorage),
341// but it predates DocumentState (Hot/Cold) and transactions; that path now
342// goes through the stream_log_into() fallback + apply_entry() instead.
343
344// pub fn replay_log_entries(
345//     entries: &[LogEntry],
346//     state: &DashMap<String, DashMap<String, Value>>,
347//     indexes: &DashMap<String, DashMap<String, DashSet<String>>>,
348// ) {
349//     for entry in entries {
350//         match entry.cmd.as_str() {
351//             "INSERT" => {
352//                 // Get or create the collection, then insert the document.
353//                 let col = state
354//                     .entry(entry.collection.clone())
355//                     .or_insert_with(DashMap::new);
356//                 col.insert(entry.key.clone(), entry.value.clone());
357//                 // Keep indexes in sync with the inserted document.
358//                 crate::engine::indexing::index_doc(indexes, &entry.collection, &entry.key, &entry.value);
359//             }
360//             "DELETE" => {
361//                 if let Some(col) = state.get(&entry.collection) {
362//                     // Remove from indexes before removing from state.
363//                     if let Some(old_val) = col.get(&entry.key) {
364//                         crate::engine::indexing::unindex_doc(
365//                             indexes,
366//                             &entry.collection,
367//                             &entry.key,
368//                             old_val.value(),
369//                         );
370//                     }
371//                     col.remove(&entry.key);
372//                 }
373//             }
374//             "DROP" => {
375//                 // Remove the collection and all its associated indexes.
376//                 state.remove(&entry.collection);
377//                 indexes.retain(|k, _| !k.starts_with(&format!("{}:", entry.collection)));
378//             }
379//             "INDEX" => {
380//                 // Register an empty index slot.
381//                 indexes.insert(
382//                     format!("{}:{}", entry.collection, entry.key),
383//                     DashMap::new(),
384//                 );
385//             }
386//             _ => {}
387//         }
388//     }
389//     println!("✅ Database restored & Indexes rebuilt!");
390// }