1use serde::Serialize;
27use varpulis_runtime::engine::graph::program_to_graph;
28use varpulis_runtime::Engine;
29use wasm_bindgen::prelude::*;
30
31#[wasm_bindgen]
36pub struct WasmEngine {
37 engine: Engine,
38 program_source: String,
39 program_loaded: bool,
40}
41
42impl std::fmt::Debug for WasmEngine {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("WasmEngine")
45 .field("program_loaded", &self.program_loaded)
46 .finish()
47 }
48}
49
50impl Default for WasmEngine {
51 fn default() -> Self {
52 Self {
53 engine: Engine::new_sync(),
54 program_source: String::new(),
55 program_loaded: false,
56 }
57 }
58}
59
60#[derive(Serialize)]
62struct LoadResult {
63 ok: bool,
64 streams: Vec<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 error: Option<String>,
67}
68
69#[derive(Serialize)]
71struct OutputEvent {
72 stream: String,
73 event: serde_json::Value,
74 timestamp: String,
75}
76
77#[derive(Serialize)]
79struct ProcessResult {
80 ok: bool,
81 outputs: Vec<OutputEvent>,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 error: Option<String>,
84}
85
86#[wasm_bindgen]
87impl WasmEngine {
88 #[wasm_bindgen(constructor)]
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn load(&mut self, vpl: &str) -> String {
98 let program = match varpulis_parser::parse(vpl) {
99 Ok(p) => p,
100 Err(e) => {
101 return json(&LoadResult {
102 ok: false,
103 streams: vec![],
104 error: Some(format!("Parse error: {e}")),
105 });
106 }
107 };
108
109 match self.engine.load(&program) {
110 Ok(()) => {
111 self.program_source = vpl.to_string();
112 self.program_loaded = true;
113 let streams = self
114 .engine
115 .stream_names()
116 .into_iter()
117 .map(String::from)
118 .collect();
119 json(&LoadResult {
120 ok: true,
121 streams,
122 error: None,
123 })
124 }
125 Err(e) => json(&LoadResult {
126 ok: false,
127 streams: vec![],
128 error: Some(format!("Load error: {e}")),
129 }),
130 }
131 }
132
133 #[wasm_bindgen(js_name = "processEvent")]
142 pub fn process_event(&mut self, event_json: &str) -> String {
143 let event = match parse_event(event_json) {
144 Ok(e) => e,
145 Err(msg) => {
146 return json(&ProcessResult {
147 ok: false,
148 outputs: vec![],
149 error: Some(msg),
150 });
151 }
152 };
153
154 match self.engine.process_batch_sync_collect(vec![event]) {
155 Ok(outputs) => {
156 let output_events = outputs.into_iter().map(event_to_output).collect();
157 json(&ProcessResult {
158 ok: true,
159 outputs: output_events,
160 error: None,
161 })
162 }
163 Err(e) => json(&ProcessResult {
164 ok: false,
165 outputs: vec![],
166 error: Some(format!("Processing error: {e}")),
167 }),
168 }
169 }
170
171 #[wasm_bindgen(js_name = "processBatch")]
173 pub fn process_batch(&mut self, events_json: &str) -> String {
174 let events: Vec<serde_json::Value> = match serde_json::from_str(events_json) {
175 Ok(v) => v,
176 Err(e) => {
177 return json(&ProcessResult {
178 ok: false,
179 outputs: vec![],
180 error: Some(format!("Invalid JSON array: {e}")),
181 });
182 }
183 };
184
185 let parsed: Result<Vec<_>, _> = events
186 .iter()
187 .map(|v| {
188 let s = serde_json::to_string(v).unwrap_or_default();
189 parse_event(&s)
190 })
191 .collect();
192
193 let events = match parsed {
194 Ok(e) => e,
195 Err(msg) => {
196 return json(&ProcessResult {
197 ok: false,
198 outputs: vec![],
199 error: Some(msg),
200 });
201 }
202 };
203
204 match self.engine.process_batch_sync_collect(events) {
205 Ok(outputs) => {
206 let output_events = outputs.into_iter().map(event_to_output).collect();
207 json(&ProcessResult {
208 ok: true,
209 outputs: output_events,
210 error: None,
211 })
212 }
213 Err(e) => json(&ProcessResult {
214 ok: false,
215 outputs: vec![],
216 error: Some(format!("Processing error: {e}")),
217 }),
218 }
219 }
220
221 #[wasm_bindgen(js_name = "getTopology")]
223 pub fn get_topology(&self) -> String {
224 if self.program_source.is_empty() {
225 return r#"{"nodes":[],"edges":[]}"#.to_string();
226 }
227 match varpulis_parser::parse(&self.program_source) {
228 Ok(program) => {
229 let graph = program_to_graph(&program);
230 serde_json::to_string(&graph)
231 .unwrap_or_else(|_| r#"{"nodes":[],"edges":[]}"#.into())
232 }
233 Err(_) => r#"{"nodes":[],"edges":[]}"#.to_string(),
234 }
235 }
236
237 #[wasm_bindgen(js_name = "getStreams")]
239 pub fn get_streams(&self) -> String {
240 let streams: Vec<&str> = self.engine.stream_names();
241 serde_json::to_string(&streams).unwrap_or_else(|_| "[]".into())
242 }
243
244 #[wasm_bindgen(js_name = "setTrace")]
246 pub fn set_trace(&mut self, enabled: bool) {
247 self.engine.set_trace_enabled(enabled);
248 }
249
250 #[wasm_bindgen(js_name = "drainTrace")]
252 pub fn drain_trace(&mut self) -> String {
253 let entries = self.engine.drain_trace();
254 let json_entries: Vec<serde_json::Value> = entries
255 .into_iter()
256 .map(|e| serde_json::to_value(format!("{e:?}")).unwrap_or_default())
257 .collect();
258 serde_json::to_string(&json_entries).unwrap_or_else(|_| "[]".into())
259 }
260
261 pub fn validate(&self, vpl: &str) -> String {
263 match varpulis_parser::parse(vpl) {
264 Ok(_) => r#"{"ok":true}"#.to_string(),
265 Err(e) => {
266 let error = format!("{e}");
267 json(&serde_json::json!({"ok": false, "error": error}))
268 }
269 }
270 }
271
272 #[wasm_bindgen(js_name = "isLoaded")]
274 pub fn is_loaded(&self) -> bool {
275 self.program_loaded
276 }
277}
278
279fn json<T: Serialize>(value: &T) -> String {
284 serde_json::to_string(value)
285 .unwrap_or_else(|_| r#"{"ok":false,"error":"serialization failed"}"#.into())
286}
287
288fn parse_event(json_str: &str) -> Result<varpulis_core::Event, String> {
289 let value: serde_json::Value =
290 serde_json::from_str(json_str).map_err(|e| format!("Invalid JSON: {e}"))?;
291
292 let event_type = value
293 .get("event_type")
294 .and_then(|v| v.as_str())
295 .ok_or_else(|| "Missing 'event_type' field".to_string())?;
296
297 let mut event = varpulis_core::Event::new(event_type);
298
299 if let Some(obj) = value.as_object() {
300 for (key, val) in obj {
301 if key == "event_type" || key == "timestamp" {
302 continue;
303 }
304 if let Some(v) = json_value_to_core_value(val) {
305 event = event.with_field(key.as_str(), v);
306 }
307 }
308 }
309
310 Ok(event)
311}
312
313fn json_value_to_core_value(v: &serde_json::Value) -> Option<varpulis_core::Value> {
314 match v {
315 serde_json::Value::Null => Some(varpulis_core::Value::Null),
316 serde_json::Value::Bool(b) => Some(varpulis_core::Value::Bool(*b)),
317 serde_json::Value::Number(n) => n
318 .as_i64()
319 .map(varpulis_core::Value::Int)
320 .or_else(|| n.as_f64().map(varpulis_core::Value::Float)),
321 serde_json::Value::String(s) => Some(varpulis_core::Value::str(s)),
322 _ => None,
323 }
324}
325
326fn event_to_output(event: varpulis_core::Event) -> OutputEvent {
327 let mut fields = serde_json::Map::new();
328 for (key, value) in &event.data {
329 fields.insert(key.to_string(), core_value_to_json(value));
330 }
331 OutputEvent {
332 stream: event.event_type.to_string(),
333 event: serde_json::Value::Object(fields),
334 timestamp: event.timestamp.to_rfc3339(),
335 }
336}
337
338fn core_value_to_json(v: &varpulis_core::Value) -> serde_json::Value {
339 match v {
340 varpulis_core::Value::Null => serde_json::Value::Null,
341 varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
342 varpulis_core::Value::Int(i) => serde_json::json!(*i),
343 varpulis_core::Value::Float(f) => serde_json::json!(*f),
344 varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
345 _ => serde_json::Value::String(format!("{v:?}")),
346 }
347}
348
349#[cfg(test)]
354mod tests {
355 use super::*;
356
357 #[test]
358 fn test_new_engine() {
359 let engine = WasmEngine::new();
360 assert!(!engine.is_loaded());
361 assert_eq!(engine.get_streams(), "[]");
362 }
363
364 #[test]
365 fn test_load_valid_vpl() {
366 let mut engine = WasmEngine::new();
367 let result = engine.load("event T:\n x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
368 let json: serde_json::Value = serde_json::from_str(&result).unwrap();
369 assert_eq!(json["ok"], true);
370 assert!(json["streams"]
371 .as_array()
372 .unwrap()
373 .contains(&serde_json::json!("S")));
374 assert!(engine.is_loaded());
375 }
376
377 #[test]
378 fn test_load_invalid_vpl() {
379 let mut engine = WasmEngine::new();
380 let result = engine.load("this is not valid vpl {{{{");
381 let json: serde_json::Value = serde_json::from_str(&result).unwrap();
382 assert_eq!(json["ok"], false);
383 assert!(json["error"].as_str().unwrap().contains("Parse error"));
384 }
385
386 #[test]
387 fn test_process_event_matching() {
388 let mut engine = WasmEngine::new();
389 engine.load("event T:\n x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
390 let result = engine.process_event(r#"{"event_type":"T","x":42}"#);
391 let json: serde_json::Value = serde_json::from_str(&result).unwrap();
392 assert_eq!(json["ok"], true);
393 let outputs = json["outputs"].as_array().unwrap();
394 assert_eq!(outputs.len(), 1);
395 assert_eq!(outputs[0]["stream"], "S");
396 assert_eq!(outputs[0]["event"]["v"], 42);
397 }
398
399 #[test]
400 fn test_process_event_filtered() {
401 let mut engine = WasmEngine::new();
402 engine.load("event T:\n x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
403 let result = engine.process_event(r#"{"event_type":"T","x":5}"#);
404 let json: serde_json::Value = serde_json::from_str(&result).unwrap();
405 assert_eq!(json["ok"], true);
406 assert!(json["outputs"].as_array().unwrap().is_empty());
407 }
408
409 #[test]
410 fn test_process_batch() {
411 let mut engine = WasmEngine::new();
412 engine.load("event T:\n x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
413 let result = engine.process_batch(
414 r#"[{"event_type":"T","x":5},{"event_type":"T","x":42},{"event_type":"T","x":100}]"#,
415 );
416 let json: serde_json::Value = serde_json::from_str(&result).unwrap();
417 assert_eq!(json["ok"], true);
418 assert_eq!(json["outputs"].as_array().unwrap().len(), 2);
419 }
420
421 #[test]
422 fn test_get_topology() {
423 let mut engine = WasmEngine::new();
424 engine.load("event T:\n x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
425 let topo = engine.get_topology();
426 let json: serde_json::Value = serde_json::from_str(&topo).unwrap();
427 assert!(!json["nodes"].as_array().unwrap().is_empty());
428 assert!(!json["edges"].as_array().unwrap().is_empty());
429 }
430
431 #[test]
432 fn test_validate() {
433 let engine = WasmEngine::new();
434 let valid = engine.validate("event T:\n x: int");
435 let json: serde_json::Value = serde_json::from_str(&valid).unwrap();
436 assert_eq!(json["ok"], true);
437
438 let invalid = engine.validate("not valid {{");
439 let json: serde_json::Value = serde_json::from_str(&invalid).unwrap();
440 assert_eq!(json["ok"], false);
441 }
442}