// moltendb_core/engine/mod.rs

// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state         — the actual document data: collection → (key → JSON value)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
// ─────────────────────────────────────────────────────────────────────────────

25// Declare the sub-modules of the engine.
26mod types;      // LogEntry, DbError
27mod indexing;   // index_doc, unindex_doc, track_query, create_index
28mod storage;    // StorageBackend trait + concrete implementations
29#[cfg(feature = "schema")]
30mod schema;     // JSON Schema validation
31mod operations; // get, get_all, insert_batch, update, delete, etc.
32
33// Re-export LogEntry so it can be used by tests and other crates.
34pub use types::{DbError, LogEntry};
35// Re-export the StorageBackend trait so callers can use it without knowing
36// the internal module structure.
37pub use storage::{StorageBackend, EncryptedStorage};
38#[cfg(not(target_arch = "wasm32"))]
39pub use storage::{AsyncDiskStorage, SyncDiskStorage};
40
41// DashMap = concurrent hash map. DashSet = concurrent hash set.
42use dashmap::{DashMap, DashSet};
43use tracing::{info};
44// Value = dynamically-typed JSON value.
45use serde_json::Value;
46// Standard HashMap — used for return values from get operations.
47use std::collections::HashMap;
48// Arc = thread-safe reference-counted pointer.
49// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
50// the same underlying data.
51use std::ops::ControlFlow;
52use std::sync::Arc;
53// Tokio's broadcast channel: one sender, many receivers.
54// Used to push real-time change notifications to WebSocket subscribers.
55use tokio::sync::broadcast;
56
57/// The central database handle. Cheap to clone — all clones share the same state.
58///
59/// This struct is the public API of the engine. All database operations go
60/// through methods on this struct, which delegate to the operations module.
61#[derive(Clone)]
62pub struct Db {
63    /// The main document store.
64    /// Outer map: collection name (e.g. "users") → inner map.
65    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state.
66    /// DashMap allows concurrent reads and writes from multiple threads.
67    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,
68
69    /// The storage backend — handles persistence to disk or OPFS.
70    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
71    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
72    pub storage: Arc<dyn StorageBackend>,
73
74    /// Broadcast channel sender for real-time change notifications.
75    /// When a document is inserted, updated, or deleted, a JSON event is sent
76    /// on this channel. WebSocket handlers subscribe to receive these events.
77    /// `pub` so the WebSocket handler in main.rs can call subscribe().
78    pub tx: broadcast::Sender<String>,
79
80    /// The index store.
81    /// Key format: "collection:field" (e.g. "users:role").
82    /// Value: field_value → set of document keys with that value.
83    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
84    /// `pub` so handlers.rs can check for index existence directly.
85    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,
86
87    /// Query frequency counter for auto-indexing.
88    /// Key: "collection:field". Value: number of times queried.
89    /// When a field reaches 3 queries, an index is auto-created.
90    pub query_heatmap: Arc<DashMap<String, u32>>,
91
92    /// The maximum number of documents per collection to keep in RAM (Hot).
93    /// If a collection exceeds this, older documents are paged out to disk (Cold).
94    /// Default is 50,000.
95    pub hot_threshold: usize,
96
97    /// Max requests per window.
98    pub rate_limit_requests: u32,
99
100    /// Window size in seconds.
101    pub rate_limit_window: u64,
102
103    /// Maximum request body size in bytes.
104    pub max_body_size: usize,
105
106    /// Registered JSON schemas per collection.
107    /// Key: collection name → Value: (Original JSON, Compiled Validator).
108    #[cfg(feature = "schema")]
109    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
110}
111
impl Db {
    /// Open (or create) a database at the given file path.
    /// Only available on native (non-WASM) builds.
    ///
    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write).
    ///                    if false, use AsyncDiskStorage (flush every 50ms).
    ///                    Ignored when `tiered_mode` is true.
    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
    ///                    Hot writes go to the active log; cold data is archived and
    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
    ///                    Enable with STORAGE_MODE=tiered environment variable.
    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage.
    ///                    if None, data is stored in plaintext (not recommended).
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(
        path: &str,
        sync_mode: bool,
        tiered_mode: bool,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
    ) -> Result<Self, DbError> {
        // Create the shared in-memory state containers.
        let state = Arc::new(DashMap::new());
        // Create the broadcast channel with a buffer of 100 messages.
        // If the buffer fills up (no subscribers reading), old messages are dropped.
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        #[cfg(feature = "schema")]
        let schemas = Arc::new(DashMap::new());

        // Ensure the parent directory exists.
        if let Some(parent) = std::path::Path::new(path).parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Choose the base storage backend based on the configured mode.
        //
        //   tiered_mode = true  → TieredStorage: hot log (async writes) + cold log
        //                         (mmap reads). Best for large datasets. The cold log
        //                         accumulates promoted hot data and is paged by the OS.
        //
        //   sync_mode = true    → SyncDiskStorage: every write is flushed to disk
        //                         immediately. Zero data loss, lower throughput.
        //
        //   default             → AsyncDiskStorage: writes buffered in memory, flushed
        //                         every 50ms. Highest throughput, up to 50ms data loss.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };

        // Optionally wrap the base storage in EncryptedStorage.
        // EncryptedStorage is transparent — it encrypts on write and decrypts
        // on read, so the rest of the engine doesn't know encryption is happening.
        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
            Arc::new(storage::EncryptedStorage::new(base_storage, key))
        } else {
            base_storage
        };

        // Replay the log (or snapshot + delta) into the in-memory state.
        // After this call, `state` and `indexes` reflect the persisted data.
        storage::stream_into_state(
            &*storage,
            &state,
            &indexes,
            #[cfg(feature = "schema")] &schemas,
        )?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            #[cfg(feature = "schema")]
            schemas,
        })
    }

204    /// Open (or create) a database in the browser using OPFS.
205    /// Only available on WASM builds. Async because OPFS APIs return Promises.
206    ///
207    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
208    #[cfg(target_arch = "wasm32")]
209    pub async fn open_wasm(
210        db_name: &str,
211        hot_threshold: usize,
212        rate_limit_requests: u32,
213        rate_limit_window: u64,
214        max_body_size: usize,
215        encryption_key: Option<&[u8; 32]>,
216        sync_mode: bool,
217    ) -> Result<Self, DbError> {
218        let state = Arc::new(DashMap::new());
219        let (tx, _rx) = broadcast::channel(100);
220        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
221            Arc::new(Default::default());
222        let query_heatmap = Arc::new(Default::default());
223        #[cfg(feature = "schema")]
224        let schemas = Arc::new(DashMap::new());
225
226        // Open the OPFS file. This is async because the browser's OPFS API
227        // uses Promises which we must await.
228        let mut storage: Arc<dyn StorageBackend> =
229            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);
230
231        // Apply encryption wrapper if a key is provided.
232        if let Some(key) = encryption_key {
233            storage = Arc::new(storage::EncryptedStorage::new(storage, key));
234        }
235
236        // Replay the log into the in-memory state.
237        storage::stream_into_state(
238            &*storage,
239            &state,
240            &indexes,
241            #[cfg(feature = "schema")] &schemas,
242        )?;
243
244        Ok(Self {
245            state,
246            storage,
247            tx,
248            indexes,
249            query_heatmap,
250            hot_threshold,
251            rate_limit_requests,
252            rate_limit_window,
253            max_body_size,
254            #[cfg(feature = "schema")]
255            schemas,
256        })
257    }
258
259    /// Create a new broadcast receiver for real-time change notifications.
260    /// Each call returns an independent receiver — multiple WebSocket handlers
261    /// can each subscribe and receive all events independently.
262    pub fn subscribe(&self) -> broadcast::Receiver<String> {
263        self.tx.subscribe()
264    }
265
266    /// Retrieve a single document by key. Returns None if not found.
267    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
268        operations::get(&self.state, &self.storage, collection, key)
269    }
270
271    /// Retrieve all documents in a collection as a HashMap.
272    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
273        operations::get_all(&self.state, &self.storage, collection)
274    }
275
276    /// Retrieve a specific set of documents by their keys.
277    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
278        operations::get_batch(&self.state, &self.storage, collection, keys)
279    }
280
281    /// Insert or overwrite multiple documents in one call.
282    /// Each item is a (key, value) pair. Writes are persisted to storage.
283    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
284        operations::insert_batch(
285            &self.state,
286            &self.indexes,
287            &self.storage,
288            &self.tx,
289            #[cfg(feature = "schema")] &self.schemas,
290            collection,
291            items,
292        )?;
293
294        // Auto-evict if the collection exceeds the threshold.
295        let _ = self.evict_collection(collection, self.hot_threshold);
296        Ok(())
297    }
298
299    /// Partially update a document — merges `updates` into the existing document.
300    /// Returns true if the document was found and updated, false if not found.
301    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
302        let updated = operations::update(
303            &self.state,
304            &self.indexes,
305            &self.storage,
306            &self.tx,
307            #[cfg(feature = "schema")] &self.schemas,
308            collection,
309            key,
310            updates,
311        )?;
312
313        if updated {
314            // Auto-evict if the collection exceeds the threshold.
315            let _ = self.evict_collection(collection, self.hot_threshold);
316        }
317        Ok(updated)
318    }
319
320    /// Delete a single document by key.
321    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
322        operations::delete(
323            &self.state,
324            &self.indexes,
325            &self.storage,
326            &self.tx,
327            collection,
328            key,
329        )
330    }
331
332    /// Delete multiple documents by key in one call.
333    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
334        operations::delete_batch(
335            &self.state,
336            &self.indexes,
337            &self.storage,
338            &self.tx,
339            collection,
340            keys,
341        )
342    }
343
344    /// Drop an entire collection — removes all documents and its indexes.
345    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
346        operations::delete_collection(
347            &self.state,
348            &self.indexes,
349            &self.storage,
350            &self.tx,
351            collection,
352        )
353    }
354
355    /// Track that `field` was queried in `collection` and auto-create an index
356    /// if this field has been queried 3 or more times.
357    /// Errors are silently ignored — auto-indexing is best-effort.
358    pub fn track_query(&self, collection: &str, field: &str) {
359        // The `let _ =` discards the Result — a failed auto-index is not fatal.
360        let _ = indexing::track_query(
361            &self.indexes,
362            &self.query_heatmap,
363            collection,
364            field,
365            &self.storage,
366            &self.state,
367        );
368    }
369
370    /// Register a JSON schema for a collection.
371    /// All subsequent writes to this collection must conform to this schema.
372    #[cfg(feature = "schema")]
373    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
374        schema::set_schema(
375            &self.schemas,
376            &self.storage,
377            &self.tx,
378            collection,
379            schema
380        )
381    }
382    
383    /// Compact the log file — rewrite it to contain only the current state.
384    ///
385    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
386    /// and writes a binary snapshot for fast next startup.
387    ///
388    /// The compacted log contains:
389    ///   - One INSERT entry per live document (current value only).
390    ///   - One INDEX entry per registered index (index data is rebuilt on replay).
391    pub fn compact(&self) -> Result<(), DbError> {
392        info!("🔨 Starting Log Compaction...");
393
394        // Build the minimal set of entries representing the current state.
395        let mut entries = Vec::new();
396
397        // One INSERT per live document across all collections.
398        for col_ref in self.state.iter() {
399            let col_name = col_ref.key();
400            for item_ref in col_ref.value().iter() {
401                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
402                let entry = match item_ref.value() {
403                    crate::engine::types::DocumentState::Hot(v) => {
404                        types::LogEntry::new(
405                            "INSERT".to_string(),
406                            col_name.clone(),
407                            item_ref.key().clone(),
408                            v.clone(),
409                        )
410                    }
411                    crate::engine::types::DocumentState::Cold(ptr) => {
412                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
413                        serde_json::from_slice(&bytes)?
414                    }
415                };
416                entries.push(entry);
417            }
418        }
419
420        // One SCHEMA entry per collection.
421        #[cfg(feature = "schema")]
422        for schema_ref in self.schemas.iter() {
423            let col_name = schema_ref.key();
424            let (schema_json, _) = &**schema_ref.value();
425            entries.push(types::LogEntry::new(
426                "SCHEMA".to_string(),
427                col_name.clone(),
428                "".to_string(),
429                schema_json.clone(),
430            ));
431        }
432
433        // One INDEX entry per registered index.
434        // The index name format is "collection:field" — we split it to get both parts.
435        for index_ref in self.indexes.iter() {
436            let parts: Vec<&str> = index_ref.key().split(':').collect();
437            if parts.len() == 2 {
438                entries.push(types::LogEntry::new(
439                    "INDEX".to_string(),
440                    parts[0].to_string(),
441                    parts[1].to_string(),       // field name
442                    serde_json::json!(null),
443                ));
444            }
445        }
446
447        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
448        self.storage.compact(entries.clone())?;
449
450        // After compaction the log is rewritten and all old RecordPointers are invalid.
451        // Promote every Cold entry in the in-memory state to Hot so subsequent reads
452        // don't try to seek to stale byte offsets in the now-truncated log file.
453        for entry in &entries {
454            if entry.cmd == "INSERT" {
455                if let Some(col) = self.state.get(&entry.collection) {
456                    if let Some(mut doc) = col.get_mut(&entry.key) {
457                        if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
458                            *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
459                        }
460                    }
461                }
462            }
463        }
464
465        info!("✅ Log Compaction Finished!");
466        Ok(())
467    }
468
469    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
470    ///
471    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
472    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
473    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
474        let col_len = if let Some(col) = self.state.get(collection) {
475            col.len()
476        } else {
477            return Err(DbError::CollectionNotFound);
478        };
479
480        if col_len <= limit {
481            return Ok(0);
482        }
483
484        let mut evicted_count = 0;
485        let mut offset = 0u64;
486        let to_evict = col_len - limit;
487
488        // To evict properly, we need the pointers. Since we don't store them for
489        // Hot documents, we re-scan the log to find them.
490        self.storage.stream_log_into(&mut |entry, length| {
491            if entry.collection == collection {
492                if evicted_count < to_evict {
493                    if let Some(col) = self.state.get(collection) {
494                        if let Some(mut doc_state) = col.get_mut(&entry.key) {
495                            if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
496                                *doc_state = crate::engine::types::DocumentState::Cold(crate::engine::types::RecordPointer {
497                                    offset,
498                                    length,
499                                });
500                                evicted_count += 1;
501                            }
502                        }
503                    }
504                }
505            }
506            offset += (length + 1) as u64;
507            ControlFlow::Continue(())
508        })?;
509
510        Ok(evicted_count)
511    }
512
513    /// Recover the database state to a specific point in time or sequence number.
514    /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
515    ///
516    /// This is a utility function used by the CLI for PITR.
517    #[cfg(not(target_arch = "wasm32"))]
518    pub fn recover_to(
519        storage: &dyn StorageBackend,
520        to_time: Option<u64>,
521        to_seq: Option<u64>,
522    ) -> Result<Vec<LogEntry>, DbError> {
523        let state: DashMap<String, DashMap<String, crate::engine::types::DocumentState>> = DashMap::new();
524        let indexes: DashMap<String, DashMap<String, DashSet<String>>> = DashMap::new();
525        #[cfg(feature = "schema")]
526        let schemas: DashMap<String, Arc<(serde_json::Value, jsonschema::Validator)>> = DashMap::new();
527
528            let mut offset = 0u64;
529            let mut count = 0u64;
530            let mut current_tx_entries = Vec::new();
531            let mut current_tx_id = None;
532            
533            storage.stream_log_into(&mut |entry, length| {
534                // Condition 1: Check Timestamp
535                if let Some(t) = to_time {
536                    if entry._t > t {
537                        return ControlFlow::Break(());
538                    }
539                }
540    
541                // Condition 2: Check Sequence
542                if let Some(s) = to_seq {
543                    if count >= s {
544                        return ControlFlow::Break(());
545                    }
546                }
547
548            let pointer = crate::engine::types::RecordPointer {
549                offset,
550                length,
551            };
552
553            match entry.cmd.as_str() {
554                "TX_BEGIN" => {
555                    current_tx_id = Some(entry.key.clone());
556                    current_tx_entries.clear();
557                }
558                "TX_COMMIT" => {
559                    if current_tx_id.as_ref() == Some(&entry.key) {
560                        for (e, p) in current_tx_entries.drain(..) {
561                            crate::engine::storage::apply_entry(
562                                &e,
563                                &state,
564                                &indexes,
565                                #[cfg(feature = "schema")] &schemas,
566                                Some(p),
567                            );
568                        }
569                        current_tx_id = None;
570                    }
571                }
572                _ => {
573                    if current_tx_id.is_some() {
574                        current_tx_entries.push((entry, pointer));
575                    } else {
576                        crate::engine::storage::apply_entry(
577                            &entry,
578                            &state,
579                            &indexes,
580                            #[cfg(feature = "schema")] &schemas,
581                            Some(pointer),
582                        );
583                    }
584                }
585            }
586
587            count += 1;
588            offset += (length + 1) as u64;
589            ControlFlow::Continue(())
590        })?;
591
592        // Convert the recovered state into LogEntries (similar to compact logic)
593        let mut entries = Vec::new();
594        for col_ref in state.iter() {
595            let col_name = col_ref.key();
596            for item_ref in col_ref.value().iter() {
597                let entry = match item_ref.value() {
598                    crate::engine::types::DocumentState::Hot(v) => {
599                        LogEntry::new(
600                            "INSERT".to_string(),
601                            col_name.clone(),
602                            item_ref.key().clone(),
603                            v.clone(),
604                        )
605                    }
606                    crate::engine::types::DocumentState::Cold(ptr) => {
607                        let bytes = storage.read_at(ptr.offset, ptr.length).unwrap_or_default();
608                        serde_json::from_slice(&bytes).unwrap_or_else(|_| {
609                            LogEntry::new("INSERT".to_string(), col_name.clone(), item_ref.key().clone(), serde_json::Value::Null)
610                        })
611                    }
612                };
613                entries.push(entry);
614            }
615        }
616
617        #[cfg(feature = "schema")]
618        for schema_ref in schemas.iter() {
619            let col_name = schema_ref.key();
620            let (schema_json, _) = &**schema_ref.value();
621            entries.push(LogEntry::new(
622                "SCHEMA".to_string(),
623                col_name.clone(),
624                "".to_string(),
625                schema_json.clone(),
626            ));
627        }
628
629        for index_ref in indexes.iter() {
630            let parts: Vec<&str> = index_ref.key().split(':').collect();
631            if parts.len() == 2 {
632                entries.push(LogEntry::new(
633                    "INDEX".to_string(),
634                    parts[0].to_string(),
635                    parts[1].to_string(),
636                    serde_json::json!(null),
637                ));
638            }
639        }
640
641        Ok(entries)
642    }
643}