// moltendb-core 0.2.0-beta.2
//
// MoltenDB core engine — in-memory DashMap storage, WAL persistence, query evaluation. No HTTP, no auth.
// Documentation
// ─── worker.rs ────────────────────────────────────────────────────────────────
// This file is the WASM entry point — it exposes the database to JavaScript.
//
// What is WASM?
//   WebAssembly (WASM) is a binary format that runs in the browser at near-native
//   speed. Rust code compiled to WASM can be called directly from JavaScript.
//   `wasm-bindgen` is the tool that generates the JavaScript glue code.
//
// What is a Web Worker?
//   A Web Worker is a background thread in the browser. JavaScript is normally
//   single-threaded — running heavy computation on the main thread freezes the UI.
//   By running the database inside a Worker, all DB operations happen off the main
//   thread, keeping the dashboard responsive even during heavy queries.
//
// How this file fits in:
//   analytics-worker.js (Web Worker)
//     → imports WorkerDb from pkg/moltendb.js  (generated by wasm-bindgen)
//       → calls WorkerDb::new(dbName)           (this file, Rust)
//         → Db::open_wasm(dbName)               (engine/mod.rs)
//           → OpfsStorage::new(dbName)          (engine/storage/wasm.rs)
//
//   Then for each message from the main thread:
//     analytics-worker.js
//       → db.handle_message({ action: 'set', collection, key, value })
//         → WorkerDb::handle_message()          (this file)
//           → routes to handle_set / handle_get / etc.
//
// This file only compiles when targeting WASM (wasm32-unknown-unknown).
// The server-side code (main.rs, handlers.rs, etc.) is excluded from WASM builds.
// ─────────────────────────────────────────────────────────────────────────────

// Only compile this file when building for WASM (browser target).
// The `#![cfg(...)]` attribute applies to the entire file.
#![cfg(target_arch = "wasm32")]

// wasm_bindgen::prelude::* imports the core WASM-JS interop tools:
//   - `#[wasm_bindgen]` attribute — marks structs/functions as callable from JS.
//   - `JsValue` — a generic JavaScript value (string, number, object, null, etc.).
//   - `wasm_bindgen` — the proc-macro that generates JS glue code.
use wasm_bindgen::prelude::*;

// Db = the main database engine (same code used on the server).
// This is the key insight of MoltenDB: the same Db struct runs in both
// the browser (via WASM + OpfsStorage) and on the server (via DiskStorage).
use crate::engine::Db;

// analytics = the analytics query engine (COUNT, SUM, AVG, MIN, MAX).
use crate::analytics;

// handlers = the full query/insert/update/delete pipeline (same as HTTP handlers).
use crate::handlers;

// Value = a generic JSON value.
// json! = macro to create JSON values inline, e.g. json!({"status": "ok"}).
use serde_json::{Value, json};

// Auto-compaction thresholds, consulted by `WorkerDb::maybe_compact` after each
// successful write. Compaction runs when EITHER threshold is reached.
// (Said to mirror the JS worker defaults — not verifiable from this file.)
const COMPACT_AFTER_WRITES: u32 = 500;   // write operations since the last successful compaction
const COMPACT_SIZE_BYTES:   u64 = 5 * 1024 * 1024; // storage size in bytes (5 MiB)

// ─── WorkerDb ─────────────────────────────────────────────────────────────────

/// The WASM-exposed database handle used by the JavaScript Web Worker.
///
/// `#[wasm_bindgen]` on the struct makes the type visible to JavaScript, which
/// creates an instance with: `const db = await new WorkerDb("mydb")`.
///
/// The struct wraps a [`Db`] engine instance. The methods on it are thin adapters
/// that:
///   1. Convert JavaScript values (`JsValue`) to Rust types.
///   2. Call the underlying `Db` / handler functions.
///   3. Convert the result back to a `JsValue` (or a JSON string) for JavaScript.
#[wasm_bindgen]
pub struct WorkerDb {
    /// The underlying database engine instance, opened via `Db::open_wasm`.
    db: Db,

    /// Number of successful write operations (set / update / delete) since the
    /// last successful auto-compaction.
    /// `Cell<u32>` provides interior mutability so the counter can be bumped
    /// from the `&self` methods used throughout this file.
    write_count: std::cell::Cell<u32>,
}

/// Implementation block for WorkerDb.
/// `#[wasm_bindgen]` on the impl block makes all pub methods callable from JS.
#[wasm_bindgen]
impl WorkerDb {
    /// Open (or create) the named database and wrap it in a `WorkerDb`.
    ///
    /// Exposed to JavaScript as the class constructor:
    ///   `const db = await new WorkerDb("click_analytics_db")`
    ///
    /// The function is `async` because `Db::open_wasm` must be awaited.
    ///
    /// # Arguments
    /// * `db_name` — name passed straight to `Db::open_wasm`; each distinct name
    ///   is expected to identify a separate database.
    ///
    /// # Errors
    /// If `Db::open_wasm` fails, returns a `JsValue` string of the form
    /// `"Failed to open database: <error>"`, which surfaces in JavaScript as a
    /// rejected promise / thrown value.
    #[wasm_bindgen(constructor)]
    pub async fn new(db_name: &str) -> Result<WorkerDb, JsValue> {
        // Make Rust panics show up in the browser console with a readable
        // message. `set_once` is safe to call on every construction.
        console_error_panic_hook::set_once();

        // Open the engine; translate any failure into a JS-friendly string.
        let db = match Db::open_wasm(db_name).await {
            Ok(opened) => opened,
            Err(err) => {
                let message = format!("Failed to open database: {}", err);
                return Err(JsValue::from_str(&message));
            }
        };

        // Announce readiness in the DevTools console.
        let banner = JsValue::from_str("✅ MoltenDB initialized in worker");
        web_sys::console::log_1(&banner);

        // Start with a fresh write counter for auto-compaction.
        let write_count = std::cell::Cell::new(0);
        Ok(WorkerDb { db, write_count })
    }

    /// Route an incoming message from the JavaScript worker to the correct handler.
    ///
    /// Called from moltendb-worker.js as:
    ///   `db.handle_message({ action: 'get', collection: 'laptops', keys: 'lp1' })`
    ///
    /// The `data` parameter is a plain JavaScript object (not a MessageEvent wrapper).
    /// It must have an `action` field that determines which operation to perform.
    ///
    /// Supported actions — identical to the HTTP server endpoints:
    ///   - "get"      → query documents (single key, batch, full collection, WHERE, joins, sort, pagination)
    ///   - "set"      → insert or upsert documents: { collection, data: { key: doc, ... } }
    ///   - "update"   → patch/merge documents:      { collection, data: { key: patch, ... } }
    ///   - "delete"   → delete documents or drop:   { collection, keys: ... } or { drop: true }
    ///   - "compact"  → compact the OPFS log file
    ///   - "get_size" → return current OPFS file size in bytes
    ///
    /// Returns a JsValue result on success, or a JsValue error string on failure.
    #[wasm_bindgen]
    pub fn handle_message(&self, data: JsValue) -> Result<JsValue, JsValue> {
        // Deserialize the JavaScript object into a serde_json::Value.
        let request: serde_json::Value = serde_wasm_bindgen::from_value(data)
            .map_err(|e| JsValue::from_str(&format!("Invalid request: {}", e)))?;

        // Extract the "action" field — required in every message.
        let action = request["action"]
            .as_str()
            .ok_or_else(|| JsValue::from_str("Missing action field"))?;

        // Build a payload without the "action" key so the handlers' property
        // validators don't see it as an unknown field.
        let payload = {
            let mut p = request.clone();
            if let Some(obj) = p.as_object_mut() {
                obj.remove("action");
            }
            p
        };

        // Route to the appropriate handler — mirrors the HTTP server endpoints exactly.
        let result = match action {
            // Full query pipeline: single key, batch, WHERE, fields, joins, sort, pagination.
            // Equivalent to POST /get on the server.
            "get"      => self.handle_get(&payload),
            // Insert/upsert batch: { collection, data: { key: doc, ... } }.
            // Equivalent to POST /set on the server.
            "set"      => self.handle_set(&payload),
            // Patch/merge batch: { collection, data: { key: patch, ... } }.
            // Equivalent to POST /update on the server.
            "update"   => self.handle_update(&payload),
            // Delete single/batch/drop: { collection, keys: ... } or { drop: true }.
            // Equivalent to POST /delete on the server.
            "delete"   => self.handle_delete(&payload),
            // Compact the OPFS log file (removes superseded entries).
            "compact"  => self.handle_compact(),
            // Return the current OPFS file size in bytes.
            "get_size" => self.handle_get_size(),
            // Unknown action — return an error instead of panicking.
            _ => Err(JsValue::from_str(&format!("Unknown action: {}", action))),
        }?;

        Ok(result)
    }

    /// Return the current size of the OPFS log file in bytes.
    ///
    /// Used by the JavaScript worker to implement size-based auto-compaction:
    /// after every INSERT batch, the worker calls `get_size` and compacts if
    /// the file exceeds the configured threshold (default: 5 MB).
    ///
    /// Returns `{ "size": <bytes> }` on success.
    fn handle_get_size(&self) -> Result<JsValue, JsValue> {
        // Delegate to the storage backend's get_size() method.
        // OpfsStorage implements this by calling FileSystemSyncAccessHandle.getSize(),
        // which returns the current byte length of the OPFS file without reading it.
        let size = self.db.storage.get_size()
            .map_err(|e| JsValue::from_str(&e.to_string()))?;

        Ok(serde_wasm_bindgen::to_value(&serde_json::json!({ "size": size }))?)
    }

    /// Compact the OPFS log file.
    ///
    /// Compaction rewrites the log file to contain only the current state —
    /// removing all superseded INSERT entries and all DELETE tombstones.
    /// This shrinks the file and speeds up future startup replay.
    ///
    /// Returns `{ "status": "compacted" }` on success.
    fn handle_compact(&self) -> Result<JsValue, JsValue> {
        self.db
            .compact()
            .map_err(|e| JsValue::from_str(&e.to_string()))?;

        Ok(serde_wasm_bindgen::to_value(&serde_json::json!({"status": "compacted"}))?)
    }

    /// Query documents — full pipeline: single key, batch, WHERE, fields, joins, sort, pagination.
    /// Equivalent to POST /get on the server.
    ///   { "collection": "laptops", "where": { "brand": "Apple" }, "fields": ["brand", "model"] }
    fn handle_get(&self, request: &Value) -> Result<JsValue, JsValue> {
        let (code, mut body) = handlers::process_get::process_get(&self.db, request, 5 * 1024 * 1024);
        if let Some(obj) = body.as_object_mut() { obj.insert("statusCode".into(), serde_json::json!(code)); }
        if code >= 400 { return Err(serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))?); }
        serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))
    }

    /// Insert/upsert documents. Equivalent to POST /set on the server.
    ///   { "collection": "laptops", "data": { "lp1": { ... }, "lp2": { ... } } }
    fn handle_set(&self, request: &Value) -> Result<JsValue, JsValue> {
        let (code, mut body) = handlers::process_set::process_set(&self.db, request, 5 * 1024 * 1024);
        if let Some(obj) = body.as_object_mut() { obj.insert("statusCode".into(), serde_json::json!(code)); }
        if code >= 400 { return Err(serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))?); }
        self.maybe_compact();
        serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))
    }

    /// Patch/merge documents. Equivalent to POST /update on the server.
    ///   { "collection": "laptops", "data": { "lp4": { "price": 1749 } } }
    fn handle_update(&self, request: &Value) -> Result<JsValue, JsValue> {
        let (code, mut body) = handlers::process_update::process_update(&self.db, request, 5 * 1024 * 1024);
        if let Some(obj) = body.as_object_mut() { obj.insert("statusCode".into(), serde_json::json!(code)); }
        if code >= 400 { return Err(serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))?); }
        self.maybe_compact();
        serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))
    }

    /// Delete documents or drop a collection. Equivalent to POST /delete on the server.
    ///   { "collection": "laptops", "keys": "lp6" }  or  { "drop": true }
    fn handle_delete(&self, request: &Value) -> Result<JsValue, JsValue> {
        let (code, mut body) = handlers::process_delete::process_delete(&self.db, request, 5 * 1024 * 1024);
        if let Some(obj) = body.as_object_mut() { obj.insert("statusCode".into(), serde_json::json!(code)); }
        if code >= 400 { return Err(serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))?); }
        self.maybe_compact();
        serde_wasm_bindgen::to_value(&body).map_err(|e| JsValue::from_str(&e.to_string()))
    }

    /// Execute an analytics query and return the result as a JSON string.
    ///
    /// This is the method called by the dashboard's auto-refresh loop:
    ///   `const resultStr = db.analytics(JSON.stringify(query))`
    ///
    /// Takes a JSON string (not a JsValue) because the analytics query format
    /// is complex and easier to pass as a pre-serialized string from JavaScript.
    ///
    /// Returns a JSON string (not a JsValue) so JavaScript can parse it with
    /// `JSON.parse(resultStr)` and access `result` and `metadata`.
    ///
    /// Example input:
    ///   `'{"collection":"events","metric":{"type":"COUNT"},"where":{"event_type":"button_click"}}'`
    ///
    /// Example output:
    ///   `'{"result":42,"metadata":{"execution_time_ms":0,"rows_scanned":42}}'`
    ///
    /// `#[wasm_bindgen(js_name = analytics)]` sets the JavaScript method name to
    /// "analytics" (matching the call in analytics-worker.js).
    #[wasm_bindgen(js_name = analytics)]
    pub fn analytics(&self, query_json: &str) -> String {
        // Parse the JSON string into an AnalyticsQuery struct.
        // If parsing fails (malformed JSON or missing fields), return an error JSON string.
        let query: analytics::AnalyticsQuery = match serde_json::from_str(query_json) {
            Ok(q) => q,
            Err(e) => return json!({ "error": format!("Invalid query: {}", e) }).to_string(),
        };

        // Execute the analytics query against the in-memory database.
        let result = analytics::execute_query(&self.db, &query);

        // Serialize the result to a JSON string.
        // We manually construct the output shape to match what the dashboard expects.
        json!({
            "result": result.result,
            "metadata": {
                "execution_time_ms": result.metadata.execution_time_ms,
                "rows_scanned": result.metadata.rows_scanned
            }
        }).to_string()
    }

    /// Record one successful write and compact the log if a threshold is hit.
    ///
    /// Called by `handle_set`, `handle_update` and `handle_delete` after the
    /// handler reports success. Each call increments `write_count` by one, so
    /// a batch counts as a single write.
    ///
    /// Compaction is triggered when EITHER:
    ///   - `write_count` has reached `COMPACT_AFTER_WRITES`, OR
    ///   - the storage size is at or above `COMPACT_SIZE_BYTES`.
    ///
    /// The size is only queried when the count threshold has not fired, and a
    /// failure to read the size is treated as "not triggered".
    ///
    /// The counter is reset to 0 only after a successful compaction. A failed
    /// compaction is logged to the browser console and otherwise ignored, so
    /// the write that triggered it still succeeds.
    ///
    /// NOTE(review): the size check is re-evaluated on every write. If the
    /// compacted file is itself still at or above `COMPACT_SIZE_BYTES`, each
    /// subsequent write will compact again — confirm this is acceptable.
    fn maybe_compact(&self) {
        let writes = self.write_count.get() + 1;
        self.write_count.set(writes);

        // Decide why (if at all) we compact. The cheap counter check comes
        // first; storage is only consulted when the counter has not fired.
        let reason = if writes >= COMPACT_AFTER_WRITES {
            format!("write count reached {}", writes)
        } else {
            match self.db.storage.get_size() {
                Ok(bytes) if bytes >= COMPACT_SIZE_BYTES => "file size exceeded 5 MB".to_string(),
                // Below the threshold, or the size could not be read.
                _ => return,
            }
        };

        let announcement = format!("🗜️ MoltenDB: auto-compaction triggered ({})", reason);
        web_sys::console::log_1(&JsValue::from_str(&announcement));

        if let Err(err) = self.db.compact() {
            // Best effort: report and carry on; the counter stays as it is.
            let failure = format!("❌ MoltenDB: auto-compaction failed: {}", err);
            web_sys::console::error_1(&JsValue::from_str(&failure));
        } else {
            self.write_count.set(0);
            web_sys::console::log_1(&JsValue::from_str("✅ MoltenDB: auto-compaction complete"));
        }
    }

    /// Subscribe to real-time database changes.
    /// The provided JavaScript function will be called with a JSON string
    /// representing the mutation event.
    #[wasm_bindgen]
    pub fn subscribe(&self, callback: js_sys::Function) {
        // Tap into the exact same channel the native WebSocket server uses
        let mut rx = self.db.subscribe();

        // Spawn an async task on the local WASM thread
        wasm_bindgen_futures::spawn_local(async move {
            while let Ok(msg) = rx.recv().await {
                // msg is already a formatted JSON string from operations.rs
                // e.g., {"event": "change", "collection": "laptops", "key": "lp1", "new_v": 2}
                let _ = callback.call1(&wasm_bindgen::JsValue::NULL, &wasm_bindgen::JsValue::from_str(&msg));
            }
        });
    }
}