Skip to main content

moltendb_core/engine/
mod.rs

// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state               — the actual document data: collection → (key → document state)
//   storage             — the persistence layer (disk, encrypted, or OPFS)
//   tx                  — broadcast channel for real-time WebSocket notifications
//   indexes             — field indexes for fast WHERE queries
//   query_heatmap       — tracks query frequency for auto-indexing
//   hot_threshold       — per-collection limit on documents kept in RAM
//   rate_limit_requests — maximum requests per rate-limit window
//   rate_limit_window   — rate-limit window size in seconds
//   max_body_size       — maximum request body size in bytes
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Each is conditionally compiled with a #[cfg(...)] attribute, so only one of
// them exists in any given build.
// ─────────────────────────────────────────────────────────────────────────────
24
25// Declare the sub-modules of the engine.
26mod types;      // LogEntry, DbError
27mod indexing;   // index_doc, unindex_doc, track_query, create_index
28mod storage;    // StorageBackend trait + concrete implementations
29mod operations; // get, get_all, insert_batch, update, delete, etc.
30
31// Re-export DbError so callers can write `engine::DbError` instead of
32// `engine::types::DbError`.
33pub use types::DbError;
34// Re-export the StorageBackend trait so callers can use it without knowing
35// the internal module structure.
36pub use storage::{StorageBackend, EncryptedStorage};
37
38// DashMap = concurrent hash map. DashSet = concurrent hash set.
39use dashmap::{DashMap, DashSet};
40use tracing::{info};
41// Value = dynamically-typed JSON value.
42use serde_json::Value;
43// Standard HashMap — used for return values from get operations.
44use std::collections::HashMap;
45// Arc = thread-safe reference-counted pointer.
46// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
47// the same underlying data.
48use std::sync::Arc;
49// Tokio's broadcast channel: one sender, many receivers.
50// Used to push real-time change notifications to WebSocket subscribers.
51use tokio::sync::broadcast;
52
53/// The central database handle. Cheap to clone — all clones share the same state.
54///
55/// This struct is the public API of the engine. All database operations go
56/// through methods on this struct, which delegate to the operations module.
57#[derive(Clone)]
58pub struct Db {
59    /// The main document store.
60    /// Outer map: collection name (e.g. "users") → inner map.
61    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state.
62    /// DashMap allows concurrent reads and writes from multiple threads.
63    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
64
65    /// The storage backend — handles persistence to disk or OPFS.
66    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
67    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
68    pub storage: Arc<dyn StorageBackend>,
69
70    /// Broadcast channel sender for real-time change notifications.
71    /// When a document is inserted, updated, or deleted, a JSON event is sent
72    /// on this channel. WebSocket handlers subscribe to receive these events.
73    /// `pub` so the WebSocket handler in main.rs can call subscribe().
74    pub tx: broadcast::Sender<String>,
75
76    /// The index store.
77    /// Key format: "collection:field" (e.g. "users:role").
78    /// Value: field_value → set of document keys with that value.
79    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
80    /// `pub` so handlers.rs can check for index existence directly.
81    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
82
83    /// Query frequency counter for auto-indexing.
84    /// Key: "collection:field". Value: number of times queried.
85    /// When a field reaches 3 queries, an index is auto-created.
86    pub query_heatmap: Arc<DashMap<String, u32>>,
87
88    /// The maximum number of documents per collection to keep in RAM (Hot).
89    /// If a collection exceeds this, older documents are paged out to disk (Cold).
90    /// Default is 50,000.
91    pub hot_threshold: usize,
92
93    /// Max requests per window.
94    pub rate_limit_requests: u32,
95
96    /// Window size in seconds.
97    pub rate_limit_window: u64,
98
99    /// Maximum request body size in bytes.
100    pub max_body_size: usize,
101}
102
103impl Db {
104    /// Open (or create) a database at the given file path.
105    /// Only available on native (non-WASM) builds.
106    ///
107    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write).
108    ///                    if false, use AsyncDiskStorage (flush every 50ms).
109    ///                    Ignored when `tiered_mode` is true.
110    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
111    ///                    Hot writes go to the active log; cold data is archived and
112    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
113    ///                    Enable with STORAGE_MODE=tiered environment variable.
114    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage.
115    ///                    if None, data is stored in plaintext (not recommended).
116    #[cfg(not(target_arch = "wasm32"))]
117    pub fn open(
118        path: &str,
119        sync_mode: bool,
120        tiered_mode: bool,
121        hot_threshold: usize,
122        rate_limit_requests: u32,
123        rate_limit_window: u64,
124        max_body_size: usize,
125        encryption_key: Option<&[u8; 32]>,
126    ) -> Result<Self, DbError> {
127        // Create the shared in-memory state containers.
128        let state = Arc::new(DashMap::new());
129        // Create the broadcast channel with a buffer of 100 messages.
130        // If the buffer fills up (no subscribers reading), old messages are dropped.
131        let (tx, _rx) = broadcast::channel(100);
132        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
133            Arc::new(Default::default());
134        let query_heatmap = Arc::new(Default::default());
135
136        // Choose the base storage backend based on the configured mode.
137        //
138        //   tiered_mode = true  → TieredStorage: hot log (async writes) + cold log
139        //                         (mmap reads). Best for large datasets. The cold log
140        //                         accumulates promoted hot data and is paged by the OS.
141        //
142        //   sync_mode = true    → SyncDiskStorage: every write is flushed to disk
143        //                         immediately. Zero data loss, lower throughput.
144        //
145        //   default             → AsyncDiskStorage: writes buffered in memory, flushed
146        //                         every 50ms. Highest throughput, up to 50ms data loss.
147        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
148            Arc::new(storage::TieredStorage::new(path)?)
149        } else if sync_mode {
150            Arc::new(storage::SyncDiskStorage::new(path)?)
151        } else {
152            Arc::new(storage::AsyncDiskStorage::new(path)?)
153        };
154
155        // Optionally wrap the base storage in EncryptedStorage.
156        // EncryptedStorage is transparent — it encrypts on write and decrypts
157        // on read, so the rest of the engine doesn't know encryption is happening.
158        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
159            Arc::new(storage::EncryptedStorage::new(base_storage, key))
160        } else {
161            base_storage
162        };
163
164        // Replay the log (or snapshot + delta) into the in-memory state.
165        // After this call, `state` and `indexes` reflect the persisted data.
166        storage::stream_into_state(&*storage, &state, &indexes)?;
167
168        Ok(Self {
169            state,
170            storage,
171            tx,
172            indexes,
173            query_heatmap,
174            hot_threshold,
175            rate_limit_requests,
176            rate_limit_window,
177            max_body_size,
178        })
179    }
180
181    /// Open (or create) a database in the browser using OPFS.
182    /// Only available on WASM builds. Async because OPFS APIs return Promises.
183    ///
184    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
185    #[cfg(target_arch = "wasm32")]
186    pub async fn open_wasm(
187        db_name: &str,
188        hot_threshold: usize,
189        rate_limit_requests: u32,
190        rate_limit_window: u64,
191        max_body_size: usize,
192        encryption_key: Option<&[u8; 32]>,
193        sync_mode: bool,
194    ) -> Result<Self, DbError> {
195        let state = Arc::new(DashMap::new());
196        let (tx, _rx) = broadcast::channel(100);
197        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
198            Arc::new(Default::default());
199        let query_heatmap = Arc::new(Default::default());
200
201        // Open the OPFS file. This is async because the browser's OPFS API
202        // uses Promises which we must await.
203        let mut storage: Arc<dyn StorageBackend> =
204            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);
205
206        // Apply encryption wrapper if a key is provided.
207        if let Some(key) = encryption_key {
208            storage = Arc::new(storage::EncryptedStorage::new(storage, key));
209        }
210
211        // Replay the log into the in-memory state.
212        storage::stream_into_state(&*storage, &state, &indexes)?;
213
214        Ok(Self {
215            state,
216            storage,
217            tx,
218            indexes,
219            query_heatmap,
220            hot_threshold,
221            rate_limit_requests,
222            rate_limit_window,
223            max_body_size,
224        })
225    }
226
227    /// Create a new broadcast receiver for real-time change notifications.
228    /// Each call returns an independent receiver — multiple WebSocket handlers
229    /// can each subscribe and receive all events independently.
230    pub fn subscribe(&self) -> broadcast::Receiver<String> {
231        self.tx.subscribe()
232    }
233
234    /// Retrieve a single document by key. Returns None if not found.
235    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
236        operations::get(&self.state, &self.storage, collection, key)
237    }
238
239    /// Retrieve all documents in a collection as a HashMap.
240    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
241        operations::get_all(&self.state, &self.storage, collection)
242    }
243
244    /// Retrieve a specific set of documents by their keys.
245    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
246        operations::get_batch(&self.state, &self.storage, collection, keys)
247    }
248
249    /// Insert or overwrite multiple documents in one call.
250    /// Each item is a (key, value) pair. Writes are persisted to storage.
251    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
252        operations::insert_batch(
253            &self.state,
254            &self.indexes,
255            &self.storage,
256            &self.tx,
257            collection,
258            items,
259        )?;
260
261        // Auto-evict if the collection exceeds the threshold.
262        let _ = self.evict_collection(collection, self.hot_threshold);
263        Ok(())
264    }
265
266    /// Partially update a document — merges `updates` into the existing document.
267    /// Returns true if the document was found and updated, false if not found.
268    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
269        let updated = operations::update(
270            &self.state,
271            &self.indexes,
272            &self.storage,
273            &self.tx,
274            collection,
275            key,
276            updates,
277        )?;
278
279        if updated {
280            // Auto-evict if the collection exceeds the threshold.
281            let _ = self.evict_collection(collection, self.hot_threshold);
282        }
283        Ok(updated)
284    }
285
286    /// Delete a single document by key.
287    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
288        operations::delete(
289            &self.state,
290            &self.indexes,
291            &self.storage,
292            &self.tx,
293            collection,
294            key,
295        )
296    }
297
298    /// Delete multiple documents by key in one call.
299    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
300        operations::delete_batch(
301            &self.state,
302            &self.indexes,
303            &self.storage,
304            &self.tx,
305            collection,
306            keys,
307        )
308    }
309
310    /// Drop an entire collection — removes all documents and its indexes.
311    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
312        operations::delete_collection(
313            &self.state,
314            &self.indexes,
315            &self.storage,
316            &self.tx,
317            collection,
318        )
319    }
320
321    /// Track that `field` was queried in `collection` and auto-create an index
322    /// if this field has been queried 3 or more times.
323    /// Errors are silently ignored — auto-indexing is best-effort.
324    pub fn track_query(&self, collection: &str, field: &str) {
325        // The `let _ =` discards the Result — a failed auto-index is not fatal.
326        let _ = indexing::track_query(
327            &self.indexes,
328            &self.query_heatmap,
329            collection,
330            field,
331            &self.storage,
332            &self.state,
333        );
334    }
335    
336    /// Compact the log file — rewrite it to contain only the current state.
337    ///
338    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
339    /// and writes a binary snapshot for fast next startup.
340    ///
341    /// The compacted log contains:
342    ///   - One INSERT entry per live document (current value only).
343    ///   - One INDEX entry per registered index (index data is rebuilt on replay).
344    pub fn compact(&self) -> Result<(), DbError> {
345        info!("🔨 Starting Log Compaction...");
346
347        // Build the minimal set of entries representing the current state.
348        let mut entries = Vec::new();
349
350        // One INSERT per live document across all collections.
351        for col_ref in self.state.iter() {
352            let col_name = col_ref.key();
353            for item_ref in col_ref.value().iter() {
354                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
355                let value = match item_ref.value() {
356                    crate::engine::types::DocumentState::Hot(v) => v.clone(),
357                    crate::engine::types::DocumentState::Cold(ptr) => {
358                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
359                        let log_entry: crate::engine::types::LogEntry = serde_json::from_slice(&bytes)?;
360                        log_entry.value
361                    }
362                };
363                entries.push(types::LogEntry {
364                    cmd: "INSERT".to_string(),
365                    collection: col_name.clone(),
366                    key: item_ref.key().clone(),
367                    value,
368                });
369            }
370        }
371
372        // One INDEX entry per registered index.
373        // The index name format is "collection:field" — we split it to get both parts.
374        for index_ref in self.indexes.iter() {
375            let parts: Vec<&str> = index_ref.key().split(':').collect();
376            if parts.len() == 2 {
377                entries.push(types::LogEntry {
378                    cmd: "INDEX".to_string(),
379                    collection: parts[0].to_string(),
380                    key: parts[1].to_string(),       // field name
381                    value: serde_json::json!(null),
382                });
383            }
384        }
385
386        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
387        self.storage.compact(entries)?;
388
389        info!("✅ Log Compaction Finished!");
390        Ok(())
391    }
392
393    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
394    ///
395    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
396    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
397    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
398        let col_len = if let Some(col) = self.state.get(collection) {
399            col.len()
400        } else {
401            return Err(DbError::CollectionNotFound);
402        };
403
404        if col_len <= limit {
405            return Ok(0);
406        }
407
408        let mut evicted_count = 0;
409        let mut offset = 0u64;
410        let to_evict = col_len - limit;
411
412        // To evict properly, we need the pointers. Since we don't store them for
413        // Hot documents, we re-scan the log to find them.
414        self.storage.stream_log_into(&mut |entry, length| {
415            if entry.collection == collection {
416                if evicted_count < to_evict {
417                    if let Some(col) = self.state.get(collection) {
418                        if let Some(mut doc_state) = col.get_mut(&entry.key) {
419                            if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
420                                *doc_state = crate::engine::types::DocumentState::Cold(crate::engine::types::RecordPointer {
421                                    offset,
422                                    length,
423                                });
424                                evicted_count += 1;
425                            }
426                        }
427                    }
428                }
429                offset += (length + 1) as u64;
430            } else {
431                offset += (length + 1) as u64;
432            }
433        })?;
434
435        Ok(evicted_count)
436    }
437}