Skip to main content

varpulis_engine_wasm/
lib.rs

1//! # Varpulis Engine WASM
2//!
3//! Full VPL engine compiled to WebAssembly.  Parse VPL programs, process events,
4//! and receive output events — all in the browser or any JS/TS runtime.
5//!
6//! ## Usage (JavaScript/TypeScript)
7//!
8//! ```javascript
9//! import { WasmEngine } from '@varpulis/engine';
10//!
11//! const engine = new WasmEngine();
12//! engine.load(`
13//!   event Sensor:
14//!     temperature: float
15//!     zone: str
16//!
17//!   stream Hot = Sensor
18//!     .where(temperature > 100)
19//!     .emit(zone: zone, temp: temperature)
20//! `);
21//!
22//! const outputs = engine.processEvent('{"event_type":"Sensor","temperature":150,"zone":"A"}');
23//! // outputs: [{"stream":"Hot","event":{"zone":"A","temp":150},"timestamp":"..."}]
24//! ```
25
26use serde::Serialize;
27use varpulis_runtime::engine::graph::program_to_graph;
28use varpulis_runtime::Engine;
29use wasm_bindgen::prelude::*;
30
31/// WASM-accessible VPL engine.
32///
33/// Create with `new WasmEngine()`, load VPL via `load()`, process events via
34/// `processEvent()` or `processBatch()`, and receive output events as JSON.
35#[wasm_bindgen]
36pub struct WasmEngine {
37    engine: Engine,
38    program_source: String,
39    program_loaded: bool,
40}
41
42impl std::fmt::Debug for WasmEngine {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("WasmEngine")
45            .field("program_loaded", &self.program_loaded)
46            .finish()
47    }
48}
49
50impl Default for WasmEngine {
51    fn default() -> Self {
52        Self {
53            engine: Engine::new_sync(),
54            program_source: String::new(),
55            program_loaded: false,
56        }
57    }
58}
59
60/// JSON result for load/reload operations.
61#[derive(Serialize)]
62struct LoadResult {
63    ok: bool,
64    streams: Vec<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    error: Option<String>,
67}
68
69/// JSON result for event processing.
70#[derive(Serialize)]
71struct OutputEvent {
72    stream: String,
73    event: serde_json::Value,
74    timestamp: String,
75}
76
77/// JSON result for processing.
78#[derive(Serialize)]
79struct ProcessResult {
80    ok: bool,
81    outputs: Vec<OutputEvent>,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    error: Option<String>,
84}
85
86#[wasm_bindgen]
87impl WasmEngine {
88    /// Create a new engine instance.
89    #[wasm_bindgen(constructor)]
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Load a VPL program.  Returns JSON: `{ ok, streams, error? }`.
95    ///
96    /// Replaces any previously loaded program.  Call this before processing events.
97    pub fn load(&mut self, vpl: &str) -> String {
98        let program = match varpulis_parser::parse(vpl) {
99            Ok(p) => p,
100            Err(e) => {
101                return json(&LoadResult {
102                    ok: false,
103                    streams: vec![],
104                    error: Some(format!("Parse error: {e}")),
105                });
106            }
107        };
108
109        match self.engine.load(&program) {
110            Ok(()) => {
111                self.program_source = vpl.to_string();
112                self.program_loaded = true;
113                let streams = self
114                    .engine
115                    .stream_names()
116                    .into_iter()
117                    .map(String::from)
118                    .collect();
119                json(&LoadResult {
120                    ok: true,
121                    streams,
122                    error: None,
123                })
124            }
125            Err(e) => json(&LoadResult {
126                ok: false,
127                streams: vec![],
128                error: Some(format!("Load error: {e}")),
129            }),
130        }
131    }
132
133    /// Process a single event (JSON object).  Returns JSON: `{ ok, outputs, error? }`.
134    ///
135    /// The event JSON must include an `event_type` field matching a declared event type.
136    /// Additional fields are the event data.
137    ///
138    /// ```json
139    /// {"event_type": "Sensor", "temperature": 150, "zone": "A"}
140    /// ```
141    #[wasm_bindgen(js_name = "processEvent")]
142    pub fn process_event(&mut self, event_json: &str) -> String {
143        let event = match parse_event(event_json) {
144            Ok(e) => e,
145            Err(msg) => {
146                return json(&ProcessResult {
147                    ok: false,
148                    outputs: vec![],
149                    error: Some(msg),
150                });
151            }
152        };
153
154        match self.engine.process_batch_sync_collect(vec![event]) {
155            Ok(outputs) => {
156                let output_events = outputs.into_iter().map(event_to_output).collect();
157                json(&ProcessResult {
158                    ok: true,
159                    outputs: output_events,
160                    error: None,
161                })
162            }
163            Err(e) => json(&ProcessResult {
164                ok: false,
165                outputs: vec![],
166                error: Some(format!("Processing error: {e}")),
167            }),
168        }
169    }
170
171    /// Process a batch of events (JSON array of objects).  Returns JSON: `{ ok, outputs, error? }`.
172    #[wasm_bindgen(js_name = "processBatch")]
173    pub fn process_batch(&mut self, events_json: &str) -> String {
174        let events: Vec<serde_json::Value> = match serde_json::from_str(events_json) {
175            Ok(v) => v,
176            Err(e) => {
177                return json(&ProcessResult {
178                    ok: false,
179                    outputs: vec![],
180                    error: Some(format!("Invalid JSON array: {e}")),
181                });
182            }
183        };
184
185        let parsed: Result<Vec<_>, _> = events
186            .iter()
187            .map(|v| {
188                let s = serde_json::to_string(v).unwrap_or_default();
189                parse_event(&s)
190            })
191            .collect();
192
193        let events = match parsed {
194            Ok(e) => e,
195            Err(msg) => {
196                return json(&ProcessResult {
197                    ok: false,
198                    outputs: vec![],
199                    error: Some(msg),
200                });
201            }
202        };
203
204        match self.engine.process_batch_sync_collect(events) {
205            Ok(outputs) => {
206                let output_events = outputs.into_iter().map(event_to_output).collect();
207                json(&ProcessResult {
208                    ok: true,
209                    outputs: output_events,
210                    error: None,
211                })
212            }
213            Err(e) => json(&ProcessResult {
214                ok: false,
215                outputs: vec![],
216                error: Some(format!("Processing error: {e}")),
217            }),
218        }
219    }
220
221    /// Get the pipeline topology as a JSON graph.
222    #[wasm_bindgen(js_name = "getTopology")]
223    pub fn get_topology(&self) -> String {
224        if self.program_source.is_empty() {
225            return r#"{"nodes":[],"edges":[]}"#.to_string();
226        }
227        match varpulis_parser::parse(&self.program_source) {
228            Ok(program) => {
229                let graph = program_to_graph(&program);
230                serde_json::to_string(&graph)
231                    .unwrap_or_else(|_| r#"{"nodes":[],"edges":[]}"#.into())
232            }
233            Err(_) => r#"{"nodes":[],"edges":[]}"#.to_string(),
234        }
235    }
236
237    /// Get loaded stream names as a JSON array.
238    #[wasm_bindgen(js_name = "getStreams")]
239    pub fn get_streams(&self) -> String {
240        let streams: Vec<&str> = self.engine.stream_names();
241        serde_json::to_string(&streams).unwrap_or_else(|_| "[]".into())
242    }
243
244    /// Enable or disable trace mode.
245    #[wasm_bindgen(js_name = "setTrace")]
246    pub fn set_trace(&mut self, enabled: bool) {
247        self.engine.set_trace_enabled(enabled);
248    }
249
250    /// Drain trace entries as a JSON array.
251    #[wasm_bindgen(js_name = "drainTrace")]
252    pub fn drain_trace(&mut self) -> String {
253        let entries = self.engine.drain_trace();
254        let json_entries: Vec<serde_json::Value> = entries
255            .into_iter()
256            .map(|e| serde_json::to_value(format!("{e:?}")).unwrap_or_default())
257            .collect();
258        serde_json::to_string(&json_entries).unwrap_or_else(|_| "[]".into())
259    }
260
261    /// Validate VPL syntax without loading.  Returns JSON: `{ ok, error? }`.
262    pub fn validate(&self, vpl: &str) -> String {
263        match varpulis_parser::parse(vpl) {
264            Ok(_) => r#"{"ok":true}"#.to_string(),
265            Err(e) => {
266                let error = format!("{e}");
267                json(&serde_json::json!({"ok": false, "error": error}))
268            }
269        }
270    }
271
272    /// Check if a program is loaded.
273    #[wasm_bindgen(js_name = "isLoaded")]
274    pub fn is_loaded(&self) -> bool {
275        self.program_loaded
276    }
277}
278
279// =============================================================================
280// Helpers
281// =============================================================================
282
283fn json<T: Serialize>(value: &T) -> String {
284    serde_json::to_string(value)
285        .unwrap_or_else(|_| r#"{"ok":false,"error":"serialization failed"}"#.into())
286}
287
288fn parse_event(json_str: &str) -> Result<varpulis_core::Event, String> {
289    let value: serde_json::Value =
290        serde_json::from_str(json_str).map_err(|e| format!("Invalid JSON: {e}"))?;
291
292    let event_type = value
293        .get("event_type")
294        .and_then(|v| v.as_str())
295        .ok_or_else(|| "Missing 'event_type' field".to_string())?;
296
297    let mut event = varpulis_core::Event::new(event_type);
298
299    if let Some(obj) = value.as_object() {
300        for (key, val) in obj {
301            if key == "event_type" || key == "timestamp" {
302                continue;
303            }
304            if let Some(v) = json_value_to_core_value(val) {
305                event = event.with_field(key.as_str(), v);
306            }
307        }
308    }
309
310    Ok(event)
311}
312
313fn json_value_to_core_value(v: &serde_json::Value) -> Option<varpulis_core::Value> {
314    match v {
315        serde_json::Value::Null => Some(varpulis_core::Value::Null),
316        serde_json::Value::Bool(b) => Some(varpulis_core::Value::Bool(*b)),
317        serde_json::Value::Number(n) => n
318            .as_i64()
319            .map(varpulis_core::Value::Int)
320            .or_else(|| n.as_f64().map(varpulis_core::Value::Float)),
321        serde_json::Value::String(s) => Some(varpulis_core::Value::str(s)),
322        _ => None,
323    }
324}
325
326fn event_to_output(event: varpulis_core::Event) -> OutputEvent {
327    let mut fields = serde_json::Map::new();
328    for (key, value) in &event.data {
329        fields.insert(key.to_string(), core_value_to_json(value));
330    }
331    OutputEvent {
332        stream: event.event_type.to_string(),
333        event: serde_json::Value::Object(fields),
334        timestamp: event.timestamp.to_rfc3339(),
335    }
336}
337
338fn core_value_to_json(v: &varpulis_core::Value) -> serde_json::Value {
339    match v {
340        varpulis_core::Value::Null => serde_json::Value::Null,
341        varpulis_core::Value::Bool(b) => serde_json::Value::Bool(*b),
342        varpulis_core::Value::Int(i) => serde_json::json!(*i),
343        varpulis_core::Value::Float(f) => serde_json::json!(*f),
344        varpulis_core::Value::Str(s) => serde_json::Value::String(s.to_string()),
345        _ => serde_json::Value::String(format!("{v:?}")),
346    }
347}
348
349// =============================================================================
350// Tests (run natively, not in WASM)
351// =============================================================================
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356
357    #[test]
358    fn test_new_engine() {
359        let engine = WasmEngine::new();
360        assert!(!engine.is_loaded());
361        assert_eq!(engine.get_streams(), "[]");
362    }
363
364    #[test]
365    fn test_load_valid_vpl() {
366        let mut engine = WasmEngine::new();
367        let result = engine.load("event T:\n    x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
368        let json: serde_json::Value = serde_json::from_str(&result).unwrap();
369        assert_eq!(json["ok"], true);
370        assert!(json["streams"]
371            .as_array()
372            .unwrap()
373            .contains(&serde_json::json!("S")));
374        assert!(engine.is_loaded());
375    }
376
377    #[test]
378    fn test_load_invalid_vpl() {
379        let mut engine = WasmEngine::new();
380        let result = engine.load("this is not valid vpl {{{{");
381        let json: serde_json::Value = serde_json::from_str(&result).unwrap();
382        assert_eq!(json["ok"], false);
383        assert!(json["error"].as_str().unwrap().contains("Parse error"));
384    }
385
386    #[test]
387    fn test_process_event_matching() {
388        let mut engine = WasmEngine::new();
389        engine.load("event T:\n    x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
390        let result = engine.process_event(r#"{"event_type":"T","x":42}"#);
391        let json: serde_json::Value = serde_json::from_str(&result).unwrap();
392        assert_eq!(json["ok"], true);
393        let outputs = json["outputs"].as_array().unwrap();
394        assert_eq!(outputs.len(), 1);
395        assert_eq!(outputs[0]["stream"], "S");
396        assert_eq!(outputs[0]["event"]["v"], 42);
397    }
398
399    #[test]
400    fn test_process_event_filtered() {
401        let mut engine = WasmEngine::new();
402        engine.load("event T:\n    x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
403        let result = engine.process_event(r#"{"event_type":"T","x":5}"#);
404        let json: serde_json::Value = serde_json::from_str(&result).unwrap();
405        assert_eq!(json["ok"], true);
406        assert!(json["outputs"].as_array().unwrap().is_empty());
407    }
408
409    #[test]
410    fn test_process_batch() {
411        let mut engine = WasmEngine::new();
412        engine.load("event T:\n    x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
413        let result = engine.process_batch(
414            r#"[{"event_type":"T","x":5},{"event_type":"T","x":42},{"event_type":"T","x":100}]"#,
415        );
416        let json: serde_json::Value = serde_json::from_str(&result).unwrap();
417        assert_eq!(json["ok"], true);
418        assert_eq!(json["outputs"].as_array().unwrap().len(), 2);
419    }
420
421    #[test]
422    fn test_get_topology() {
423        let mut engine = WasmEngine::new();
424        engine.load("event T:\n    x: int\n\nstream S = T .where(x > 10) .emit(v: x)");
425        let topo = engine.get_topology();
426        let json: serde_json::Value = serde_json::from_str(&topo).unwrap();
427        assert!(!json["nodes"].as_array().unwrap().is_empty());
428        assert!(!json["edges"].as_array().unwrap().is_empty());
429    }
430
431    #[test]
432    fn test_validate() {
433        let engine = WasmEngine::new();
434        let valid = engine.validate("event T:\n    x: int");
435        let json: serde_json::Value = serde_json::from_str(&valid).unwrap();
436        assert_eq!(json["ok"], true);
437
438        let invalid = engine.validate("not valid {{");
439        let json: serde_json::Value = serde_json::from_str(&invalid).unwrap();
440        assert_eq!(json["ok"], false);
441    }
442}