moltendb_core/engine/mod.rs
// ─── engine/mod.rs ────────────────────────────────────────────────────────────
// This is the root module of the database engine. It defines the `Db` struct —
// the central object that the rest of the application interacts with.
//
// The Db struct is a thin, cloneable handle to the shared database state.
// Cloning a Db is cheap — it just increments reference counts on the Arcs
// inside. All clones share the same underlying data, so any write made through
// one clone is immediately visible through all others. This is how Axum handler
// functions can each receive their own Db clone via State<> extraction while
// all operating on the same in-memory database.
//
// Internal structure:
//   state         — the actual document data: collection → (key → JSON value)
//   storage       — the persistence layer (disk, encrypted, or OPFS)
//   tx            — broadcast channel for real-time WebSocket notifications
//   indexes       — field indexes for fast WHERE queries
//   query_heatmap — tracks query frequency for auto-indexing
//
// The Db struct has two constructors:
//   open()      — native (server) build, opens a disk file
//   open_wasm() — WASM (browser) build, opens an OPFS file
// Both are conditionally compiled with #[cfg(...)] attributes.
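//
// A minimal usage sketch (path and values are illustrative; see open() for
// what each argument means):
//
//     let db = Db::open("data/app.db", false, false, 50_000,
//                       100, 60, 1_048_576, None)?;
//     let handle = db.clone(); // cheap: only Arc reference counts change
//     handle.insert_batch("users", vec![
//         ("u1".to_string(), serde_json::json!({"name": "Ada"})),
//     ])?;
//     assert!(db.get("users", "u1").is_some()); // visible through every clone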
// ─────────────────────────────────────────────────────────────────────────────

// Declare the sub-modules of the engine.
mod types;      // LogEntry, DbError
mod indexing;   // index_doc, unindex_doc, track_query, create_index
mod storage;    // StorageBackend trait + concrete implementations
#[cfg(feature = "schema")]
mod schema;     // JSON Schema validation
mod operations; // get, get_all, insert_batch, update, delete, etc.

// Re-export LogEntry and DbError so they can be used by tests and other crates.
pub use types::{DbError, LogEntry};
// Re-export the storage types so callers can use them without knowing
// the internal module structure.
pub use storage::{StorageBackend, EncryptedStorage};
#[cfg(not(target_arch = "wasm32"))]
pub use storage::{AsyncDiskStorage, SyncDiskStorage};

// DashMap = concurrent hash map. DashSet = concurrent hash set.
use dashmap::{DashMap, DashSet};
use tracing::info;
// Value = dynamically-typed JSON value.
use serde_json::Value;
// Standard HashMap — used for return values from get operations.
use std::collections::HashMap;
// ControlFlow — lets the log-streaming callbacks say "keep going" or "stop early".
use std::ops::ControlFlow;
// Arc = thread-safe reference-counted pointer.
// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
// the same underlying data.
use std::sync::Arc;
// Tokio's broadcast channel: one sender, many receivers.
// Used to push real-time change notifications to WebSocket subscribers.
use tokio::sync::broadcast;

/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → hybrid Hot/Cold document state.
    /// DashMap allows concurrent reads and writes from multiple threads.
    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created.
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000.
    pub hot_threshold: usize,

    /// Max requests per rate-limit window.
    pub rate_limit_requests: u32,

    /// Rate-limit window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,

    /// Registered JSON schemas per collection.
    /// Key: collection name → Value: (original JSON, compiled validator).
    #[cfg(feature = "schema")]
    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,
}

impl Db {
    /// Open (or create) a database at the given file path.
    /// Only available on native (non-WASM) builds.
    ///
    /// `sync_mode`      — if true, use SyncDiskStorage (flush on every write);
    ///                    if false, use AsyncDiskStorage (flush every 50ms).
    ///                    Ignored when `tiered_mode` is true.
    /// `tiered_mode`    — if true, use TieredStorage (hot + cold two-tier backend).
    ///                    Hot writes go to the active log; cold data is archived and
    ///                    read via mmap on startup. Best for large datasets (100k+ docs).
    ///                    Enable with the STORAGE_MODE=tiered environment variable.
    /// `encryption_key` — if Some, wrap the storage in EncryptedStorage;
    ///                    if None, data is stored in plaintext (not recommended).
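    ///
    /// A minimal call sketch (all values are illustrative, and reading the
    /// environment variable here is just one way to derive `tiered_mode`):
    ///
    /// ```ignore
    /// let tiered = std::env::var("STORAGE_MODE").map(|v| v == "tiered").unwrap_or(false);
    /// let db = Db::open(
    ///     "data/app.db", // path
    ///     false,         // sync_mode
    ///     tiered,        // tiered_mode
    ///     50_000,        // hot_threshold
    ///     100,           // rate_limit_requests
    ///     60,            // rate_limit_window (seconds)
    ///     1_048_576,     // max_body_size (1 MiB)
    ///     None,          // encryption_key
    /// )?;
    /// ```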
    #[cfg(not(target_arch = "wasm32"))]
    pub fn open(
        path: &str,
        sync_mode: bool,
        tiered_mode: bool,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
    ) -> Result<Self, DbError> {
        // Create the shared in-memory state containers.
        let state = Arc::new(DashMap::new());
        // Create the broadcast channel with a buffer of 100 messages.
        // If the buffer fills up (no subscribers reading), old messages are dropped.
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        #[cfg(feature = "schema")]
        let schemas = Arc::new(DashMap::new());

        // Ensure the parent directory exists.
        if let Some(parent) = std::path::Path::new(path).parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Choose the base storage backend based on the configured mode.
        //
        // tiered_mode = true → TieredStorage: hot log (async writes) + cold log
        //                      (mmap reads). Best for large datasets. The cold log
        //                      accumulates promoted hot data and is paged by the OS.
        //
        // sync_mode = true   → SyncDiskStorage: every write is flushed to disk
        //                      immediately. Zero data loss, lower throughput.
        //
        // default            → AsyncDiskStorage: writes buffered in memory, flushed
        //                      every 50ms. Highest throughput, up to 50ms data loss.
        let base_storage: Arc<dyn StorageBackend> = if tiered_mode {
            Arc::new(storage::TieredStorage::new(path)?)
        } else if sync_mode {
            Arc::new(storage::SyncDiskStorage::new(path)?)
        } else {
            Arc::new(storage::AsyncDiskStorage::new(path)?)
        };

        // Optionally wrap the base storage in EncryptedStorage.
        // EncryptedStorage is transparent — it encrypts on write and decrypts
        // on read, so the rest of the engine doesn't know encryption is happening.
        let storage: Arc<dyn StorageBackend> = if let Some(key) = encryption_key {
            Arc::new(storage::EncryptedStorage::new(base_storage, key))
        } else {
            base_storage
        };

        // Replay the log (or snapshot + delta) into the in-memory state.
        // After this call, `state` and `indexes` reflect the persisted data.
        storage::stream_into_state(
            &*storage,
            &state,
            &indexes,
            #[cfg(feature = "schema")] &schemas,
        )?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            #[cfg(feature = "schema")]
            schemas,
        })
    }

    /// Open (or create) a database in the browser using OPFS.
    /// Only available on WASM builds. Async because OPFS APIs return Promises.
    ///
    /// `db_name` — the filename in the OPFS root directory (e.g. "analytics_db").
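    ///
    /// A minimal call sketch (all values are illustrative):
    ///
    /// ```ignore
    /// let db = Db::open_wasm(
    ///     "analytics_db", // db_name
    ///     50_000,         // hot_threshold
    ///     100,            // rate_limit_requests
    ///     60,             // rate_limit_window (seconds)
    ///     1_048_576,      // max_body_size (1 MiB)
    ///     None,           // encryption_key
    ///     false,          // sync_mode
    /// )
    /// .await?;
    /// ```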
    #[cfg(target_arch = "wasm32")]
    pub async fn open_wasm(
        db_name: &str,
        hot_threshold: usize,
        rate_limit_requests: u32,
        rate_limit_window: u64,
        max_body_size: usize,
        encryption_key: Option<&[u8; 32]>,
        sync_mode: bool,
    ) -> Result<Self, DbError> {
        let state = Arc::new(DashMap::new());
        let (tx, _rx) = broadcast::channel(100);
        let indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>> =
            Arc::new(Default::default());
        let query_heatmap = Arc::new(Default::default());
        #[cfg(feature = "schema")]
        let schemas = Arc::new(DashMap::new());

        // Open the OPFS file. This is async because the browser's OPFS API
        // uses Promises which we must await.
        let mut storage: Arc<dyn StorageBackend> =
            Arc::new(storage::OpfsStorage::new(db_name, sync_mode).await?);

        // Apply encryption wrapper if a key is provided.
        if let Some(key) = encryption_key {
            storage = Arc::new(storage::EncryptedStorage::new(storage, key));
        }

        // Replay the log into the in-memory state.
        storage::stream_into_state(
            &*storage,
            &state,
            &indexes,
            #[cfg(feature = "schema")] &schemas,
        )?;

        Ok(Self {
            state,
            storage,
            tx,
            indexes,
            query_heatmap,
            hot_threshold,
            rate_limit_requests,
            rate_limit_window,
            max_body_size,
            #[cfg(feature = "schema")]
            schemas,
        })
    }

    /// Create a new broadcast receiver for real-time change notifications.
    /// Each call returns an independent receiver — multiple WebSocket handlers
    /// can each subscribe and receive all events independently.
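    ///
    /// A minimal subscriber sketch (the loop is illustrative; `subscribe()`
    /// returns a `tokio::sync::broadcast::Receiver<String>`, so events are
    /// awaited with `recv()`):
    ///
    /// ```ignore
    /// let mut rx = db.subscribe();
    /// // Each event is the JSON string broadcast by insert/update/delete.
    /// while let Ok(event) = rx.recv().await {
    ///     println!("change: {event}");
    /// }
    /// ```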
    pub fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }

    /// Retrieve a single document by key. Returns None if not found.
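    /// For example, `db.get("users", "u1")` returns `Some(value)` when "u1"
    /// exists in the "users" collection, and `None` otherwise.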
    pub fn get(&self, collection: &str, key: &str) -> Option<Value> {
        operations::get(&self.state, &self.storage, collection, key)
    }

    /// Retrieve all documents in a collection as a HashMap.
    pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
        operations::get_all(&self.state, &self.storage, collection)
    }

    /// Retrieve a specific set of documents by their keys.
    pub fn get_batch(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
        operations::get_batch(&self.state, &self.storage, collection, keys)
    }

    /// Insert or overwrite multiple documents in one call.
    /// Each item is a (key, value) pair. Writes are persisted to storage.
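    ///
    /// For example (keys and values are illustrative):
    ///
    /// ```ignore
    /// db.insert_batch("users", vec![
    ///     ("u1".to_string(), serde_json::json!({"name": "Ada", "role": "admin"})),
    ///     ("u2".to_string(), serde_json::json!({"name": "Bob", "role": "user"})),
    /// ])?;
    /// ```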
    pub fn insert_batch(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
        operations::insert_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            #[cfg(feature = "schema")] &self.schemas,
            collection,
            items,
        )?;

        // Auto-evict if the collection exceeds the threshold.
        let _ = self.evict_collection(collection, self.hot_threshold);
        Ok(())
    }

    /// Partially update a document — merges `updates` into the existing document.
    /// Returns true if the document was found and updated, false if not found.
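    /// For example, `db.update("users", "u1", serde_json::json!({"role": "admin"}))?`
    /// changes only the "role" field and leaves the rest of the document intact.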
    pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
        let updated = operations::update(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            #[cfg(feature = "schema")] &self.schemas,
            collection,
            key,
            updates,
        )?;

        if updated {
            // Auto-evict if the collection exceeds the threshold.
            let _ = self.evict_collection(collection, self.hot_threshold);
        }
        Ok(updated)
    }

    /// Delete a single document by key.
    pub fn delete(&self, collection: &str, key: &str) -> Result<(), DbError> {
        operations::delete(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            key,
        )
    }

    /// Delete multiple documents by key in one call.
    pub fn delete_batch(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
        operations::delete_batch(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
            keys,
        )
    }

    /// Drop an entire collection — removes all documents and its indexes.
    pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
        operations::delete_collection(
            &self.state,
            &self.indexes,
            &self.storage,
            &self.tx,
            collection,
        )
    }

    /// Track that `field` was queried in `collection` and auto-create an index
    /// if this field has been queried 3 or more times.
    /// Errors are silently ignored — auto-indexing is best-effort.
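    ///
    /// For example, after the third call to `db.track_query("users", "role")`
    /// (one per WHERE query on that field), the "users:role" index is built
    /// automatically and later queries on that field can use it.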
    pub fn track_query(&self, collection: &str, field: &str) {
        // The `let _ =` discards the Result — a failed auto-index is not fatal.
        let _ = indexing::track_query(
            &self.indexes,
            &self.query_heatmap,
            collection,
            field,
            &self.storage,
            &self.state,
        );
    }

    /// Register a JSON schema for a collection.
    /// All subsequent writes to this collection must conform to this schema.
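    ///
    /// For example (the schema body is illustrative):
    ///
    /// ```ignore
    /// db.set_schema("users", serde_json::json!({
    ///     "type": "object",
    ///     "properties": { "name": { "type": "string" } },
    ///     "required": ["name"]
    /// }))?;
    /// ```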
    #[cfg(feature = "schema")]
    pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
        schema::set_schema(
            &self.schemas,
            &self.storage,
            &self.tx,
            collection,
            schema,
        )
    }

    /// Compact the log file — rewrite it to contain only the current state.
    ///
    /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
    /// and writes a binary snapshot for fast next startup.
    ///
    /// The compacted log contains:
    /// - One INSERT entry per live document (current value only).
    /// - One SCHEMA entry per registered schema (only with the "schema" feature).
    /// - One INDEX entry per registered index (index data is rebuilt on replay).
    pub fn compact(&self) -> Result<(), DbError> {
        info!("🔨 Starting Log Compaction...");

        // Build the minimal set of entries representing the current state.
        let mut entries = Vec::new();

        // One INSERT per live document across all collections.
        for col_ref in self.state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                // To compact, we need the full document. If it's Cold, we read
                // the original log entry back from storage instead.
                let entry = match item_ref.value() {
                    crate::engine::types::DocumentState::Hot(v) => {
                        types::LogEntry::new(
                            "INSERT".to_string(),
                            col_name.clone(),
                            item_ref.key().clone(),
                            v.clone(),
                        )
                    }
                    crate::engine::types::DocumentState::Cold(ptr) => {
                        let bytes = self.storage.read_at(ptr.offset, ptr.length)?;
                        serde_json::from_slice(&bytes)?
                    }
                };
                entries.push(entry);
            }
        }

        // One SCHEMA entry per collection.
        #[cfg(feature = "schema")]
        for schema_ref in self.schemas.iter() {
            let col_name = schema_ref.key();
            let (schema_json, _) = &**schema_ref.value();
            entries.push(types::LogEntry::new(
                "SCHEMA".to_string(),
                col_name.clone(),
                "".to_string(),
                schema_json.clone(),
            ));
        }

        // One INDEX entry per registered index.
        // The index name format is "collection:field" — we split it to get both parts.
        for index_ref in self.indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(types::LogEntry::new(
                    "INDEX".to_string(),
                    parts[0].to_string(),
                    parts[1].to_string(), // field name
                    serde_json::json!(null),
                ));
            }
        }

        // Delegate the actual file rewrite (and snapshot write) to the storage backend.
        self.storage.compact(entries.clone())?;

        // After compaction the log is rewritten and all old RecordPointers are invalid.
        // Promote every Cold entry in the in-memory state to Hot so subsequent reads
        // don't try to seek to stale byte offsets in the now-truncated log file.
        for entry in &entries {
            if entry.cmd == "INSERT" {
                if let Some(col) = self.state.get(&entry.collection) {
                    if let Some(mut doc) = col.get_mut(&entry.key) {
                        if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
                            *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
                        }
                    }
                }
            }
        }

        info!("✅ Log Compaction Finished!");
        Ok(())
    }

    /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
    ///
    /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
    /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
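    ///
    /// For example, with `limit = 50_000` and 60,000 documents in the collection,
    /// the first 10,000 Hot documents encountered while scanning the log are
    /// demoted to Cold pointers.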
    pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
        let col_len = if let Some(col) = self.state.get(collection) {
            col.len()
        } else {
            return Err(DbError::CollectionNotFound);
        };

        if col_len <= limit {
            return Ok(0);
        }

        let mut evicted_count = 0;
        let mut offset = 0u64;
        let to_evict = col_len - limit;

        // To evict properly, we need the pointers. Since we don't store them for
        // Hot documents, we re-scan the log to find them.
        self.storage.stream_log_into(&mut |entry, length| {
            if entry.collection == collection && evicted_count < to_evict {
                if let Some(col) = self.state.get(collection) {
                    if let Some(mut doc_state) = col.get_mut(&entry.key) {
                        if let crate::engine::types::DocumentState::Hot(_) = *doc_state {
                            *doc_state = crate::engine::types::DocumentState::Cold(
                                crate::engine::types::RecordPointer { offset, length },
                            );
                            evicted_count += 1;
                        }
                    }
                }
            }
            offset += (length + 1) as u64;
            ControlFlow::Continue(())
        })?;

        Ok(evicted_count)
    }

    /// Recover the database state to a specific point in time or sequence number.
    /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
    ///
    /// This is a utility function used by the CLI for PITR (point-in-time recovery).
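    ///
    /// A minimal sketch (how `storage` is obtained is up to the caller; the
    /// cutoff value is illustrative and uses the same units as `LogEntry::_t`):
    ///
    /// ```ignore
    /// // Rebuild the state from every entry logged up to the cutoff timestamp.
    /// let entries = Db::recover_to(&*storage, Some(cutoff_ts), None)?;
    /// ```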
    #[cfg(not(target_arch = "wasm32"))]
    pub fn recover_to(
        storage: &dyn StorageBackend,
        to_time: Option<u64>,
        to_seq: Option<u64>,
    ) -> Result<Vec<LogEntry>, DbError> {
        let state: DashMap<String, DashMap<String, crate::engine::types::DocumentState>> =
            DashMap::new();
        let indexes: DashMap<String, DashMap<String, DashSet<String>>> = DashMap::new();
        #[cfg(feature = "schema")]
        let schemas: DashMap<String, Arc<(serde_json::Value, jsonschema::Validator)>> =
            DashMap::new();

        let mut offset = 0u64;
        let mut count = 0u64;
        let mut current_tx_entries = Vec::new();
        let mut current_tx_id = None;

        storage.stream_log_into(&mut |entry, length| {
            // Condition 1: Check Timestamp
            if let Some(t) = to_time {
                if entry._t > t {
                    return ControlFlow::Break(());
                }
            }

            // Condition 2: Check Sequence
            if let Some(s) = to_seq {
                if count >= s {
                    return ControlFlow::Break(());
                }
            }

            let pointer = crate::engine::types::RecordPointer { offset, length };

            match entry.cmd.as_str() {
                "TX_BEGIN" => {
                    current_tx_id = Some(entry.key.clone());
                    current_tx_entries.clear();
                }
                "TX_COMMIT" => {
                    if current_tx_id.as_ref() == Some(&entry.key) {
                        for (e, p) in current_tx_entries.drain(..) {
                            crate::engine::storage::apply_entry(
                                &e,
                                &state,
                                &indexes,
                                #[cfg(feature = "schema")] &schemas,
                                Some(p),
                            );
                        }
                        current_tx_id = None;
                    }
                }
                _ => {
                    if current_tx_id.is_some() {
                        current_tx_entries.push((entry, pointer));
                    } else {
                        crate::engine::storage::apply_entry(
                            &entry,
                            &state,
                            &indexes,
                            #[cfg(feature = "schema")] &schemas,
                            Some(pointer),
                        );
                    }
                }
            }

            count += 1;
            offset += (length + 1) as u64;
            ControlFlow::Continue(())
        })?;

        // Convert the recovered state into LogEntries (similar to compact logic).
        let mut entries = Vec::new();
        for col_ref in state.iter() {
            let col_name = col_ref.key();
            for item_ref in col_ref.value().iter() {
                let entry = match item_ref.value() {
                    crate::engine::types::DocumentState::Hot(v) => {
                        LogEntry::new(
                            "INSERT".to_string(),
                            col_name.clone(),
                            item_ref.key().clone(),
                            v.clone(),
                        )
                    }
                    crate::engine::types::DocumentState::Cold(ptr) => {
                        let bytes = storage.read_at(ptr.offset, ptr.length).unwrap_or_default();
                        serde_json::from_slice(&bytes).unwrap_or_else(|_| {
                            LogEntry::new(
                                "INSERT".to_string(),
                                col_name.clone(),
                                item_ref.key().clone(),
                                serde_json::Value::Null,
                            )
                        })
                    }
                };
                entries.push(entry);
            }
        }

        #[cfg(feature = "schema")]
        for schema_ref in schemas.iter() {
            let col_name = schema_ref.key();
            let (schema_json, _) = &**schema_ref.value();
            entries.push(LogEntry::new(
                "SCHEMA".to_string(),
                col_name.clone(),
                "".to_string(),
                schema_json.clone(),
            ));
        }

        for index_ref in indexes.iter() {
            let parts: Vec<&str> = index_ref.key().split(':').collect();
            if parts.len() == 2 {
                entries.push(LogEntry::new(
                    "INDEX".to_string(),
                    parts[0].to_string(),
                    parts[1].to_string(),
                    serde_json::json!(null),
                ));
            }
        }

        Ok(entries)
    }
}