moltendb_core/engine/mod.rs

// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how each Axum
// handler function can receive its own Db clone via State<> extraction while
// all operate on the same in-memory database.
//
// Internal structure:
//   state         — the document data: collection → (key → hot/cold document state)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
// ─────────────────────────────────────────────────────────────────────────────

// Declare the sub-modules of the engine.
mod types;      // LogEntry, DbError, DocumentState, RecordPointer
mod indexing;   // index_doc, unindex_doc, track_query, create_index
mod storage;    // StorageBackend trait + concrete implementations
mod schema;     // JSON Schema validation
mod operations; // get, get_all, insert_batch, update, delete, etc.

// Re-export LogEntry and DbError so they can be used by tests and other crates.
pub use types::{DbError, LogEntry};
// Re-export the StorageBackend trait (and the EncryptedStorage wrapper) so
// callers can use them without knowing the internal module structure.
pub use storage::{StorageBackend, EncryptedStorage};

// DashMap = concurrent hash map. DashSet = concurrent hash set.
use dashmap::{DashMap, DashSet};
use tracing::info;
// Value = dynamically-typed JSON value.
use serde_json::Value;
// Standard HashMap — used for return values from get operations.
use std::collections::HashMap;
// Arc = thread-safe reference-counted pointer.
// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
// the same underlying data.
use std::sync::Arc;
// Tokio's broadcast channel: one sender, many receivers.
// Used to push real-time change notifications to WebSocket subscribers.
use tokio::sync::broadcast;

/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
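///
/// # Example
///
/// A minimal sketch of the clone-and-share pattern (the handler below is
/// illustrative, not part of this crate):
///
/// ```ignore
/// // Each Axum handler receives its own cheap clone; all clones share state.
/// async fn list_users(State(db): State<Db>) -> Json<HashMap<String, Value>> {
///     Json(db.get_all("users"))
/// }
/// ```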
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → hybrid Hot/Cold document state.
    /// DashMap allows concurrent reads and writes from multiple threads.
    state: Arc<DashMap<String, DashMap<String, types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created.
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000.
    pub hot_threshold: usize,

    /// Max requests per rate-limit window.
    pub rate_limit_requests: u32,

    /// Rate-limit window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,

    /// Registered JSON Schemas per collection.
    /// Key: collection name → Value: (original JSON, compiled validator).
    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
}

impl Db {
    /// Open (or create) a database at the given file path.
    /// Only available on native (non-WASM) builds.
    ///
    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write);
    ///                    if false, use AsyncDiskStorage (flush every 50ms).
    ///                    Ignored when `tiered_mode` is true.
    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
    ///                    Hot writes go to the active log; cold data is archived and
    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
    ///                    Enable with the STORAGE_MODE=tiered environment variable.
    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage;
    ///                    if None, data is stored in plaintext (not recommended).
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(
        path: &str,
        sync_mode: bool,
        tiered_mode: bool,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
    ) -> Result<Self, DbError> {
        // Create the shared in-memory state containers.
        let state = Arc::new(DashMap::new());
        // Create the broadcast channel with a buffer of 100 messages.
        // If the buffer fills up, the oldest messages are overwritten and any
        // lagging subscriber sees a Lagged error on its next recv().
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        let schemas = Arc::new(DashMap::new());

        // Choose the base storage backend based on the configured mode.
        //
        //   tiered_mode = true  → TieredStorage: hot log (async writes) + cold log
        //                         (mmap reads). Best for large datasets. The cold log
        //                         accumulates promoted hot data and is paged by the OS.
        //
        //   sync_mode = true    → SyncDiskStorage: every write is flushed to disk
        //                         immediately. Zero data loss, lower throughput.
        //
        //   default             → AsyncDiskStorage: writes buffered in memory, flushed
        //                         every 50ms. Highest throughput, up to 50ms data loss.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };

        // Optionally wrap the base storage in EncryptedStorage.
        // EncryptedStorage is transparent — it encrypts on write and decrypts
        // on read, so the rest of the engine doesn't know encryption is happening.
        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
            Arc::new(storage::EncryptedStorage::new(base_storage, key))
        } else {
            base_storage
        };

        // Replay the log (or snapshot + delta) into the in-memory state.
        // After this call, `state` and `indexes` reflect the persisted data.
        storage::stream_into_state(&*storage, &state, &indexes, &schemas)?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            schemas,
        })
    }

    /// Open (or create) a database in the browser using OPFS.
    /// Only available on WASM builds. Async because OPFS APIs return Promises.
    ///
    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
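    ///
    /// A minimal usage sketch (argument values are illustrative):
    ///
    /// ```ignore
    /// // Browser build: persists to an OPFS file named "analytics_db".
    /// let db = Db::open_wasm("analytics_db", 50_000, 100, 60, 1_048_576, None, false).await?;
    /// ```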
    #[cfg(target_arch = "wasm32")]
    pub async fn open_wasm(
        db_name: &str,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
        sync_mode: bool,
    ) -> Result<Self, DbError> {
        let state = Arc::new(DashMap::new());
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        let schemas = Arc::new(DashMap::new());

        // Open the OPFS file. This is async because the browser's OPFS API
        // returns Promises which we must await.
        let mut storage: Arc<dyn StorageBackend> =
            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);

        // Apply the encryption wrapper if a key is provided.
        if let Some(key) = encryption_key {
            storage = Arc::new(storage::EncryptedStorage::new(storage, key));
        }

        // Replay the log into the in-memory state.
        storage::stream_into_state(&*storage, &state, &indexes, &schemas)?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            schemas,
        })
    }

    /// Create a new broadcast receiver for real-time change notifications.
    /// Each call returns an independent receiver — multiple WebSocket handlers
    /// can each subscribe and receive all events independently.
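    ///
    /// A minimal subscriber sketch (assumes a Tokio runtime):
    ///
    /// ```ignore
    /// let mut rx = db.subscribe();
    /// tokio::spawn(async move {
    ///     // Each received String is one JSON change event.
    ///     while let Ok(event) = rx.recv().await {
    ///         println!("change: {event}");
    ///     }
    /// });
    /// ```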
    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }

    /// Retrieve a single document by key. Returns None if not found.
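    ///
    /// A minimal sketch (collection, key, and field names are illustrative):
    ///
    /// ```ignore
    /// if let Some(user) = db.get("users", "u1") {
    ///     println!("{}", user["name"]);
    /// }
    /// ```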
    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
        operations::get(&self.state, &self.storage, collection, key)
    }

    /// Retrieve all documents in a collection as a HashMap.
    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
        operations::get_all(&self.state, &self.storage, collection)
    }

    /// Retrieve a specific set of documents by their keys.
    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
        operations::get_batch(&self.state, &self.storage, collection, keys)
    }

    /// Insert or overwrite multiple documents in one call.
    /// Each item is a (key, value) pair. Writes are persisted to storage.
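    ///
    /// A minimal sketch (keys and document bodies are illustrative):
    ///
    /// ```ignore
    /// db.insert_batch("users", vec![
    ///     ("u1".into(), serde_json::json!({ "name": "Ada",  "role": "admin" })),
    ///     ("u2".into(), serde_json::json!({ "name": "Alan", "role": "user"  })),
    /// ])?;
    /// ```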
    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
        operations::insert_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            &self.schemas,
            collection,
            items,
        )?;

        // Auto-evict if the collection exceeds the threshold.
        let _ = self.evict_collection(collection, self.hot_threshold);
        Ok(())
    }

    /// Partially update a document — merges `updates` into the existing document.
    /// Returns true if the document was found and updated, false if not found.
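    ///
    /// A minimal sketch (field names are illustrative):
    ///
    /// ```ignore
    /// // Merge { "role": "admin" } into document "u2"; other fields are kept.
    /// let found = db.update("users", "u2", serde_json::json!({ "role": "admin" }))?;
    /// ```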
    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
        let updated = operations::update(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            &self.schemas,
            collection,
            key,
            updates,
        )?;

        if updated {
            // Auto-evict if the collection exceeds the threshold.
            let _ = self.evict_collection(collection, self.hot_threshold);
        }
        Ok(updated)
    }

    /// Delete a single document by key.
    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
        operations::delete(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
        )
    }

    /// Delete multiple documents by key in one call.
    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
        operations::delete_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            keys,
        )
    }

    /// Drop an entire collection — removes all documents and its indexes.
    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
        operations::delete_collection(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
        )
    }

    /// Track that `field` was queried in `collection` and auto-create an index
    /// if this field has been queried 3 or more times.
    /// Errors are silently ignored — auto-indexing is best-effort.
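    ///
    /// A minimal sketch (the 3-query threshold is the one documented above):
    ///
    /// ```ignore
    /// db.track_query("users", "role"); // 1st query: counted
    /// db.track_query("users", "role"); // 2nd query: counted
    /// db.track_query("users", "role"); // 3rd query: index "users:role" is auto-created
    /// ```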
    pub fn track_query(&self, collection: &str, field: &str) {
        // The `let _ =` discards the Result — a failed auto-index is not fatal.
        let _ = indexing::track_query(
            &self.indexes,
            &self.query_heatmap,
            collection,
            field,
            &self.storage,
            &self.state,
        );
    }

    /// Register a JSON Schema for a collection.
    /// All subsequent writes to this collection must conform to this schema.
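    ///
    /// A minimal sketch with a small schema (the schema body is illustrative):
    ///
    /// ```ignore
    /// db.set_schema("users", serde_json::json!({
    ///     "type": "object",
    ///     "properties": { "name": { "type": "string" } },
    ///     "required": ["name"]
    /// }))?;
    /// ```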
    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
        schema::set_schema(
            &self.schemas,
            &self.storage,
            &self.tx,
            collection,
            schema,
        )
    }

    /// Compact the log file — rewrite it to contain only the current state.
    ///
    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
    /// and writes a binary snapshot for a fast next startup.
    ///
    /// The compacted log contains:
    ///   - One INSERT entry per live document (current value only).
    ///   - One SCHEMA entry per registered collection schema.
    ///   - One INDEX entry per registered index (index data is rebuilt on replay).
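    ///
    /// A minimal sketch (when to call this is up to the embedding application,
    /// e.g. an admin endpoint or a periodic maintenance task):
    ///
    /// ```ignore
    /// db.compact()?;
    /// ```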
    pub fn compact(&self) -> Result<(), DbError> {
        info!("🔨 Starting Log Compaction...");

        // Build the minimal set of entries representing the current state.
        let mut entries = Vec::new();

        // One INSERT per live document across all collections.
        for col_ref in self.state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
                let value = match item_ref.value() {
                    types::DocumentState::Hot(v) => v.clone(),
                    types::DocumentState::Cold(ptr) => {
                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
                        let log_entry: types::LogEntry = serde_json::from_slice(&bytes)?;
                        log_entry.value
                    }
                };
                entries.push(types::LogEntry {
                    cmd: "INSERT".to_string(),
                    collection: col_name.clone(),
                    key: item_ref.key().clone(),
                    value,
                });
            }
        }

        // One SCHEMA entry per collection.
        for schema_ref in self.schemas.iter() {
            let col_name = schema_ref.key();
            let (schema_json, _) = &**schema_ref.value();
            entries.push(types::LogEntry {
                cmd: "SCHEMA".to_string(),
                collection: col_name.clone(),
                key: "".to_string(),
                value: schema_json.clone(),
            });
        }

        // One INDEX entry per registered index.
        // The index name format is "collection:field" — we split it to get both parts.
        for index_ref in self.indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(types::LogEntry {
                    cmd: "INDEX".to_string(),
                    collection: parts[0].to_string(),
                    key: parts[1].to_string(), // field name
                    value: serde_json::json!(null),
                });
            }
        }

        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
        self.storage.compact(entries)?;

        info!("✅ Log Compaction Finished!");
        Ok(())
    }

    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
    ///
    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
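    ///
    /// A minimal sketch (the limit value is illustrative):
    ///
    /// ```ignore
    /// // Keep at most 10,000 documents of "events" Hot; page the rest out to Cold.
    /// let evicted = db.evict_collection("events", 10_000)?;
    /// ```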
    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
        let col_len = self
            .state
            .get(collection)
            .map(|col| col.len())
            .ok_or(DbError::CollectionNotFound)?;

        if col_len <= limit {
            return Ok(0);
        }

        let mut evicted_count = 0;
        let mut offset = 0u64;
        let to_evict = col_len - limit;

        // To evict properly, we need the pointers. Since we don't store them for
        // Hot documents, we re-scan the log to find them.
        self.storage.stream_log_into(&mut |entry, length| {
            if entry.collection == collection && evicted_count < to_evict {
                if let Some(col) = self.state.get(collection) {
                    if let Some(mut doc_state) = col.get_mut(&entry.key) {
                        if let types::DocumentState::Hot(_) = *doc_state {
                            *doc_state = types::DocumentState::Cold(types::RecordPointer {
                                offset,
                                length,
                            });
                            evicted_count += 1;
                        }
                    }
                }
            }
            // Advance past this entry; the +1 accounts for the one-byte
            // separator between log entries.
            offset += (length + 1) as u64;
        })?;

        Ok(evicted_count)
    }
}