Skip to main content

reifydb_webassembly/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! WebAssembly bindings for ReifyDB query engine
5//!
6//! This crate provides JavaScript-compatible bindings for running ReifyDB
7//! queries in a browser or Node.js environment with in-memory storage.
8
9use std::{collections::HashMap, fmt::Write};
10
11use reifydb_auth::AuthVersion;
12use reifydb_catalog::{
13	CatalogVersion,
14	bootstrap::{
15		bootstrap_config_defaults, bootstrap_system_procedures, load_materialized_catalog, load_schema_registry,
16	},
17	catalog::Catalog,
18	materialized::MaterializedCatalog,
19	schema::SchemaRegistry,
20	system::SystemCatalog,
21};
22use reifydb_cdc::{
23	CdcVersion,
24	produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
25	storage::CdcStore,
26};
27use reifydb_core::{
28	CoreVersion,
29	config::SystemConfig,
30	event::{EventBus, transaction::PostCommitEvent},
31	interface::version::{ComponentType, HasVersion, SystemVersion},
32	util::ioc::IocContainer,
33};
34use reifydb_engine::{
35	EngineVersion,
36	engine::StandardEngine,
37	procedure::{registry::Procedures, system::set_config::SetConfigProcedure},
38};
39use reifydb_function::registry::Functions;
40use reifydb_rql::RqlVersion;
41use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
42use reifydb_store_multi::{
43	MultiStore, MultiStoreVersion,
44	config::{HotConfig, MultiStoreConfig},
45	hot::storage::HotStorage,
46};
47use reifydb_store_single::{SingleStore, SingleStoreVersion};
48use reifydb_sub_api::subsystem::Subsystem;
49use reifydb_sub_flow::{builder::FlowBuilderConfig, subsystem::FlowSubsystem};
50use reifydb_transaction::{
51	TransactionVersion,
52	interceptor::factory::InterceptorFactory,
53	multi::transaction::{MultiTransaction, register_oracle_defaults},
54	single::SingleTransaction,
55};
56use reifydb_type::{params::Params, value::identity::IdentityId};
57use wasm_bindgen::prelude::*;
58
59mod error;
60mod utils;
61
62pub use error::JsError;
63
64// Debug helper to log to browser console
65fn console_log(msg: &str) {
66	web_sys::console::log_1(&msg.into());
67}
68
69/// WebAssembly ReifyDB Engine
70///
71/// Provides an in-memory query engine that runs entirely in the browser.
72/// All data is stored in memory and lost when the page is closed.
73#[wasm_bindgen]
74pub struct WasmDB {
75	inner: StandardEngine,
76	flow_subsystem: FlowSubsystem,
77}
78
79#[wasm_bindgen]
80impl WasmDB {
81	/// Create a new in-memory ReifyDB engine
82	///
83	/// # Example
84	///
85	/// ```javascript
86	/// import init, { WasmDB } from './pkg/reifydb_engine_wasm.js';
87	///
88	/// await init();
89	/// const db = new WasmDB();
90	/// ```
91	#[wasm_bindgen(constructor)]
92	pub fn new() -> Result<WasmDB, JsValue> {
93		// Set panic hook for better error messages in browser console
94
95		#[cfg(feature = "console_error_panic_hook")]
96		console_error_panic_hook::set_once();
97
98		// WASM runtime with minimal threads (single-threaded)
99		let runtime = SharedRuntime::from_config(
100			SharedRuntimeConfig::default()
101				.async_threads(1)
102				.compute_threads(1)
103				.compute_max_in_flight(8)
104				.mock_clock(0),
105		);
106
107		// Create actor system at the top level - this will be shared by
108		// the transaction manager (watermark actors) and flow subsystem (poll/coordinator actors)
109		let actor_system = runtime.actor_system();
110
111		// Create event bus and stores
112		let eventbus = EventBus::new(&actor_system);
113		let multi_store = MultiStore::standard(MultiStoreConfig {
114			hot: Some(HotConfig {
115				storage: HotStorage::memory(),
116			}),
117			warm: None,
118			cold: None,
119			retention: Default::default(),
120			merge_config: Default::default(),
121			event_bus: eventbus.clone(),
122			actor_system: actor_system.clone(),
123		});
124		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
125
126		// Create transactions
127		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
128		let system_config = SystemConfig::new();
129		register_oracle_defaults(&system_config);
130		let multi = MultiTransaction::new(
131			multi_store.clone(),
132			single.clone(),
133			eventbus.clone(),
134			actor_system.clone(),
135			runtime.clock().clone(),
136			system_config.clone(),
137		)
138		.map_err(|e| JsError::from_error(&e))?;
139
140		// Setup IoC container
141		let mut ioc = IocContainer::new();
142
143		let materialized_catalog = MaterializedCatalog::new(system_config);
144		ioc = ioc.register(materialized_catalog.clone());
145
146		ioc = ioc.register(runtime.clone());
147
148		// Register metrics store for engine
149		ioc = ioc.register(single_store.clone());
150
151		// Register CdcStore (required by sub-flow)
152		let cdc_store = CdcStore::memory();
153		ioc = ioc.register(cdc_store.clone());
154
155		// Clone ioc for FlowSubsystem (engine consumes ioc)
156		let ioc_ref = ioc.clone();
157
158		// Create SchemaRegistry for bootstrap
159		let schema_registry = SchemaRegistry::new(single.clone());
160
161		// Run shared bootstrap: load catalog, config defaults, system procedures, schemas
162		load_materialized_catalog(&multi, &single, &materialized_catalog)
163			.map_err(|e| JsError::from_error(&e))?;
164		bootstrap_config_defaults(&multi, &single, &materialized_catalog, &eventbus)
165			.map_err(|e| JsError::from_error(&e))?;
166		bootstrap_system_procedures(&multi, &single, &materialized_catalog, &schema_registry, &eventbus)
167			.map_err(|e| JsError::from_error(&e))?;
168		load_schema_registry(&multi, &single, &schema_registry).map_err(|e| JsError::from_error(&e))?;
169
170		// Build procedures with system::config::set native procedure
171		let procedures =
172			Procedures::builder().with_procedure("system::config::set", SetConfigProcedure::new).build();
173
174		// Build engine with bootstrap-initialized catalog
175		let eventbus_clone = eventbus.clone();
176		let inner = StandardEngine::new(
177			multi,
178			single.clone(),
179			eventbus,
180			InterceptorFactory::default(),
181			Catalog::new(materialized_catalog, schema_registry),
182			runtime.clock().clone(),
183			Functions::defaults().build(),
184			procedures,
185			reifydb_engine::transform::registry::Transforms::empty(),
186			ioc,
187			#[cfg(not(target_arch = "wasm32"))]
188			None,
189		);
190
191		// Spawn CDC producer actor on the shared runtime, passing engine as CdcHost
192		console_log("[WASM] Spawning CDC producer actor...");
193		let cdc_producer_handle = spawn_cdc_producer(
194			&actor_system,
195			cdc_store,
196			multi_store.clone(),
197			inner.clone(),
198			eventbus_clone.clone(),
199		);
200
201		// Register event listener to forward PostCommitEvent to CDC producer
202		let cdc_listener =
203			CdcProducerEventListener::new(cdc_producer_handle.actor_ref().clone(), runtime.clock().clone());
204		eventbus_clone.register::<PostCommitEvent, _>(cdc_listener);
205		console_log("[WASM] CDC producer actor registered!");
206
207		// Create and start FlowSubsystem
208		let flow_config = FlowBuilderConfig {
209			operators_dir: None, // No FFI operators in WASM
210			num_workers: 1,      // Single-threaded for WASM
211			custom_operators: HashMap::new(),
212		};
213		console_log("[WASM] Creating FlowSubsystem...");
214		let mut flow_subsystem = FlowSubsystem::new(flow_config, inner.clone(), &ioc_ref);
215		console_log("[WASM] Starting FlowSubsystem...");
216		flow_subsystem.start().map_err(|e| JsError::from_error(&e))?;
217		console_log("[WASM] FlowSubsystem started successfully!");
218
219		// Collect all versions and register SystemCatalog
220		let mut all_versions = Vec::new();
221		all_versions.push(SystemVersion {
222			name: "reifydb-webassembly".to_string(),
223			version: env!("CARGO_PKG_VERSION").to_string(),
224			description: "ReifyDB WebAssembly Engine".to_string(),
225			r#type: ComponentType::Package,
226		});
227		all_versions.push(CoreVersion.version());
228		all_versions.push(EngineVersion.version());
229		all_versions.push(CatalogVersion.version());
230		all_versions.push(MultiStoreVersion.version());
231		all_versions.push(SingleStoreVersion.version());
232		all_versions.push(TransactionVersion.version());
233		all_versions.push(AuthVersion.version());
234		all_versions.push(RqlVersion.version());
235		all_versions.push(CdcVersion.version());
236		all_versions.push(flow_subsystem.version());
237
238		ioc_ref.register_service(SystemCatalog::new(all_versions));
239
240		Ok(WasmDB {
241			inner,
242			flow_subsystem,
243		})
244	}
245
246	/// Execute a query and return results as JavaScript objects
247	///
248	/// # Example
249	///
250	/// ```javascript
251	/// const results = await db.query(`
252	///   FROM [{ name: "Alice", age: 30 }]
253	///   FILTER age > 25
254	/// `);
255	/// console.log(results); // [{ name: "Alice", age: 30 }]
256	/// ```
257	#[wasm_bindgen]
258	pub fn query(&self, rql: &str) -> Result<JsValue, JsValue> {
259		let identity = IdentityId::root();
260		let params = Params::None;
261
262		// Execute query
263		let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
264
265		// Convert frames to JavaScript array of objects
266		utils::frames_to_js(&frames)
267	}
268
269	/// Execute an admin operation (DDL + DML + Query) and return results
270	///
271	/// Admin operations include CREATE, ALTER, INSERT, UPDATE, DELETE, etc.
272	///
273	/// # Example
274	///
275	/// ```javascript
276	/// await db.admin("CREATE NAMESPACE demo");
277	/// await db.admin(`
278	///   CREATE TABLE demo.users {
279	///     id: int4,
280	///     name: utf8
281	///   }
282	/// `);
283	/// ```
284	#[wasm_bindgen]
285	pub fn admin(&self, rql: &str) -> Result<JsValue, JsValue> {
286		let identity = IdentityId::root();
287		let params = Params::None;
288
289		let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
290
291		utils::frames_to_js(&frames)
292	}
293
294	/// Execute a command (DML) and return results
295	///
296	/// Commands include INSERT, UPDATE, DELETE, etc.
297	/// For DDL operations (CREATE, ALTER), use `admin()` instead.
298	#[wasm_bindgen]
299	pub fn command(&self, rql: &str) -> Result<JsValue, JsValue> {
300		let identity = IdentityId::root();
301		let params = Params::None;
302
303		let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
304
305		utils::frames_to_js(&frames)
306	}
307
308	/// Execute query with JSON parameters
309	///
310	/// # Example
311	///
312	/// ```javascript
313	/// const results = await db.queryWithParams(
314	///   "FROM users FILTER age > $min_age",
315	///   { min_age: 25 }
316	/// );
317	/// ```
318	#[wasm_bindgen(js_name = queryWithParams)]
319	pub fn query_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
320		let identity = IdentityId::root();
321
322		// Parse JavaScript params to Rust Params
323		let params = utils::parse_params(params_js)?;
324
325		let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
326
327		utils::frames_to_js(&frames)
328	}
329
330	/// Execute admin with JSON parameters
331	#[wasm_bindgen(js_name = adminWithParams)]
332	pub fn admin_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
333		let identity = IdentityId::root();
334
335		let params = utils::parse_params(params_js)?;
336
337		let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
338
339		utils::frames_to_js(&frames)
340	}
341
342	/// Execute command with JSON parameters
343	#[wasm_bindgen(js_name = commandWithParams)]
344	pub fn command_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
345		let identity = IdentityId::root();
346
347		let params = utils::parse_params(params_js)?;
348
349		let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
350
351		utils::frames_to_js(&frames)
352	}
353
354	/// Execute a command and return Display-formatted text output
355	#[wasm_bindgen(js_name = commandText)]
356	pub fn command_text(&self, rql: &str) -> Result<String, JsValue> {
357		let frames = self
358			.inner
359			.command_as(IdentityId::root(), rql, Params::None)
360			.map_err(|e| JsError::from_error(&e))?;
361		let mut output = String::new();
362		for frame in &frames {
363			writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
364		}
365		Ok(output)
366	}
367
368	/// Execute an admin operation and return Display-formatted text output
369	#[wasm_bindgen(js_name = adminText)]
370	pub fn admin_text(&self, rql: &str) -> Result<String, JsValue> {
371		let frames = self
372			.inner
373			.admin_as(IdentityId::root(), rql, Params::None)
374			.map_err(|e| JsError::from_error(&e))?;
375		let mut output = String::new();
376		for frame in &frames {
377			writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
378		}
379		Ok(output)
380	}
381
382	/// Execute a query and return Display-formatted text output
383	#[wasm_bindgen(js_name = queryText)]
384	pub fn query_text(&self, rql: &str) -> Result<String, JsValue> {
385		let frames = self
386			.inner
387			.query_as(IdentityId::root(), rql, Params::None)
388			.map_err(|e| JsError::from_error(&e))?;
389		let mut output = String::new();
390		for frame in &frames {
391			writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
392		}
393		Ok(output)
394	}
395}
396
397impl Drop for WasmDB {
398	fn drop(&mut self) {
399		let _ = self.flow_subsystem.shutdown();
400	}
401}
402
403impl Default for WasmDB {
404	fn default() -> Self {
405		Self::new().expect("Failed to create WasmDB")
406	}
407}