moltendb_core/engine/mod.rs
// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state         — the actual document data: collection → (key → JSON value)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
// ─────────────────────────────────────────────────────────────────────────────
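//
// A minimal sketch of the clone-per-handler pattern described above, assuming
// an Axum setup elsewhere in the application (the `list_users` handler and its
// route are hypothetical, not part of this module):
//
//     use axum::{extract::State, response::Json, routing::get, Router};
//
//     async fn list_users(State(db): State<Db>) -> Json<HashMap<String, Value>> {
//         Json(db.get_all("users")) // reads the shared in-memory state
//     }
//
//     let app = Router::new()
//         .route("/users", get(list_users))
//         .with_state(db); // each request extracts its own cheap Db clone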

// Declare the sub-modules of the engine.
mod types;      // LogEntry, DbError
mod indexing;   // index_doc, unindex_doc, track_query, create_index
mod storage;    // StorageBackend trait + concrete implementations
mod operations; // get, get_all, insert_batch, update, delete, etc.

// Re-export DbError so callers can write `engine::DbError` instead of
// `engine::types::DbError`.
pub use types::DbError;
// Re-export the StorageBackend trait so callers can use it without knowing
// the internal module structure.
pub use storage::{StorageBackend, EncryptedStorage};

// DashMap = concurrent hash map. DashSet = concurrent hash set.
use dashmap::{DashMap, DashSet};
use tracing::info;
// Value = dynamically-typed JSON value.
use serde_json::Value;
// Standard HashMap — used for return values from get operations.
use std::collections::HashMap;
// Arc = thread-safe reference-counted pointer.
// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
// the same underlying data.
use std::sync::Arc;
// Tokio's broadcast channel: one sender, many receivers.
// Used to push real-time change notifications to WebSocket subscribers.
use tokio::sync::broadcast;

/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → hybrid Hot/Cold document state.
    /// DashMap allows concurrent reads and writes from multiple threads.
    state: Arc<DashMap<String, DashMap<String, types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created.
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000.
    pub hot_threshold: usize,

    /// Max requests per rate-limit window.
    pub rate_limit_requests: u32,

    /// Rate-limit window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,
}

impl Db {
    /// Open (or create) a database at the given file path.
    /// Only available on native (non-WASM) builds.
    ///
    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write).
    ///                    if false, use AsyncDiskStorage (flush every 50ms).
    ///                    Ignored when `tiered_mode` is true.
    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
    ///                    Hot writes go to the active log; cold data is archived and
    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
    ///                    Enable with the STORAGE_MODE=tiered environment variable.
    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage.
    ///                    if None, data is stored in plaintext (not recommended).
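    ///
    /// A minimal usage sketch (the path and limits are illustrative, not defaults):
    ///
    /// ```ignore
    /// let db = Db::open(
    ///     "data/molten.log", // log file path (hypothetical)
    ///     false,             // sync_mode: buffered async writes
    ///     false,             // tiered_mode: single-tier storage
    ///     50_000,            // hot_threshold
    ///     100,               // rate_limit_requests
    ///     60,                // rate_limit_window (seconds)
    ///     1_048_576,         // max_body_size (1 MiB)
    ///     None,              // encryption_key: plaintext storage
    /// )?;
    /// ```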
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(
        path: &str,
        sync_mode: bool,
        tiered_mode: bool,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
    ) -> Result<Self, DbError> {
        // Create the shared in-memory state containers.
        let state = Arc::new(DashMap::new());
        // Create the broadcast channel with a buffer of 100 messages.
        // If the buffer fills (no subscribers reading), the oldest messages are
        // overwritten; a lagging receiver then gets a `Lagged` error in place of
        // the messages it missed.
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());

        // Choose the base storage backend based on the configured mode.
        //
        // tiered_mode = true → TieredStorage: hot log (async writes) + cold log
        //                      (mmap reads). Best for large datasets. The cold log
        //                      accumulates promoted hot data and is paged by the OS.
        //
        // sync_mode = true   → SyncDiskStorage: every write is flushed to disk
        //                      immediately. Zero data loss, lower throughput.
        //
        // default            → AsyncDiskStorage: writes buffered in memory, flushed
        //                      every 50ms. Highest throughput, up to 50ms data loss.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };

        // Optionally wrap the base storage in EncryptedStorage.
        // EncryptedStorage is transparent — it encrypts on write and decrypts
        // on read, so the rest of the engine doesn't know encryption is happening.
        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
            Arc::new(storage::EncryptedStorage::new(base_storage, key))
        } else {
            base_storage
        };

        // Replay the log (or snapshot + delta) into the in-memory state.
        // After this call, `state` and `indexes` reflect the persisted data.
        storage::stream_into_state(&*storage, &state, &indexes)?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
        })
    }

    /// Open (or create) a database in the browser using OPFS.
    /// Only available on WASM builds. Async because OPFS APIs return Promises.
    ///
    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
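    ///
    /// A usage sketch (limits are illustrative; must run in a browser/WASM context):
    ///
    /// ```ignore
    /// let db = Db::open_wasm("analytics_db", 50_000, 100, 60, 1_048_576, None, false).await?;
    /// ```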
    #[cfg(target_arch = "wasm32")]
    pub async fn open_wasm(
        db_name: &str,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
        sync_mode: bool,
    ) -> Result<Self, DbError> {
        let state = Arc::new(DashMap::new());
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());

        // Open the OPFS file. This is async because the browser's OPFS API
        // uses Promises which we must await.
        let mut storage: Arc<dyn StorageBackend> =
            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);

        // Apply encryption wrapper if a key is provided.
        if let Some(key) = encryption_key {
            storage = Arc::new(storage::EncryptedStorage::new(storage, key));
        }

        // Replay the log into the in-memory state.
        storage::stream_into_state(&*storage, &state, &indexes)?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
        })
    }

    /// Create a new broadcast receiver for real-time change notifications.
    /// Each call returns an independent receiver — multiple WebSocket handlers
    /// can each subscribe and receive all events independently.
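    ///
    /// A sketch of a subscriber loop (the surrounding async task is assumed):
    ///
    /// ```ignore
    /// let mut rx = db.subscribe();
    /// while let Ok(event) = rx.recv().await {
    ///     // Each event is a JSON change notification serialized as a String.
    ///     println!("change: {event}");
    /// }
    /// ```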
    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }

    /// Retrieve a single document by key. Returns None if not found.
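    ///
    /// ```ignore
    /// // Illustrative names; assumes a populated "users" collection.
    /// if let Some(doc) = db.get("users", "u1") {
    ///     println!("found: {doc}");
    /// }
    /// ```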
    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
        operations::get(&self.state, &self.storage, collection, key)
    }

    /// Retrieve all documents in a collection as a HashMap.
    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
        operations::get_all(&self.state, &self.storage, collection)
    }

    /// Retrieve a specific set of documents by their keys.
    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
        operations::get_batch(&self.state, &self.storage, collection, keys)
    }

    /// Insert or overwrite multiple documents in one call.
    /// Each item is a (key, value) pair. Writes are persisted to storage.
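    ///
    /// A usage sketch (collection name and documents are illustrative):
    ///
    /// ```ignore
    /// db.insert_batch("users", vec![
    ///     ("u1".to_string(), serde_json::json!({ "name": "Ada", "role": "admin" })),
    ///     ("u2".to_string(), serde_json::json!({ "name": "Bob", "role": "user" })),
    /// ])?;
    /// ```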
    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
        operations::insert_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            items,
        )?;

        // Auto-evict if the collection exceeds the threshold.
        let _ = self.evict_collection(collection, self.hot_threshold);
        Ok(())
    }

    /// Partially update a document — merges `updates` into the existing document.
    /// Returns true if the document was found and updated, false if not found.
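    ///
    /// ```ignore
    /// // Illustrative partial update: only the "role" field is merged in.
    /// let found = db.update("users", "u1", serde_json::json!({ "role": "owner" }))?;
    /// ```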
    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
        let updated = operations::update(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
            updates,
        )?;

        if updated {
            // Auto-evict if the collection exceeds the threshold.
            let _ = self.evict_collection(collection, self.hot_threshold);
        }
        Ok(updated)
    }

    /// Delete a single document by key.
    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
        operations::delete(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
        )
    }

    /// Delete multiple documents by key in one call.
    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
        operations::delete_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            keys,
        )
    }

    /// Drop an entire collection — removes all documents and its indexes.
    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
        operations::delete_collection(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
        )
    }

    /// Track that `field` was queried in `collection` and auto-create an index
    /// if this field has been queried 3 or more times.
    /// Errors are silently ignored — auto-indexing is best-effort.
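    ///
    /// A sketch of the auto-indexing behavior ("users"/"role" are example names):
    ///
    /// ```ignore
    /// db.track_query("users", "role"); // heatmap count: 1
    /// db.track_query("users", "role"); // heatmap count: 2
    /// db.track_query("users", "role"); // count reaches 3 → "users:role" index created
    /// ```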
    pub fn track_query(&self, collection: &str, field: &str) {
        // The `let _ =` discards the Result — a failed auto-index is not fatal.
        let _ = indexing::track_query(
            &self.indexes,
            &self.query_heatmap,
            collection,
            field,
            &self.storage,
            &self.state,
        );
    }

    /// Compact the log file — rewrite it to contain only the current state.
    ///
    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
    /// and writes a binary snapshot for fast next startup.
    ///
    /// The compacted log contains:
    /// - One INSERT entry per live document (current value only).
    /// - One INDEX entry per registered index (index data is rebuilt on replay).
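    ///
    /// ```ignore
    /// // Typically invoked on demand or from a periodic maintenance task.
    /// db.compact()?;
    /// ```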
    pub fn compact(&self) -> Result<(), DbError> {
        info!("🔨 Starting Log Compaction...");

        // Build the minimal set of entries representing the current state.
        let mut entries = Vec::new();

        // One INSERT per live document across all collections.
        for col_ref in self.state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                // To compact, we need the full Value. If it's Cold, we fetch it from storage.
                let value = match item_ref.value() {
                    types::DocumentState::Hot(v) => v.clone(),
                    types::DocumentState::Cold(ptr) => {
                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
                        let log_entry: types::LogEntry = serde_json::from_slice(&bytes)?;
                        log_entry.value
                    }
                };
                entries.push(types::LogEntry {
                    cmd: "INSERT".to_string(),
                    collection: col_name.clone(),
                    key: item_ref.key().clone(),
                    value,
                });
            }
        }

        // One INDEX entry per registered index.
        // The index name format is "collection:field" — we split it to get both parts.
        for index_ref in self.indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(types::LogEntry {
                    cmd: "INDEX".to_string(),
                    collection: parts[0].to_string(),
                    key: parts[1].to_string(), // field name
                    value: serde_json::json!(null),
                });
            }
        }

        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
        self.storage.compact(entries)?;

        info!("✅ Log Compaction Finished!");
        Ok(())
    }

    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
    ///
    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
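    ///
    /// ```ignore
    /// // Illustrative manual eviction: keep at most 10_000 Hot docs in "users".
    /// let evicted = db.evict_collection("users", 10_000)?;
    /// ```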
    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
        let col_len = match self.state.get(collection) {
            Some(col) => col.len(),
            None => return Err(DbError::CollectionNotFound),
        };

        if col_len <= limit {
            return Ok(0);
        }

        let mut evicted_count = 0;
        let mut offset = 0u64;
        let to_evict = col_len - limit;

        // To evict properly, we need the pointers. Since we don't store them for
        // Hot documents, we re-scan the log to find them.
        self.storage.stream_log_into(&mut |entry, length| {
            if entry.collection == collection && evicted_count < to_evict {
                if let Some(col) = self.state.get(collection) {
                    if let Some(mut doc_state) = col.get_mut(&entry.key) {
                        if let types::DocumentState::Hot(_) = *doc_state {
                            *doc_state =
                                types::DocumentState::Cold(types::RecordPointer { offset, length });
                            evicted_count += 1;
                        }
                    }
                }
            }
            // Advance past this entry (+1 for the one-byte record separator).
            offset += (length + 1) as u64;
        })?;

        Ok(evicted_count)
    }
}
437}