Skip to main content

moltendb_core/engine/
mod.rs

// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state         — the actual document data: collection → (key → JSON value)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
// ─────────────────────────────────────────────────────────────────────────────
24
25// Declare the sub-modules of the engine.
26mod types;      // LogEntry, DbError
27mod indexing;   // index_doc, unindex_doc, track_query, create_index
28mod storage;    // StorageBackend trait + concrete implementations
29mod config;     // DbConfig struct
30#[cfg(feature = "schema")]
31mod schema;     // JSON Schema validation
32mod operations; // get, get_all, insert, update, delete, etc.
33mod open;       // Db::open() — native constructor
34mod open_wasm;  // Db::open_wasm() — WASM constructor
35
36// Re-export LogEntry so it can be used by tests and other crates.
37pub use types::{DbError, LogEntry};
38// Re-export DbConfig
39pub use config::DbConfig;
40// Re-export the StorageBackend trait so callers can use it without knowing
41// the internal module structure.
42pub use storage::{StorageBackend, EncryptedStorage};
43#[cfg(not(target_arch = "wasm32"))]
44pub use storage::{AsyncDiskStorage, SyncDiskStorage};
45
46// DashMap = concurrent hash map. DashSet = concurrent hash set.
47use dashmap::{DashMap, DashSet};
48// Value = dynamically-typed JSON value.
49use serde_json::Value;
50// Standard HashMap — used for return values from get operations.
51use std::collections::HashMap;
52// Arc = thread-safe reference-counted pointer.
53// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
54// the same underlying data.
55use std::sync::Arc;
56// Tokio's broadcast channel: one sender, many receivers.
57// Used to push real-time change notifications to WebSocket subscribers.
58use tokio::sync::broadcast;
59
/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
/// Every field that must be shared across clones is wrapped in an `Arc`, so
/// `Clone` only bumps reference counts.
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state
    /// (`Hot` holds the JSON value in RAM, `Cold` holds a pointer into the log).
    /// DashMap allows concurrent reads and writes from multiple threads.
    /// Private: all access goes through the methods on this struct.
    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    /// NOTE: the Sender itself is cheaply cloneable, so it is not Arc-wrapped.
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created
    /// (see `indexing::track_query`).
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000.
    pub hot_threshold: usize,

    /// Max requests per rate-limit window.
    pub rate_limit_requests: u32,

    /// Rate-limit window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,

    /// Maximum keys allowed per request.
    pub max_keys_per_request: usize,

    /// Registered JSON schemas per collection.
    /// Key: collection name → Value: (Original JSON, Compiled Validator).
    /// Only compiled in when the "schema" feature is enabled.
    #[cfg(feature = "schema")]
    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,

    /// Optional shell command to execute after a successful backup.
    /// Supports the {SNAPSHOT_PATH} placeholder.
    pub post_backup_script: Option<String>,

    /// Whether tiered (hot+cold) storage mode is active.
    pub tiered_mode: bool,

    /// Timestamp of when this Db instance was opened, used for uptime calculation.
    /// Not available on WASM (std::time::Instant is unusable there).
    #[cfg(not(target_arch = "wasm32"))]
    pub started_at: std::time::Instant,
}
128
129impl Db {
130    /// Returns the total number of hot (in-memory) keys across all collections.
131    pub fn hot_keys_count(&self) -> usize {
132        self.state.iter().map(|c| c.value().len()).sum()
133    }
134
135    /// Create a new broadcast receiver for real-time change notifications.
136    /// Each call returns an independent receiver — multiple WebSocket handlers
137    /// can each subscribe and receive all events independently.
138    pub fn subscribe(&self) -> broadcast::Receiver<String> {
139        self.tx.subscribe()
140    }
141
142    /// Retrieve documents by their keys. Returns a HashMap of found key→value pairs.
143    /// Missing keys are silently skipped. Pass a single key to retrieve one document.
144    pub fn get(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
145        operations::get(&self.state, &self.storage, collection, keys)
146    }
147
148    /// Retrieve all documents in a collection as a HashMap.
149    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
150        operations::get_all(&self.state, &self.storage, collection)
151    }
152
153    /// Insert or overwrite multiple documents in one call.
154    /// Each item is a (key, value) pair. Writes are persisted to storage.
155    pub fn insert(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
156        operations::insert(
157            &self.state,
158            &self.indexes,
159            &self.storage,
160            &self.tx,
161            #[cfg(feature = "schema")] &self.schemas,
162            collection,
163            items,
164        )?;
165
166        // Auto-evict if the collection exceeds the threshold.
167        let _ = self.evict_collection(collection, self.hot_threshold);
168        Ok(())
169    }
170
171    /// Partially update a document — merges `updates` into the existing document.
172    /// Returns true if the document was found and updated, false if not found.
173    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
174        let updated = operations::update(
175            &self.state,
176            &self.indexes,
177            &self.storage,
178            &self.tx,
179            #[cfg(feature = "schema")] &self.schemas,
180            collection,
181            key,
182            updates,
183        )?;
184
185        if updated {
186            // Auto-evict if the collection exceeds the threshold.
187            let _ = self.evict_collection(collection, self.hot_threshold);
188        }
189        Ok(updated)
190    }
191
192    /// Delete one or more documents by key. Pass a single key to delete one document.
193    pub fn delete(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
194        operations::delete(
195            &self.state,
196            &self.indexes,
197            &self.storage,
198            &self.tx,
199            collection,
200            keys,
201        )
202    }
203
204    /// Drop an entire collection — removes all documents and its indexes.
205    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
206        operations::delete_collection(
207            &self.state,
208            &self.indexes,
209            &self.storage,
210            &self.tx,
211            collection,
212        )
213    }
214
215    /// Track that `field` was queried in `collection` and auto-create an index
216    /// if this field has been queried 3 or more times.
217    /// Errors are silently ignored — auto-indexing is best-effort.
218    pub fn track_query(&self, collection: &str, field: &str) {
219        // The `let _ =` discards the Result — a failed auto-index is not fatal.
220        let _ = indexing::track_query(
221            &self.indexes,
222            &self.query_heatmap,
223            collection,
224            field,
225            &self.storage,
226            &self.state,
227        );
228    }
229
230    /// Register a JSON schema for a collection.
231    /// All subsequent writes to this collection must conform to this schema.
232    #[cfg(feature = "schema")]
233    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
234        schema::set_schema(
235            &self.schemas,
236            &self.storage,
237            &self.tx,
238            collection,
239            schema
240        )
241    }
242    
243    /// Wipe all in-memory state — documents, indexes, and query heatmap.
244    /// Used by the WASM layer when a browser tab unloads in in-memory mode,
245    /// so that any tab refresh clears the shared RAM store for all tabs.
246    pub fn clear_all(&self) {
247        self.state.clear();
248        self.indexes.clear();
249        self.query_heatmap.clear();
250        #[cfg(feature = "schema")]
251        self.schemas.clear();
252    }
253
254    /// Compact the log file — rewrite it to contain only the current state.
255    ///
256    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
257    /// and writes a binary snapshot for fast next startup.
258    ///
259    /// The compacted log contains:
260    ///   - One INSERT entry per live document (current value only).
261    ///   - One INDEX entry per registered index (index data is rebuilt on replay).
262    pub fn compact(&self) -> Result<(), DbError> {
263        let entries = operations::compact(
264            &self.state,
265            #[cfg(feature = "schema")] &self.schemas,
266            &self.indexes,
267            &*self.storage,
268            self.post_backup_script.clone(),
269        )?;
270
271        // After compaction the log is rewritten and all old RecordPointers are invalid.
272        // Promote every Cold entry in the in-memory state to Hot so subsequent reads
273        // don't try to seek to stale byte offsets in the now-truncated log file.
274        for entry in &entries {
275            if entry.cmd == "INSERT" {
276                if let Some(col) = self.state.get(&entry.collection) {
277                    if let Some(mut doc) = col.get_mut(&entry.key) {
278                        if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
279                            *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
280                        }
281                    }
282                }
283            }
284        }
285
286        Ok(())
287    }
288
289    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
290    ///
291    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
292    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
293    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
294        operations::evict_collection(&self.state, &*self.storage, collection, limit)
295    }
296
297    /// Recover the database state to a specific point in time or sequence number.
298    /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
299    ///
300    /// This is a utility function used by the CLI for PITR.
301    #[cfg(not(target_arch = "wasm32"))]
302    pub fn recover_to(
303        storage: &dyn StorageBackend,
304        to_time: Option<u64>,
305        to_seq: Option<u64>,
306    ) -> Result<Vec<LogEntry>, DbError> {
307        operations::recover_to(storage, to_time, to_seq)
308    }
309}