Skip to main content

reifydb_webassembly/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! WebAssembly bindings for ReifyDB query engine
5//!
6//! This crate provides JavaScript-compatible bindings for running ReifyDB
7//! queries in a browser or Node.js environment with in-memory storage.
8
9use std::collections::HashMap;
10
11use reifydb_catalog::schema::SchemaRegistry;
12use reifydb_engine::{engine::StandardEngine, procedure::registry::Procedures};
13use wasm_bindgen::prelude::*;
14
15// Debug helper to log to browser console
16fn console_log(msg: &str) {
17	web_sys::console::log_1(&msg.into());
18}
19use reifydb_cdc::{
20	produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
21	storage::CdcStore,
22};
23use reifydb_core::event::transaction::PostCommitEvent;
24use reifydb_store_multi::MultiStore;
25use reifydb_store_single::SingleStore;
26use reifydb_sub_api::subsystem::Subsystem;
27use reifydb_sub_flow::{builder::FlowBuilderConfig, subsystem::FlowSubsystem};
28use reifydb_type::{params::Params, value::identity::IdentityId};
29
30mod error;
31mod utils;
32
33pub use error::JsError;
34use reifydb_function::registry::Functions;
35use reifydb_store_multi::{
36	config::{HotConfig, MultiStoreConfig},
37	hot::storage::HotStorage,
38};
39
40/// WebAssembly ReifyDB Engine
41///
42/// Provides an in-memory query engine that runs entirely in the browser.
43/// All data is stored in memory and lost when the page is closed.
44#[wasm_bindgen]
45pub struct WasmDB {
46	inner: StandardEngine,
47	flow_subsystem: FlowSubsystem,
48}
49
50#[wasm_bindgen]
51impl WasmDB {
52	/// Create a new in-memory ReifyDB engine
53	///
54	/// # Example
55	///
56	/// ```javascript
57	/// import init, { WasmDB } from './pkg/reifydb_engine_wasm.js';
58	///
59	/// await init();
60	/// const db = new WasmDB();
61	/// ```
62	#[wasm_bindgen(constructor)]
63	pub fn new() -> Result<WasmDB, JsValue> {
64		use reifydb_catalog::{catalog::Catalog, materialized::MaterializedCatalog};
65		use reifydb_core::{config::SystemConfig, event::EventBus, util::ioc::IocContainer};
66		use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
67		use reifydb_transaction::{
68			interceptor::factory::InterceptorFactory,
69			multi::transaction::{MultiTransaction, register_oracle_defaults},
70			single::SingleTransaction,
71		};
72
73		// Set panic hook for better error messages in browser console
74
75		#[cfg(feature = "console_error_panic_hook")]
76		console_error_panic_hook::set_once();
77
78		// WASM runtime with minimal threads (single-threaded)
79		let runtime = SharedRuntime::from_config(
80			SharedRuntimeConfig::default().async_threads(1).compute_threads(1).compute_max_in_flight(8),
81		);
82
83		// Create actor system at the top level - this will be shared by
84		// the transaction manager (watermark actors) and flow subsystem (poll/coordinator actors)
85		let actor_system = runtime.actor_system();
86
87		// Create event bus and stores
88		let eventbus = EventBus::new(&actor_system);
89		let multi_store = MultiStore::standard(MultiStoreConfig {
90			hot: Some(HotConfig {
91				storage: HotStorage::memory(),
92			}),
93			warm: None,
94			cold: None,
95			retention: Default::default(),
96			merge_config: Default::default(),
97			event_bus: eventbus.clone(),
98			actor_system: actor_system.clone(),
99		});
100		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
101
102		// Create transactions
103		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
104		let system_config = SystemConfig::new();
105		register_oracle_defaults(&system_config);
106		let multi = MultiTransaction::new(
107			multi_store.clone(),
108			single.clone(),
109			eventbus.clone(),
110			actor_system.clone(),
111			runtime.clock().clone(),
112			system_config.clone(),
113		)
114		.map_err(|e| JsError::from_error(&e))?;
115
116		// Setup IoC container
117		let mut ioc = IocContainer::new();
118
119		let materialized_catalog = MaterializedCatalog::new(system_config);
120		ioc = ioc.register(materialized_catalog.clone());
121
122		ioc = ioc.register(runtime.clone());
123
124		// Register metrics store for engine
125		ioc = ioc.register(single_store.clone());
126
127		// Register CdcStore (required by sub-flow)
128		let cdc_store = CdcStore::memory();
129		ioc = ioc.register(cdc_store.clone());
130
131		// Clone ioc for FlowSubsystem (engine consumes ioc)
132		let ioc_ref = ioc.clone();
133
134		// Build engine first — CDC producer needs it as CdcHost for schema registry access
135		let eventbus_clone = eventbus.clone();
136		let inner = StandardEngine::new(
137			multi,
138			single.clone(),
139			eventbus,
140			InterceptorFactory::default(),
141			Catalog::new(materialized_catalog, SchemaRegistry::new(single)),
142			runtime.clock().clone(),
143			Functions::defaults().build(),
144			Procedures::empty(),
145			reifydb_engine::transform::registry::Transforms::empty(),
146			ioc,
147			#[cfg(not(target_arch = "wasm32"))]
148			None,
149		);
150
151		// Spawn CDC producer actor on the shared runtime, passing engine as CdcHost
152		console_log("[WASM] Spawning CDC producer actor...");
153		let cdc_producer_handle = spawn_cdc_producer(
154			&actor_system,
155			cdc_store,
156			multi_store.clone(),
157			inner.clone(),
158			eventbus_clone.clone(),
159		);
160
161		// Register event listener to forward PostCommitEvent to CDC producer
162		let cdc_listener =
163			CdcProducerEventListener::new(cdc_producer_handle.actor_ref().clone(), runtime.clock().clone());
164		eventbus_clone.register::<PostCommitEvent, _>(cdc_listener);
165		console_log("[WASM] CDC producer actor registered!");
166
167		// Create and start FlowSubsystem
168		let flow_config = FlowBuilderConfig {
169			operators_dir: None, // No FFI operators in WASM
170			num_workers: 1,      // Single-threaded for WASM
171			custom_operators: HashMap::new(),
172		};
173		console_log("[WASM] Creating FlowSubsystem...");
174		let mut flow_subsystem = FlowSubsystem::new(flow_config, inner.clone(), &ioc_ref);
175		console_log("[WASM] Starting FlowSubsystem...");
176		flow_subsystem.start().map_err(|e| JsError::from_error(&e))?;
177		console_log("[WASM] FlowSubsystem started successfully!");
178
179		Ok(WasmDB {
180			inner,
181			flow_subsystem,
182		})
183	}
184
185	/// Execute a query and return results as JavaScript objects
186	///
187	/// # Example
188	///
189	/// ```javascript
190	/// const results = await db.query(`
191	///   FROM [{ name: "Alice", age: 30 }]
192	///   FILTER age > 25
193	/// `);
194	/// console.log(results); // [{ name: "Alice", age: 30 }]
195	/// ```
196	#[wasm_bindgen]
197	pub fn query(&self, rql: &str) -> Result<JsValue, JsValue> {
198		let identity = IdentityId::root();
199		let params = Params::None;
200
201		// Execute query
202		let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
203
204		// Convert frames to JavaScript array of objects
205		utils::frames_to_js(&frames)
206	}
207
208	/// Execute an admin operation (DDL + DML + Query) and return results
209	///
210	/// Admin operations include CREATE, ALTER, INSERT, UPDATE, DELETE, etc.
211	///
212	/// # Example
213	///
214	/// ```javascript
215	/// await db.admin("CREATE NAMESPACE demo");
216	/// await db.admin(`
217	///   CREATE TABLE demo.users {
218	///     id: int4,
219	///     name: utf8
220	///   }
221	/// `);
222	/// ```
223	#[wasm_bindgen]
224	pub fn admin(&self, rql: &str) -> Result<JsValue, JsValue> {
225		let identity = IdentityId::root();
226		let params = Params::None;
227
228		let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
229
230		utils::frames_to_js(&frames)
231	}
232
233	/// Execute a command (DML) and return results
234	///
235	/// Commands include INSERT, UPDATE, DELETE, etc.
236	/// For DDL operations (CREATE, ALTER), use `admin()` instead.
237	#[wasm_bindgen]
238	pub fn command(&self, rql: &str) -> Result<JsValue, JsValue> {
239		let identity = IdentityId::root();
240		let params = Params::None;
241
242		let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
243
244		utils::frames_to_js(&frames)
245	}
246
247	/// Execute query with JSON parameters
248	///
249	/// # Example
250	///
251	/// ```javascript
252	/// const results = await db.queryWithParams(
253	///   "FROM users FILTER age > $min_age",
254	///   { min_age: 25 }
255	/// );
256	/// ```
257	#[wasm_bindgen(js_name = queryWithParams)]
258	pub fn query_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
259		let identity = IdentityId::root();
260
261		// Parse JavaScript params to Rust Params
262		let params = utils::parse_params(params_js)?;
263
264		let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
265
266		utils::frames_to_js(&frames)
267	}
268
269	/// Execute admin with JSON parameters
270	#[wasm_bindgen(js_name = adminWithParams)]
271	pub fn admin_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
272		let identity = IdentityId::root();
273
274		let params = utils::parse_params(params_js)?;
275
276		let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
277
278		utils::frames_to_js(&frames)
279	}
280
281	/// Execute command with JSON parameters
282	#[wasm_bindgen(js_name = commandWithParams)]
283	pub fn command_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
284		let identity = IdentityId::root();
285
286		let params = utils::parse_params(params_js)?;
287
288		let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
289
290		utils::frames_to_js(&frames)
291	}
292}
293
294impl Drop for WasmDB {
295	fn drop(&mut self) {
296		let _ = self.flow_subsystem.shutdown();
297	}
298}
299
300impl Default for WasmDB {
301	fn default() -> Self {
302		Self::new().expect("Failed to create WasmDB")
303	}
304}