1use std::collections::HashMap;
10
11use reifydb_catalog::schema::SchemaRegistry;
12use reifydb_engine::{engine::StandardEngine, procedure::registry::Procedures};
13use wasm_bindgen::prelude::*;
14
15fn console_log(msg: &str) {
17 web_sys::console::log_1(&msg.into());
18}
19use reifydb_cdc::{
20 produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
21 storage::CdcStore,
22};
23use reifydb_core::event::transaction::PostCommitEvent;
24use reifydb_store_multi::MultiStore;
25use reifydb_store_single::SingleStore;
26use reifydb_sub_api::subsystem::Subsystem;
27use reifydb_sub_flow::{builder::FlowBuilderConfig, subsystem::FlowSubsystem};
28use reifydb_type::{params::Params, value::identity::IdentityId};
29
30mod error;
31mod utils;
32
33pub use error::JsError;
34use reifydb_function::registry::Functions;
35use reifydb_store_multi::{
36 config::{HotConfig, MultiStoreConfig},
37 hot::storage::HotStorage,
38};
39
40#[wasm_bindgen]
45pub struct WasmDB {
46 inner: StandardEngine,
47 flow_subsystem: FlowSubsystem,
48}
49
50#[wasm_bindgen]
51impl WasmDB {
52 #[wasm_bindgen(constructor)]
63 pub fn new() -> Result<WasmDB, JsValue> {
64 use reifydb_catalog::{catalog::Catalog, materialized::MaterializedCatalog};
65 use reifydb_core::{config::SystemConfig, event::EventBus, util::ioc::IocContainer};
66 use reifydb_runtime::{SharedRuntime, SharedRuntimeConfig};
67 use reifydb_transaction::{
68 interceptor::factory::InterceptorFactory,
69 multi::transaction::{MultiTransaction, register_oracle_defaults},
70 single::SingleTransaction,
71 };
72
73 #[cfg(feature = "console_error_panic_hook")]
76 console_error_panic_hook::set_once();
77
78 let runtime = SharedRuntime::from_config(
80 SharedRuntimeConfig::default().async_threads(1).compute_threads(1).compute_max_in_flight(8),
81 );
82
83 let actor_system = runtime.actor_system();
86
87 let eventbus = EventBus::new(&actor_system);
89 let multi_store = MultiStore::standard(MultiStoreConfig {
90 hot: Some(HotConfig {
91 storage: HotStorage::memory(),
92 }),
93 warm: None,
94 cold: None,
95 retention: Default::default(),
96 merge_config: Default::default(),
97 event_bus: eventbus.clone(),
98 actor_system: actor_system.clone(),
99 });
100 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
101
102 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
104 let system_config = SystemConfig::new();
105 register_oracle_defaults(&system_config);
106 let multi = MultiTransaction::new(
107 multi_store.clone(),
108 single.clone(),
109 eventbus.clone(),
110 actor_system.clone(),
111 runtime.clock().clone(),
112 system_config.clone(),
113 )
114 .map_err(|e| JsError::from_error(&e))?;
115
116 let mut ioc = IocContainer::new();
118
119 let materialized_catalog = MaterializedCatalog::new(system_config);
120 ioc = ioc.register(materialized_catalog.clone());
121
122 ioc = ioc.register(runtime.clone());
123
124 ioc = ioc.register(single_store.clone());
126
127 let cdc_store = CdcStore::memory();
129 ioc = ioc.register(cdc_store.clone());
130
131 let ioc_ref = ioc.clone();
133
134 let eventbus_clone = eventbus.clone();
136 let inner = StandardEngine::new(
137 multi,
138 single.clone(),
139 eventbus,
140 InterceptorFactory::default(),
141 Catalog::new(materialized_catalog, SchemaRegistry::new(single)),
142 runtime.clock().clone(),
143 Functions::defaults().build(),
144 Procedures::empty(),
145 reifydb_engine::transform::registry::Transforms::empty(),
146 ioc,
147 #[cfg(not(target_arch = "wasm32"))]
148 None,
149 );
150
151 console_log("[WASM] Spawning CDC producer actor...");
153 let cdc_producer_handle = spawn_cdc_producer(
154 &actor_system,
155 cdc_store,
156 multi_store.clone(),
157 inner.clone(),
158 eventbus_clone.clone(),
159 );
160
161 let cdc_listener =
163 CdcProducerEventListener::new(cdc_producer_handle.actor_ref().clone(), runtime.clock().clone());
164 eventbus_clone.register::<PostCommitEvent, _>(cdc_listener);
165 console_log("[WASM] CDC producer actor registered!");
166
167 let flow_config = FlowBuilderConfig {
169 operators_dir: None, num_workers: 1, custom_operators: HashMap::new(),
172 };
173 console_log("[WASM] Creating FlowSubsystem...");
174 let mut flow_subsystem = FlowSubsystem::new(flow_config, inner.clone(), &ioc_ref);
175 console_log("[WASM] Starting FlowSubsystem...");
176 flow_subsystem.start().map_err(|e| JsError::from_error(&e))?;
177 console_log("[WASM] FlowSubsystem started successfully!");
178
179 Ok(WasmDB {
180 inner,
181 flow_subsystem,
182 })
183 }
184
185 #[wasm_bindgen]
197 pub fn query(&self, rql: &str) -> Result<JsValue, JsValue> {
198 let identity = IdentityId::root();
199 let params = Params::None;
200
201 let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
203
204 utils::frames_to_js(&frames)
206 }
207
208 #[wasm_bindgen]
224 pub fn admin(&self, rql: &str) -> Result<JsValue, JsValue> {
225 let identity = IdentityId::root();
226 let params = Params::None;
227
228 let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
229
230 utils::frames_to_js(&frames)
231 }
232
233 #[wasm_bindgen]
238 pub fn command(&self, rql: &str) -> Result<JsValue, JsValue> {
239 let identity = IdentityId::root();
240 let params = Params::None;
241
242 let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
243
244 utils::frames_to_js(&frames)
245 }
246
247 #[wasm_bindgen(js_name = queryWithParams)]
258 pub fn query_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
259 let identity = IdentityId::root();
260
261 let params = utils::parse_params(params_js)?;
263
264 let frames = self.inner.query_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
265
266 utils::frames_to_js(&frames)
267 }
268
269 #[wasm_bindgen(js_name = adminWithParams)]
271 pub fn admin_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
272 let identity = IdentityId::root();
273
274 let params = utils::parse_params(params_js)?;
275
276 let frames = self.inner.admin_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
277
278 utils::frames_to_js(&frames)
279 }
280
281 #[wasm_bindgen(js_name = commandWithParams)]
283 pub fn command_with_params(&self, rql: &str, params_js: JsValue) -> Result<JsValue, JsValue> {
284 let identity = IdentityId::root();
285
286 let params = utils::parse_params(params_js)?;
287
288 let frames = self.inner.command_as(identity, rql, params).map_err(|e| JsError::from_error(&e))?;
289
290 utils::frames_to_js(&frames)
291 }
292}
293
294impl Drop for WasmDB {
295 fn drop(&mut self) {
296 let _ = self.flow_subsystem.shutdown();
297 }
298}
299
300impl Default for WasmDB {
301 fn default() -> Self {
302 Self::new().expect("Failed to create WasmDB")
303 }
304}