1use std::{collections::HashMap, fmt::Write};
10
11use reifydb_auth::AuthVersion;
12use reifydb_catalog::{
13 CatalogVersion,
14 bootstrap::{
15 bootstrap_config_defaults, bootstrap_system_procedures, load_materialized_catalog, load_schema_registry,
16 },
17 catalog::Catalog,
18 materialized::MaterializedCatalog,
19 schema::SchemaRegistry,
20 system::SystemCatalog,
21};
22use reifydb_cdc::{
23 CdcVersion,
24 produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
25 storage::CdcStore,
26};
27use reifydb_core::{
28 CoreVersion,
29 config::SystemConfig,
30 event::{EventBus, transaction::PostCommitEvent},
31 interface::version::{ComponentType, HasVersion, SystemVersion},
32 util::ioc::IocContainer,
33};
34use reifydb_engine::{
35 EngineVersion,
36 engine::StandardEngine,
37 procedure::{registry::Procedures, system::set_config::SetConfigProcedure},
38};
39use reifydb_function::registry::Functions;
40use reifydb_rql::RqlVersion;
41use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
42use reifydb_store_multi::{
43 MultiStore, MultiStoreVersion,
44 config::{HotConfig, MultiStoreConfig},
45 hot::storage::HotStorage,
46};
47use reifydb_store_single::{SingleStore, SingleStoreVersion};
48use reifydb_sub_api::subsystem::Subsystem;
49use reifydb_sub_flow::{builder::FlowBuilderConfig, subsystem::FlowSubsystem};
50use reifydb_transaction::{
51 TransactionVersion,
52 interceptor::factory::InterceptorFactory,
53 multi::transaction::{MultiTransaction, register_oracle_defaults},
54 single::SingleTransaction,
55};
56use reifydb_type::{params::Params, value::identity::IdentityId};
57use wasm_bindgen::prelude::*;
58
59mod error;
60mod utils;
61
62pub use error::JsError;
63
64fn console_log(msg: &str) {
66 web_sys::console::log_1(&msg.into());
67}
68
69#[wasm_bindgen]
74pub struct WasmDB {
75 inner: StandardEngine,
76 flow_subsystem: FlowSubsystem,
77}
78
79#[wasm_bindgen]
80impl WasmDB {
81 #[wasm_bindgen(constructor)]
92 pub fn new() -> Result<WasmDB, JsValue> {
93 #[cfg(feature = "console_error_panic_hook")]
96 console_error_panic_hook::set_once();
97
98 let runtime = SharedRuntime::from_config(
100 SharedRuntimeConfig::default()
101 .async_threads(1)
102 .compute_threads(1)
103 .compute_max_in_flight(8)
104 .mock_clock(0),
105 );
106
107 let actor_system = runtime.actor_system();
110
111 let eventbus = EventBus::new(&actor_system);
113 let multi_store = MultiStore::standard(MultiStoreConfig {
114 hot: Some(HotConfig {
115 storage: HotStorage::memory(),
116 }),
117 warm: None,
118 cold: None,
119 retention: Default::default(),
120 merge_config: Default::default(),
121 event_bus: eventbus.clone(),
122 actor_system: actor_system.clone(),
123 });
124 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
125
126 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
128 let system_config = SystemConfig::new();
129 register_oracle_defaults(&system_config);
130 let multi = MultiTransaction::new(
131 multi_store.clone(),
132 single.clone(),
133 eventbus.clone(),
134 actor_system.clone(),
135 runtime.clock().clone(),
136 system_config.clone(),
137 )
138 .map_err(|e| JsError::from_error(&e))?;
139
140 let mut ioc = IocContainer::new();
142
143 let materialized_catalog = MaterializedCatalog::new(system_config);
144 ioc = ioc.register(materialized_catalog.clone());
145
146 ioc = ioc.register(runtime.clone());
147
148 ioc = ioc.register(single_store.clone());
150
151 let cdc_store = CdcStore::memory();
153 ioc = ioc.register(cdc_store.clone());
154
155 let ioc_ref = ioc.clone();
157
158 let schema_registry = SchemaRegistry::new(single.clone());
160
161 load_materialized_catalog(&multi, &single, &materialized_catalog)
163 .map_err(|e| JsError::from_error(&e))?;
164 bootstrap_config_defaults(&multi, &single, &materialized_catalog, &eventbus)
165 .map_err(|e| JsError::from_error(&e))?;
166 bootstrap_system_procedures(&multi, &single, &materialized_catalog, &schema_registry, &eventbus)
167 .map_err(|e| JsError::from_error(&e))?;
168 load_schema_registry(&multi, &single, &schema_registry).map_err(|e| JsError::from_error(&e))?;
169
170 let procedures =
172 Procedures::builder().with_procedure("system::config::set", SetConfigProcedure::new).build();
173
174 let eventbus_clone = eventbus.clone();
176 let inner = StandardEngine::new(
177 multi,
178 single.clone(),
179 eventbus,
180 InterceptorFactory::default(),
181 Catalog::new(materialized_catalog, schema_registry),
182 runtime.clock().clone(),
183 Functions::defaults().build(),
184 procedures,
185 reifydb_engine::transform::registry::Transforms::empty(),
186 ioc,
187 #[cfg(not(target_arch = "wasm32"))]
188 None,
189 );
190
191 console_log("[WASM] Spawning CDC producer actor...");
193 let cdc_producer_handle = spawn_cdc_producer(
194 &actor_system,
195 cdc_store,
196 multi_store.clone(),
197 inner.clone(),
198 eventbus_clone.clone(),
199 );
200
201 let cdc_listener =
203 CdcProducerEventListener::new(cdc_producer_handle.actor_ref().clone(), runtime.clock().clone());
204 eventbus_clone.register::<PostCommitEvent, _>(cdc_listener);
205 console_log("[WASM] CDC producer actor registered!");
206
207 let flow_config = FlowBuilderConfig {
209 operators_dir: None, num_workers: 1, custom_operators: HashMap::new(),
212 };
213 console_log("[WASM] Creating FlowSubsystem...");
214 let mut flow_subsystem = FlowSubsystem::new(flow_config, inner.clone(), &ioc_ref);
215 console_log("[WASM] Starting FlowSubsystem...");
216 flow_subsystem.start().map_err(|e| JsError::from_error(&e))?;
217 console_log("[WASM] FlowSubsystem started successfully!");
218
219 let mut all_versions = Vec::new();
221 all_versions.push(SystemVersion {
222 name: "reifydb-webassembly".to_string(),
223 version: env!("CARGO_PKG_VERSION").to_string(),
224 description: "ReifyDB WebAssembly Engine".to_string(),
225 r#type: ComponentType::Package,
226 });
227 all_versions.push(CoreVersion.version());
228 all_versions.push(EngineVersion.version());
229 all_versions.push(CatalogVersion.version());
230 all_versions.push(MultiStoreVersion.version());
231 all_versions.push(SingleStoreVersion.version());
232 all_versions.push(TransactionVersion.version());
233 all_versions.push(AuthVersion.version());
234 all_versions.push(RqlVersion.version());
235 all_versions.push(CdcVersion.version());
236 all_versions.push(flow_subsystem.version());
237
238 ioc_ref.register_service(SystemCatalog::new(all_versions));
239
240 Ok(WasmDB {
241 inner,
242 flow_subsystem,
243 })
244 }
245
246 #[wasm_bindgen]
258 pub fn query(&self, rql: &str) -> Result<JsValue, JsValue> {
259 let identity = IdentityId::root();
260 let params = Params::None;
261
262 let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
264
265 utils::frames_to_js(&frames)
267 }
268
269 #[wasm_bindgen]
285 pub fn admin(&self, rql: &str) -> Result<JsValue, JsValue> {
286 let identity = IdentityId::root();
287 let params = Params::None;
288
289 let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
290
291 utils::frames_to_js(&frames)
292 }
293
294 #[wasm_bindgen]
299 pub fn command(&self, rql: &str) -> Result<JsValue, JsValue> {
300 let identity = IdentityId::root();
301 let params = Params::None;
302
303 let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
304
305 utils::frames_to_js(&frames)
306 }
307
308 #[wasm_bindgen(js_name = queryWithParams)]
319 pub fn query_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
320 let identity = IdentityId::root();
321
322 let params = utils::parse_params(params_js)?;
324
325 let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
326
327 utils::frames_to_js(&frames)
328 }
329
330 #[wasm_bindgen(js_name = adminWithParams)]
332 pub fn admin_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
333 let identity = IdentityId::root();
334
335 let params = utils::parse_params(params_js)?;
336
337 let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
338
339 utils::frames_to_js(&frames)
340 }
341
342 #[wasm_bindgen(js_name = commandWithParams)]
344 pub fn command_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
345 let identity = IdentityId::root();
346
347 let params = utils::parse_params(params_js)?;
348
349 let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
350
351 utils::frames_to_js(&frames)
352 }
353
354 #[wasm_bindgen(js_name = commandText)]
356 pub fn command_text(&self, rql: &str) -> Result<String, JsValue> {
357 let frames = self
358 .inner
359 .command_as(IdentityId::root(), rql, Params::None)
360 .map_err(|e| JsError::from_error(&e))?;
361 let mut output = String::new();
362 for frame in &frames {
363 writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
364 }
365 Ok(output)
366 }
367
368 #[wasm_bindgen(js_name = adminText)]
370 pub fn admin_text(&self, rql: &str) -> Result<String, JsValue> {
371 let frames = self
372 .inner
373 .admin_as(IdentityId::root(), rql, Params::None)
374 .map_err(|e| JsError::from_error(&e))?;
375 let mut output = String::new();
376 for frame in &frames {
377 writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
378 }
379 Ok(output)
380 }
381
382 #[wasm_bindgen(js_name = queryText)]
384 pub fn query_text(&self, rql: &str) -> Result<String, JsValue> {
385 let frames = self
386 .inner
387 .query_as(IdentityId::root(), rql, Params::None)
388 .map_err(|e| JsError::from_error(&e))?;
389 let mut output = String::new();
390 for frame in &frames {
391 writeln!(output, "{}", frame).map_err(|e| JsError::from_str(&e.to_string()))?;
392 }
393 Ok(output)
394 }
395}
396
397impl Drop for WasmDB {
398 fn drop(&mut self) {
399 let _ = self.flow_subsystem.shutdown();
400 }
401}
402
403impl Default for WasmDB {
404 fn default() -> Self {
405 Self::new().expect("Failed to create WasmDB")
406 }
407}