// moltendb_core/engine/mod.rs
1// ─── engine/mod.rs ────────────────────────────────────────────────────────────
2// This is the root module of the database engine. It defines the `Db` struct —
3// the central object that the rest of the application interacts with.
4//
5// The Db struct is a thin, cloneable handle to the shared database state.
6// Cloning a Db is cheap — it just increments reference counts on the Arcs
7// inside. All clones share the same underlying data, so any write made through
8// one clone is immediately visible through all others. This is how Axum handler
9// functions can each receive their own Db clone via State<> extraction while
10// all operating on the same in-memory database.
11//
12// Internal structure:
13// state — the actual document data: collection → (key → JSON value)
14// storage — the persistence layer (disk, encrypted, or OPFS)
15// tx — broadcast channel for real-time WebSocket notifications
16// indexes — field indexes for fast WHERE queries
17// query_heatmap — tracks query frequency for auto-indexing
18//
19// The Db struct has two constructors:
20// open() — native (server) build, opens a disk file
21// open_wasm() — WASM (browser) build, opens an OPFS file
22// Both are conditionally compiled with #[cfg(...)] attributes.
23// ─────────────────────────────────────────────────────────────────────────────
24
25// Declare the sub-modules of the engine.
26mod types; // LogEntry, DbError
27mod indexing; // index_doc, unindex_doc, track_query, create_index
28mod storage; // StorageBackend trait + concrete implementations
29mod config; // DbConfig struct
30#[cfg(feature = "schema")]
31mod schema; // JSON Schema validation
32mod operations; // get, get_all, insert, update, delete, etc.
33mod open; // Db::open() — native constructor
34mod open_wasm; // Db::open_wasm() — WASM constructor
35
36// Re-export LogEntry so it can be used by tests and other crates.
37pub use types::{DbError, LogEntry};
38// Re-export DbConfig
39pub use config::DbConfig;
40// Re-export the StorageBackend trait so callers can use it without knowing
41// the internal module structure.
42pub use storage::{StorageBackend, EncryptedStorage};
43#[cfg(not(target_arch = "wasm32"))]
44pub use storage::{AsyncDiskStorage, SyncDiskStorage};
45
46// DashMap = concurrent hash map. DashSet = concurrent hash set.
47use dashmap::{DashMap, DashSet};
48// Value = dynamically-typed JSON value.
49use serde_json::Value;
50// Standard HashMap — used for return values from get operations.
51use std::collections::HashMap;
52// Arc = thread-safe reference-counted pointer.
53// Wrapping fields in Arc allows Db to be cheaply cloned — all clones share
54// the same underlying data.
55use std::sync::Arc;
56// Tokio's broadcast channel: one sender, many receivers.
57// Used to push real-time change notifications to WebSocket subscribers.
58use tokio::sync::broadcast;
59
/// The central database handle. Cheap to clone — all clones share the same state.
///
/// This struct is the public API of the engine. All database operations go
/// through methods on this struct, which delegate to the operations module.
/// Cloning only bumps the reference counts on the inner `Arc`s, so every
/// clone observes writes made through any other clone.
#[derive(Clone)]
pub struct Db {
    /// The main document store.
    /// Outer map: collection name (e.g. "users") → inner map.
    /// Inner map: document key (e.g. "u1") → Hybrid Hot/Cold document state
    /// (`DocumentState::Hot` holds the JSON value in RAM; `Cold` holds a
    /// pointer to its on-disk record — see `compact()`).
    /// DashMap allows concurrent reads and writes from multiple threads.
    state: Arc<DashMap<String, DashMap<String, crate::engine::types::DocumentState>>>,

    /// The storage backend — handles persistence to disk or OPFS.
    /// `pub` so handlers can access it directly if needed (e.g. for compaction).
    /// `Arc<dyn StorageBackend>` = shared pointer to any type implementing the trait.
    pub storage: Arc<dyn StorageBackend>,

    /// Broadcast channel sender for real-time change notifications.
    /// When a document is inserted, updated, or deleted, a JSON event is sent
    /// on this channel. WebSocket handlers subscribe to receive these events.
    /// `pub` so the WebSocket handler in main.rs can call subscribe().
    pub tx: broadcast::Sender<String>,

    /// The index store.
    /// Key format: "collection:field" (e.g. "users:role").
    /// Value: field_value → set of document keys with that value.
    /// e.g. "users:role" → { "admin" → {"u1"}, "user" → {"u2", "u3"} }
    /// `pub` so handlers.rs can check for index existence directly.
    pub indexes: Arc<DashMap<String, DashMap<String, DashSet<String>>>>,

    /// Query frequency counter for auto-indexing.
    /// Key: "collection:field". Value: number of times queried.
    /// When a field reaches 3 queries, an index is auto-created.
    pub query_heatmap: Arc<DashMap<String, u32>>,

    /// The maximum number of documents per collection to keep in RAM (Hot).
    /// If a collection exceeds this, older documents are paged out to disk (Cold).
    /// Default is 50,000 — presumably set by the constructors in open/open_wasm;
    /// confirm there.
    pub hot_threshold: usize,

    /// Max requests per window.
    pub rate_limit_requests: u32,

    /// Window size in seconds.
    pub rate_limit_window: u64,

    /// Maximum request body size in bytes.
    pub max_body_size: usize,

    /// Maximum keys allowed per request.
    pub max_keys_per_request: usize,

    /// Registered JSON schemas per collection.
    /// Key: collection name → Value: (Original JSON, Compiled Validator).
    /// The pair is kept so the raw schema can be re-serialized (e.g. for
    /// persistence) while validation uses the precompiled form.
    #[cfg(feature = "schema")]
    pub schemas: Arc<DashMap<String, Arc<(Value, jsonschema::Validator)>>>,

    /// Optional shell command to execute after a successful backup.
    /// Supports the {SNAPSHOT_PATH} placeholder.
    pub post_backup_script: Option<String>,

    /// Whether tiered (hot+cold) storage mode is active.
    pub tiered_mode: bool,

    /// Timestamp of when this Db instance was opened, used for uptime calculation.
    /// Not available on WASM, where `std::time::Instant` is unsupported.
    #[cfg(not(target_arch = "wasm32"))]
    pub started_at: std::time::Instant,
}
128
129impl Db {
130 /// Returns the total number of hot (in-memory) keys across all collections.
131 pub fn hot_keys_count(&self) -> usize {
132 self.state.iter().map(|c| c.value().len()).sum()
133 }
134
135 /// Create a new broadcast receiver for real-time change notifications.
136 /// Each call returns an independent receiver — multiple WebSocket handlers
137 /// can each subscribe and receive all events independently.
138 pub fn subscribe(&self) -> broadcast::Receiver<String> {
139 self.tx.subscribe()
140 }
141
142 /// Retrieve documents by their keys. Returns a HashMap of found key→value pairs.
143 /// Missing keys are silently skipped. Pass a single key to retrieve one document.
144 pub fn get(&self, collection: &str, keys: Vec<String>) -> HashMap<String, Value> {
145 operations::get(&self.state, &self.storage, collection, keys)
146 }
147
148 /// Retrieve all documents in a collection as a HashMap.
149 pub fn get_all(&self, collection: &str) -> HashMap<String, Value> {
150 operations::get_all(&self.state, &self.storage, collection)
151 }
152
153 /// Insert or overwrite multiple documents in one call.
154 /// Each item is a (key, value) pair. Writes are persisted to storage.
155 pub fn insert(&self, collection: &str, items: Vec<(String, Value)>) -> Result<(), DbError> {
156 operations::insert(
157 &self.state,
158 &self.indexes,
159 &self.storage,
160 &self.tx,
161 #[cfg(feature = "schema")] &self.schemas,
162 collection,
163 items,
164 )?;
165
166 // Auto-evict if the collection exceeds the threshold.
167 let _ = self.evict_collection(collection, self.hot_threshold);
168 Ok(())
169 }
170
171 /// Partially update a document — merges `updates` into the existing document.
172 /// Returns true if the document was found and updated, false if not found.
173 pub fn update(&self, collection: &str, key: &str, updates: Value) -> Result<bool, DbError> {
174 let updated = operations::update(
175 &self.state,
176 &self.indexes,
177 &self.storage,
178 &self.tx,
179 #[cfg(feature = "schema")] &self.schemas,
180 collection,
181 key,
182 updates,
183 )?;
184
185 if updated {
186 // Auto-evict if the collection exceeds the threshold.
187 let _ = self.evict_collection(collection, self.hot_threshold);
188 }
189 Ok(updated)
190 }
191
192 /// Delete one or more documents by key. Pass a single key to delete one document.
193 pub fn delete(&self, collection: &str, keys: Vec<String>) -> Result<(), DbError> {
194 operations::delete(
195 &self.state,
196 &self.indexes,
197 &self.storage,
198 &self.tx,
199 collection,
200 keys,
201 )
202 }
203
204 /// Drop an entire collection — removes all documents and its indexes.
205 pub fn delete_collection(&self, collection: &str) -> Result<(), DbError> {
206 operations::delete_collection(
207 &self.state,
208 &self.indexes,
209 &self.storage,
210 &self.tx,
211 collection,
212 )
213 }
214
215 /// Track that `field` was queried in `collection` and auto-create an index
216 /// if this field has been queried 3 or more times.
217 /// Errors are silently ignored — auto-indexing is best-effort.
218 pub fn track_query(&self, collection: &str, field: &str) {
219 // The `let _ =` discards the Result — a failed auto-index is not fatal.
220 let _ = indexing::track_query(
221 &self.indexes,
222 &self.query_heatmap,
223 collection,
224 field,
225 &self.storage,
226 &self.state,
227 );
228 }
229
230 /// Register a JSON schema for a collection.
231 /// All subsequent writes to this collection must conform to this schema.
232 #[cfg(feature = "schema")]
233 pub fn set_schema(&self, collection: &str, schema: Value) -> Result<(), DbError> {
234 schema::set_schema(
235 &self.schemas,
236 &self.storage,
237 &self.tx,
238 collection,
239 schema
240 )
241 }
242
243 /// Wipe all in-memory state — documents, indexes, and query heatmap.
244 /// Used by the WASM layer when a browser tab unloads in in-memory mode,
245 /// so that any tab refresh clears the shared RAM store for all tabs.
246 pub fn clear_all(&self) {
247 self.state.clear();
248 self.indexes.clear();
249 self.query_heatmap.clear();
250 #[cfg(feature = "schema")]
251 self.schemas.clear();
252 }
253
254 /// Compact the log file — rewrite it to contain only the current state.
255 ///
256 /// This removes all dead entries (superseded INSERTs, DELETE tombstones)
257 /// and writes a binary snapshot for fast next startup.
258 ///
259 /// The compacted log contains:
260 /// - One INSERT entry per live document (current value only).
261 /// - One INDEX entry per registered index (index data is rebuilt on replay).
262 pub fn compact(&self) -> Result<(), DbError> {
263 let entries = operations::compact(
264 &self.state,
265 #[cfg(feature = "schema")] &self.schemas,
266 &self.indexes,
267 &*self.storage,
268 self.post_backup_script.clone(),
269 )?;
270
271 // After compaction the log is rewritten and all old RecordPointers are invalid.
272 // Promote every Cold entry in the in-memory state to Hot so subsequent reads
273 // don't try to seek to stale byte offsets in the now-truncated log file.
274 for entry in &entries {
275 if entry.cmd == "INSERT" {
276 if let Some(col) = self.state.get(&entry.collection) {
277 if let Some(mut doc) = col.get_mut(&entry.key) {
278 if matches!(*doc, crate::engine::types::DocumentState::Cold(_)) {
279 *doc = crate::engine::types::DocumentState::Hot(entry.value.clone());
280 }
281 }
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 /// Evict documents from RAM to disk for a collection if it exceeds the threshold.
290 ///
291 /// This converts `Hot(Value)` entries into `Cold(RecordPointer)` entries.
292 /// In this v1, it re-scans the log to find the exact byte offsets for the documents.
293 pub fn evict_collection(&self, collection: &str, limit: usize) -> Result<usize, DbError> {
294 operations::evict_collection(&self.state, &*self.storage, collection, limit)
295 }
296
297 /// Recover the database state to a specific point in time or sequence number.
298 /// Returns the recovered state as a Vec of LogEntries that can be written to a snapshot.
299 ///
300 /// This is a utility function used by the CLI for PITR.
301 #[cfg(not(target_arch = "wasm32"))]
302 pub fn recover_to(
303 storage: &dyn StorageBackend,
304 to_time: Option<u64>,
305 to_seq: Option<u64>,
306 ) -> Result<Vec<LogEntry>, DbError> {
307 operations::recover_to(storage, to_time, to_seq)
308 }
309}