/*!
# Dataflow-rs

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.

## Overview

Dataflow-rs provides a high-performance rules engine that follows the **IF → THEN → THAT** model:

- **IF**: Define conditions using JSONLogic expressions (evaluated against `data`, `metadata`, and `temp_data`)
- **THEN**: Execute actions such as data transformation, validation, or custom async logic
- **THAT**: Chain multiple actions and rules with priority ordering

Rules are defined declaratively in JSON and compiled once at startup, so evaluating them at runtime involves no re-parsing or recompilation.
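
The loop below models the IF → THEN → THAT flow in plain Rust. It is a conceptual sketch, not the crate's implementation. `Facts`, `Rule`, `run_rules`, and `demo` are hypothetical. Conditions are ordinary Rust functions rather than compiled JSONLogic, and lower `priority` values are assumed to run first.

```rust
// Conceptual model only: `Facts`, `Rule`, `run_rules`, and `demo` are
// hypothetical names, not dataflow-rs APIs.
use std::collections::HashMap;

// Stand-in for a message's `data`: named integer fields.
type Facts = HashMap<String, i64>;

struct Rule {
    id: &'static str,
    priority: i32,
    // IF: a predicate over the facts (JSONLogic in dataflow-rs).
    condition: fn(&Facts) -> bool,
    // THEN: actions applied in order when the condition holds.
    actions: Vec<fn(&mut Facts)>,
}

// THAT: evaluate rules in priority order (assumed: lowest value first),
// so later rules see the changes made by earlier ones.
fn run_rules(rules: &mut [Rule], facts: &mut Facts) -> Vec<&'static str> {
    rules.sort_by_key(|r| r.priority);
    let mut fired = Vec::new();
    for rule in rules.iter() {
        if (rule.condition)(facts) {
            for &action in &rule.actions {
                action(facts);
            }
            fired.push(rule.id);
        }
    }
    fired
}

fn is_large(f: &Facts) -> bool {
    f.get("amount").copied().unwrap_or(0) > 100
}

fn always(_: &Facts) -> bool {
    true
}

fn add_fee(f: &mut Facts) {
    *f.entry("amount".to_string()).or_insert(0) += 5;
}

fn flag(f: &mut Facts) {
    f.insert("flagged".to_string(), 1);
}

// Builds two rules out of priority order and runs them against `amount = 98`.
fn demo() -> (Vec<&'static str>, Facts) {
    let mut rules = vec![
        Rule {
            id: "flag_large",
            priority: 1,
            condition: is_large,
            actions: vec![flag as fn(&mut Facts)],
        },
        Rule {
            id: "fee",
            priority: 0,
            condition: always,
            actions: vec![add_fee as fn(&mut Facts)],
        },
    ];
    let mut facts = Facts::new();
    facts.insert("amount".to_string(), 98);
    let fired = run_rules(&mut rules, &mut facts);
    (fired, facts)
}

fn main() {
    let (fired, facts) = demo();
    println!("fired: {:?}, facts: {:?}", fired, facts);
}
```

In `demo`, `fee` runs first and raises `amount` from 98 to 103. That change is what lets `flag_large` fire afterwards, and this chaining of one rule's effects into the next is the THAT step.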

## Key Components

Each concept has a rules-engine name and a workflow-engine name. The rules-engine names are type aliases for the workflow-engine types, so the two vocabularies are interchangeable.

| Rules-engine name | Workflow-engine name | Description |
|---|---|---|
| **RulesEngine** | **Engine** | Central async component that evaluates rules and executes actions |
| **Rule** | **Workflow** | A condition plus its actions: IF the condition holds, THEN execute the actions |
| **Action** | **Task** | A single processing step that applies a function to a message |

* **AsyncFunctionHandler**: A trait implemented by action handlers to define custom async processing logic
* **TaskContext**: Per-call context passed to handlers, providing typed data accessors and setters that record changes in the audit trail
* **TaskOutcome**: The value a handler returns: `Success`, `Status(code)`, `Skip`, or `Halt`
* **Message**: The data structure that flows through the engine, containing the payload, metadata, and processing results
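
A handler's `TaskOutcome` determines what the engine does next. The sketch below models this with a hypothetical `Outcome` enum and a hypothetical `drive` function. The semantics it gives `Skip` (stop the current workflow and continue with the next) and `Halt` (stop processing the message) are assumptions for illustration, not documented behavior of the crate.

```rust
// Hypothetical mirror of `TaskOutcome`. The Skip/Halt semantics encoded in
// `drive` are assumptions for illustration, not the crate's documented rules.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Success,
    Status(u16),
    Skip,
    Halt,
}

// Each inner Vec lists the outcomes its tasks would return, in order.
// Returns the (workflow, task) indices that ran and the last status code.
fn drive(workflows: &[Vec<Outcome>]) -> (Vec<(usize, usize)>, Option<u16>) {
    let mut ran = Vec::new();
    let mut status = None;
    'workflows: for (w, tasks) in workflows.iter().enumerate() {
        for (t, outcome) in tasks.iter().enumerate() {
            ran.push((w, t));
            match *outcome {
                Outcome::Success => {}
                // Assumed: record the code and keep going.
                Outcome::Status(code) => status = Some(code),
                // Assumed: stop this workflow, move on to the next one.
                Outcome::Skip => continue 'workflows,
                // Assumed: stop processing the message altogether.
                Outcome::Halt => break 'workflows,
            }
        }
    }
    (ran, status)
}

fn main() {
    let workflows = vec![
        vec![Outcome::Success, Outcome::Skip, Outcome::Success],
        vec![Outcome::Status(202), Outcome::Halt, Outcome::Success],
        vec![Outcome::Success],
    ];
    let (ran, status) = drive(&workflows);
    println!("ran: {:?}, status: {:?}", ran, status);
}
```

With this input, `drive` runs tasks (0, 0), (0, 1), (1, 0), and (1, 1) and reports status `Some(202)`. The third workflow never runs because of the `Halt`.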

## Built-in Functions

The engine ships with the following pre-registered functions, available to
any workflow without further setup:

| Category | Function | Purpose |
|---|---|---|
| **Parse** | `parse_json` | Deserialize a JSON payload string into `data` |
| **Parse** | `parse_xml` | Deserialize an XML payload string into `data` |
| **Transform** | `map` | Assign JSONLogic-derived values to dot-paths within the message |
| **Validate** | `validation` | Apply JSONLogic rules with custom error messages |
| **Routing** | `filter` | Skip or halt processing based on a JSONLogic predicate |
| **Routing** | `log` | Emit a log entry at a configurable level |
| **Publish** | `publish_json` | Render `data` back out as a JSON payload |
| **Publish** | `publish_xml` | Render `data` back out as an XML payload |
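
The `map` function writes values to dot-paths such as `data.result`. Here is a minimal std-only sketch of such a write. It assumes that missing intermediate objects are created on demand. `Node` stands in for the message context, and `set` and `get` are hypothetical helpers, not crate APIs.

```rust
// Illustrative only: `Node`, `set`, and `get` are hypothetical helpers, and
// creating missing intermediate objects is an assumption about `map`.
use std::collections::BTreeMap;

// A tiny JSON-like tree standing in for the message context.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Leaf(String),
    Object(BTreeMap<String, Node>),
}

// Walks the keys, creating (or replacing non-object values with) empty
// objects along the way, then stores `value` under the last key.
fn set_keys(node: &mut Node, keys: &[&str], value: Node) {
    let Some((first, rest)) = keys.split_first() else {
        *node = value;
        return;
    };
    if !matches!(node, Node::Object(_)) {
        *node = Node::Object(BTreeMap::new());
    }
    if let Node::Object(map) = node {
        let child = map
            .entry(first.to_string())
            .or_insert_with(|| Node::Object(BTreeMap::new()));
        set_keys(child, rest, value);
    }
}

// Writes `value` at a dot-separated path such as "data.result".
fn set(root: &mut Node, path: &str, value: Node) {
    let keys: Vec<&str> = path.split('.').collect();
    set_keys(root, &keys, value);
}

// Reads the value at a dot-separated path, if every segment exists.
fn get<'a>(root: &'a Node, path: &str) -> Option<&'a Node> {
    path.split('.').try_fold(root, |cur, key| match cur {
        Node::Object(map) => map.get(key),
        Node::Leaf(_) => None,
    })
}

fn main() {
    let mut ctx = Node::Object(BTreeMap::new());
    set(&mut ctx, "data.result", Node::Leaf("42".to_string()));
    set(&mut ctx, "data.user.name", Node::Leaf("ada".to_string()));
    println!("{:?}", get(&ctx, "data.user.name"));
}
```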

In addition, dataflow-rs provides **typed config schemas** for three common
service-layer integrations: `http_call`, `enrich`, and `publish_kafka`.
These are *not* pre-registered. Register an `AsyncFunctionHandler` under the
matching name, and the engine validates the config and pre-compiles its
JSONLogic for you. See [`HttpCallConfig`], [`EnrichConfig`], and
[`PublishKafkaConfig`].

Custom functions are registered through `Engine::builder().register(...)`;
see the **Extending with Custom Functions** section below.
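
Looking up functions by name can be pictured as a registry that starts with the built-ins and is extended through a builder. This is a hypothetical, synchronous sketch, not the crate's `EngineBuilder`. `Registry`, `RegistryBuilder`, and the string-in/string-out handler type are all assumptions.

```rust
// Hypothetical sketch: `Registry`, `RegistryBuilder`, and the
// string-in/string-out `Handler` type are not dataflow-rs APIs.
use std::collections::HashMap;

type Handler = Box<dyn Fn(&str) -> String>;

struct Registry {
    handlers: HashMap<String, Handler>,
}

struct RegistryBuilder {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    // The builder starts out with the "built-in" handlers already present.
    fn builder() -> RegistryBuilder {
        let mut handlers: HashMap<String, Handler> = HashMap::new();
        handlers.insert("log".to_string(), Box::new(|cfg: &str| format!("log:{cfg}")));
        RegistryBuilder { handlers }
    }

    // Looks a handler up by the name a task's `function.name` would carry.
    fn call(&self, name: &str, cfg: &str) -> Result<String, String> {
        self.handlers
            .get(name)
            .map(|h| h(cfg))
            .ok_or_else(|| format!("no handler registered for `{name}`"))
    }
}

impl RegistryBuilder {
    // In this sketch, registering an existing name replaces the old handler.
    fn register(mut self, name: &str, handler: impl Fn(&str) -> String + 'static) -> Self {
        self.handlers.insert(name.to_string(), Box::new(handler));
        self
    }

    fn build(self) -> Registry {
        Registry { handlers: self.handlers }
    }
}

fn main() {
    let registry = Registry::builder()
        .register("http_call", |cfg: &str| format!("GET {cfg}"))
        .build();
    println!("{:?}", registry.call("log", "started"));
    println!("{:?}", registry.call("http_call", "https://example.com"));
    println!("{:?}", registry.call("publish_kafka", "orders"));
}
```

In this sketch, calling a name that was never registered, such as `publish_kafka`, returns an `Err`.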

## Usage Example

```rust,no_run
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Build the engine. The builder is the recommended path; built-in
    // functions are registered automatically.
    let engine = Engine::builder().with_workflow(workflow).build()?;

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
```

## Error Handling

Errors raised by tasks are recorded on the message rather than halting execution, so you can inspect them once `process_message` returns:

```rust,no_run
use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... set up workflows ...
    let engine = Engine::builder().build()?;

    let mut message = Message::from_value(&json!({}));

    // Process the message. Task errors are collected on the message instead
    // of halting execution; `?` still propagates an `Err` returned by
    // `process_message` itself.
    engine.process_message(&mut message).await?;

    // Check whether any errors occurred during processing
    if message.has_errors() {
        for error in message.errors() {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                     error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}
```
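
The collect-and-continue pattern behind `has_errors()` and `errors()` can be sketched without the crate. `Msg`, `Step`, `TaskError`, and `process` are hypothetical stand-ins. The error fields follow the example above, but this is not the crate's internal logic.

```rust
// Hypothetical stand-ins: `Msg`, `Step`, `TaskError`, and `process` only
// mimic the shape of the example above; they are not crate internals.
#[derive(Debug)]
struct TaskError {
    workflow_id: String,
    task_id: String,
    message: String,
}

#[derive(Debug)]
struct Msg {
    value: i64,
    errors: Vec<TaskError>,
}

struct Step {
    id: &'static str,
    run: fn(&mut Msg) -> Result<(), String>,
}

// Runs every step; a failure is recorded on the message and the loop continues.
fn process(workflow_id: &str, steps: &[Step], msg: &mut Msg) {
    for step in steps {
        if let Err(reason) = (step.run)(msg) {
            msg.errors.push(TaskError {
                workflow_id: workflow_id.to_string(),
                task_id: step.id.to_string(),
                message: reason,
            });
        }
    }
}

fn double(m: &mut Msg) -> Result<(), String> {
    m.value *= 2;
    Ok(())
}

fn must_be_small(m: &mut Msg) -> Result<(), String> {
    if m.value > 10 {
        Err(format!("value {} exceeds 10", m.value))
    } else {
        Ok(())
    }
}

fn increment(m: &mut Msg) -> Result<(), String> {
    m.value += 1;
    Ok(())
}

fn demo() -> Msg {
    let steps = [
        Step { id: "double", run: double },
        Step { id: "check", run: must_be_small },
        Step { id: "increment", run: increment },
    ];
    let mut msg = Msg { value: 6, errors: Vec::new() };
    process("pipeline", &steps, &mut msg);
    msg
}

fn main() {
    let msg = demo();
    for e in &msg.errors {
        println!("workflow {}, task {}: {}", e.workflow_id, e.task_id, e.message);
    }
    println!("final value: {}", msg.value);
}
```

Here the `check` step fails on a value of 12, yet `increment` still runs. The message therefore ends at 13 with one recorded error.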

## Extending with Custom Functions

Implement `AsyncFunctionHandler` with a typed `Input` so the engine deserializes
your config once at startup. Handlers then receive the typed input and a
`TaskContext` that records audit-trail changes automatically.

```rust,no_run
use dataflow_rs::{
    AsyncFunctionHandler, Engine, Result, TaskContext, TaskOutcome, Workflow,
};
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;
use async_trait::async_trait;

#[derive(Deserialize)]
struct StatsInput {
    /// Path inside `data` to the array of numbers to summarize.
    source: String,
    /// Path inside `data` to write the result to.
    target: String,
}

struct Statistics;

#[async_trait]
impl AsyncFunctionHandler for Statistics {
    type Input = StatsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatsInput,
    ) -> Result<TaskOutcome> {
        // Count the elements of the source array (0 if it is missing or not an array).
        let count = ctx.data()
            .get(input.source.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.len())
            .unwrap_or(0);

        ctx.set(
            &format!("data.{}", input.target),
            OwnedDataValue::from(&json!({ "count": count })),
        );
        Ok(TaskOutcome::Success)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let engine = Engine::builder()
        .register("statistics", Statistics)
        // .with_workflow(workflow)
        .build()?;
    // ...
    Ok(())
}
```
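
A typed `Input` lets the engine parse and validate config once, when the engine is built, rather than on every message. The sketch below shows that split using only the standard library and synchronous code. The `Handler` trait, `CompiledTask`, and the string map used as raw config are hypothetical. The crate itself uses serde and an async trait.

```rust
// Hypothetical, synchronous sketch: `Handler`, `CompiledTask`, and the
// string-map config are illustrations, not the crate's async trait or serde path.
use std::collections::HashMap;

type Data = HashMap<String, Vec<f64>>;

trait Handler {
    type Input;
    // Runs once per task, when the engine is built.
    fn parse(raw: &HashMap<String, String>) -> Result<Self::Input, String>;
    // Runs once per message, with the already-parsed input.
    fn execute(&self, data: &mut Data, input: &Self::Input);
}

struct StatsInput {
    source: String,
    target: String,
}

struct Statistics;

impl Handler for Statistics {
    type Input = StatsInput;

    fn parse(raw: &HashMap<String, String>) -> Result<StatsInput, String> {
        let field = |k: &str| raw.get(k).cloned().ok_or(format!("missing `{k}`"));
        Ok(StatsInput { source: field("source")?, target: field("target")? })
    }

    fn execute(&self, data: &mut Data, input: &StatsInput) {
        // Count the source array's elements (0 if it is absent).
        let count = data.get(&input.source).map_or(0, |v| v.len());
        data.insert(input.target.clone(), vec![count as f64]);
    }
}

// A task whose config was validated and parsed up front.
struct CompiledTask<H: Handler> {
    handler: H,
    input: H::Input,
}

impl<H: Handler> CompiledTask<H> {
    fn compile(handler: H, raw: &HashMap<String, String>) -> Result<Self, String> {
        let input = H::parse(raw)?;
        Ok(CompiledTask { handler, input })
    }

    fn run(&self, data: &mut Data) {
        self.handler.execute(data, &self.input);
    }
}

fn main() -> Result<(), String> {
    let raw: HashMap<String, String> = [("source", "numbers"), ("target", "stats")]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    // Parsing happens here, once; the loop below only executes.
    let task = CompiledTask::compile(Statistics, &raw)?;
    for n in 1..=3 {
        let mut data = Data::new();
        data.insert("numbers".to_string(), vec![1.0; n]);
        task.run(&mut data);
        println!("{:?}", data.get("stats"));
    }
    Ok(())
}
```

A missing `source` or `target` key makes `CompiledTask::compile` fail before any message is processed.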

## Ecosystem

Dataflow-rs is part of a small family of packages that share the same workflow
format and JSONLogic rules:

| Package | Purpose |
|---|---|
| [`dataflow-rs`](https://crates.io/crates/dataflow-rs) | This crate: the async workflow engine in Rust |
| [`@goplasmatic/dataflow-wasm`](https://www.npmjs.com/package/@goplasmatic/dataflow-wasm) | WebAssembly bindings for running workflows in the browser or Node.js |
| [`@goplasmatic/dataflow-ui`](https://www.npmjs.com/package/@goplasmatic/dataflow-ui) | React components for visualizing and debugging workflows |
| [`datalogic-rs`](https://crates.io/crates/datalogic-rs) | The JSONLogic compiler/evaluator used internally |

Source for all four lives under <https://github.com/GoPlasmatic>.
*/

pub mod engine;
pub mod prelude;

// Re-export all public APIs for easier access
pub use engine::error::{DataflowError, ErrorInfo, Result};
pub use engine::functions::{
    AsyncFunctionHandler, BoxedFunctionHandler, EnrichConfig, FilterConfig, FunctionConfig,
    HttpCallConfig, LogConfig, MapConfig, MapMapping, PublishKafkaConfig, ValidationConfig,
    ValidationRule,
};
pub use engine::message::{AuditTrail, Change, Message, MessageBuilder};
pub use engine::task_context::TaskContext;
pub use engine::task_outcome::TaskOutcome;
pub use engine::trace::{ExecutionStep, ExecutionTrace, StepResult};
pub use engine::{Engine, EngineBuilder, Task, Workflow, WorkflowStatus};

/// Type alias for `Workflow`. A Rule represents an IF-THEN unit: IF condition THEN execute actions.
pub type Rule = Workflow;

/// Type alias for `Task`. An Action is an individual processing step within a rule.
pub type Action = Task;

/// Type alias for `Engine`. The RulesEngine evaluates rules and executes their actions.
pub type RulesEngine = Engine;
238/// Type alias for `Task` — an Action is an individual processing step within a rule.
239pub type Action = Task;
240
241/// Type alias for `Engine` — the RulesEngine evaluates rules and executes their actions.
242pub type RulesEngine = Engine;