Skip to main content

dataflow_wasm/
lib.rs

1//! WebAssembly bindings for dataflow-rs workflow engine.
2//!
3//! This crate provides WASM bindings that allow using dataflow-rs from JavaScript/TypeScript.
4//!
5//! # Usage
6//!
7//! ```javascript
8//! import init, { WasmEngine } from 'dataflow-wasm';
9//!
10//! await init();
11//!
12//! // Define workflows
13//! const workflows = JSON.stringify([{
14//!     id: "example",
15//!     name: "Example Workflow",
16//!     priority: 1,
17//!     tasks: [{
18//!         id: "task1",
19//!         name: "Transform Data",
20//!         function: {
21//!             name: "map",
22//!             input: {
23//!                 mappings: [{
24//!                     path: "data.result",
25//!                     logic: { "var": "payload.input" }
26//!                 }]
27//!             }
28//!         }
29//!     }]
30//! }]);
31//!
32//! // Create engine
33//! const engine = new WasmEngine(workflows);
34//!
35//! // Process a payload directly
36//! const payload = JSON.stringify({ input: "hello" });
37//! const result = await engine.process(payload);
38//! console.log(JSON.parse(result));
39//! ```
40
41use dataflow_rs::{Engine, Message, Workflow};
42use serde_json::{Value, json};
43use std::sync::Arc;
44use wasm_bindgen::prelude::*;
45use wasm_bindgen_futures::future_to_promise;
46
47/// Initialize the WASM module.
48///
49/// This is automatically called when the module loads.
50/// Sets up the panic hook for better error messages in the browser console.
51#[wasm_bindgen(start)]
52pub fn init() {
53    console_error_panic_hook::set_once();
54}
55
56/// Create a message JSON string from data and metadata.
57///
58/// # Arguments
59/// * `data` - JSON string containing the message data (goes to context.data)
60/// * `metadata` - JSON string containing the message metadata (goes to context.metadata)
61///
62/// # Returns
63/// JSON string representing the complete message, or an error message
64///
65/// # Example
66/// ```javascript
67/// const message = create_message('{"name": "John"}', '{"type": "user"}');
68/// const result = await engine.process(message);
69/// ```
70#[wasm_bindgen]
71pub fn create_message(data: &str, metadata: &str) -> Result<String, String> {
72    let data_value: Value =
73        serde_json::from_str(data).map_err(|e| format!("Invalid data JSON: {}", e))?;
74    let metadata_value: Value =
75        serde_json::from_str(metadata).map_err(|e| format!("Invalid metadata JSON: {}", e))?;
76
77    // Create a message with empty payload, then set context data and metadata
78    let mut message = Message::from_value(&json!({}));
79    message.context["data"] = data_value;
80    message.context["metadata"] = metadata_value;
81
82    serde_json::to_string(&message).map_err(|e| e.to_string())
83}
84
85/// A WebAssembly-compatible workflow engine.
86///
87/// Wraps the dataflow-rs Engine to provide async message processing
88/// that returns JavaScript Promises.
89#[wasm_bindgen]
90pub struct WasmEngine {
91    inner: Arc<Engine>,
92}
93
94#[wasm_bindgen]
95impl WasmEngine {
96    /// Create a new WasmEngine from a JSON array of workflow definitions.
97    ///
98    /// # Arguments
99    /// * `workflows_json` - JSON string containing an array of workflow definitions
100    ///
101    /// # Example
102    /// ```javascript
103    /// const workflows = JSON.stringify([{
104    ///     id: "workflow1",
105    ///     name: "My Workflow",
106    ///     priority: 1,
107    ///     tasks: [...]
108    /// }]);
109    /// const engine = new WasmEngine(workflows);
110    /// ```
111    #[wasm_bindgen(constructor)]
112    pub fn new(workflows_json: &str) -> Result<WasmEngine, String> {
113        let workflows_value: Value = serde_json::from_str(workflows_json)
114            .map_err(|e| format!("Invalid workflows JSON: {}", e))?;
115
116        let workflows_array = workflows_value
117            .as_array()
118            .ok_or_else(|| "Workflows must be a JSON array".to_string())?;
119
120        let mut workflows = Vec::with_capacity(workflows_array.len());
121        for (i, workflow_value) in workflows_array.iter().enumerate() {
122            let workflow_str = serde_json::to_string(workflow_value).map_err(|e| e.to_string())?;
123            let workflow = Workflow::from_json(&workflow_str)
124                .map_err(|e| format!("Invalid workflow at index {}: {}", i, e))?;
125            workflows.push(workflow);
126        }
127
128        let engine = Engine::new(workflows, None);
129        Ok(WasmEngine {
130            inner: Arc::new(engine),
131        })
132    }
133
134    /// Process a payload through the engine's workflows.
135    ///
136    /// This is an async operation that returns a Promise.
137    ///
138    /// # Arguments
139    /// * `payload_json` - JSON string of the payload to process
140    ///
141    /// # Returns
142    /// A Promise that resolves to the processed message as a JSON string
143    ///
144    /// # Example
145    /// ```javascript
146    /// const payload = JSON.stringify({ name: "John", email: "john@example.com" });
147    /// const result = await engine.process(payload);
148    /// const processed = JSON.parse(result);
149    /// console.log(processed.context.data);
150    /// ```
151    #[wasm_bindgen]
152    pub fn process(&self, payload_json: &str) -> js_sys::Promise {
153        let payload_result: Result<Value, _> = serde_json::from_str(payload_json);
154
155        match payload_result {
156            Ok(payload) => {
157                // Create message from payload using Message::from_value
158                let mut message = Message::from_value(&payload);
159
160                // Clone the Arc for the async block
161                let engine = Arc::clone(&self.inner);
162
163                future_to_promise(async move {
164                    match engine.process_message(&mut message).await {
165                        Ok(()) => serde_json::to_string(&message)
166                            .map(|s| JsValue::from_str(&s))
167                            .map_err(|e| JsValue::from_str(&e.to_string())),
168                        Err(e) => Err(JsValue::from_str(&e.to_string())),
169                    }
170                })
171            }
172            Err(e) => {
173                let error_msg = format!("Invalid payload JSON: {}", e);
174                future_to_promise(async move { Err(JsValue::from_str(&error_msg)) })
175            }
176        }
177    }
178
179    /// Process a payload with step-by-step execution tracing.
180    ///
181    /// This is an async operation that returns a Promise with the execution trace.
182    /// The trace contains message snapshots after each step, including which
183    /// workflows/tasks were executed or skipped.
184    ///
185    /// # Arguments
186    /// * `payload_json` - JSON string of the payload to process
187    ///
188    /// # Returns
189    /// A Promise that resolves to the execution trace as a JSON string
190    ///
191    /// # Example
192    /// ```javascript
193    /// const payload = JSON.stringify({ name: "John", email: "john@example.com" });
194    /// const trace = await engine.process_with_trace(payload);
195    /// const traceData = JSON.parse(trace);
196    /// console.log(traceData.steps); // Array of execution steps
197    /// ```
198    #[wasm_bindgen]
199    pub fn process_with_trace(&self, payload_json: &str) -> js_sys::Promise {
200        let payload_result: Result<Value, _> = serde_json::from_str(payload_json);
201
202        match payload_result {
203            Ok(payload) => {
204                // Create message from payload using Message::from_value
205                let mut message = Message::from_value(&payload);
206
207                // Clone the Arc for the async block
208                let engine = Arc::clone(&self.inner);
209
210                future_to_promise(async move {
211                    match engine.process_message_with_trace(&mut message).await {
212                        Ok(trace) => serde_json::to_string(&trace)
213                            .map(|s| JsValue::from_str(&s))
214                            .map_err(|e| JsValue::from_str(&e.to_string())),
215                        Err(e) => Err(JsValue::from_str(&e.to_string())),
216                    }
217                })
218            }
219            Err(e) => {
220                let error_msg = format!("Invalid payload JSON: {}", e);
221                future_to_promise(async move { Err(JsValue::from_str(&error_msg)) })
222            }
223        }
224    }
225
226    /// Get the number of workflows registered in the engine.
227    #[wasm_bindgen]
228    pub fn workflow_count(&self) -> usize {
229        self.inner.workflows().len()
230    }
231
232    /// Get the list of workflow IDs.
233    ///
234    /// # Returns
235    /// JSON array of workflow IDs as a string
236    #[wasm_bindgen]
237    pub fn workflow_ids(&self) -> String {
238        let ids: Vec<&String> = self.inner.workflows().keys().collect();
239        serde_json::to_string(&ids).unwrap_or_else(|_| "[]".to_string())
240    }
241}
242
243/// Process a payload through a one-off engine (convenience function).
244///
245/// Creates an engine with the given workflows and processes a single payload.
246/// Use WasmEngine class for better performance when processing multiple payloads.
247///
248/// # Arguments
249/// * `workflows_json` - JSON string containing an array of workflow definitions
250/// * `payload_json` - JSON string of the payload to process
251///
252/// # Returns
253/// A Promise that resolves to the processed message as a JSON string
254///
255/// # Example
256/// ```javascript
257/// const payload = JSON.stringify({ name: "John", email: "john@example.com" });
258/// const result = await process_message(workflowsJson, payload);
259/// console.log(JSON.parse(result));
260/// ```
261#[wasm_bindgen]
262pub fn process_message(workflows_json: &str, payload_json: &str) -> js_sys::Promise {
263    let engine_result = WasmEngine::new(workflows_json);
264    match engine_result {
265        Ok(engine) => engine.process(payload_json),
266        Err(e) => future_to_promise(async move { Err(JsValue::from_str(&e)) }),
267    }
268}