§Dataflow-rs
A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.
§Overview
Dataflow-rs provides a high-performance rules engine that follows the IF → THEN → THAT model:
- IF: Define conditions using JSONLogic expressions (evaluated against data, metadata, and temp_data)
- THEN: Execute actions (data transformation, validation, or custom async logic)
- THAT: Chain multiple actions and rules with priority ordering
Rules are defined declaratively in JSON and compiled once at startup, so no rule parsing or compilation happens during evaluation at runtime.
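To make the IF → THEN → THAT model concrete, here is a minimal std-only sketch of priority-ordered rule evaluation. It is a conceptual toy, not the crate's implementation: `ToyRule`, `Fields`, `run_rules`, and `demo` are hypothetical names, plain function pointers stand in for compiled JSONLogic, and the convention that lower priority numbers run first is an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a message: a flat map of numeric fields.
type Fields = BTreeMap<String, i64>;

/// Hypothetical rule: a priority, an IF predicate, and a list of THEN actions.
struct ToyRule {
    name: &'static str,
    priority: u32,
    condition: fn(&Fields) -> bool,
    actions: Vec<fn(&mut Fields)>,
}

/// Evaluate rules in ascending priority order (assumed convention) and
/// return the names of the rules whose condition matched.
fn run_rules(rules: &mut [ToyRule], fields: &mut Fields) -> Vec<&'static str> {
    rules.sort_by_key(|rule| rule.priority);
    let mut fired = Vec::new();
    for rule in rules.iter() {
        // IF: check the condition against the current state.
        if (rule.condition)(&*fields) {
            // THEN: run every action attached to the rule, in order.
            for action in &rule.actions {
                action(&mut *fields);
            }
            fired.push(rule.name);
        }
    }
    fired
}

fn is_large(fields: &Fields) -> bool {
    fields.get("amount").copied().unwrap_or(0) > 100
}

fn always(_: &Fields) -> bool {
    true
}

fn flag_review(fields: &mut Fields) {
    fields.insert("needs_review".to_string(), 1);
}

fn add_fee(fields: &mut Fields) {
    *fields.entry("amount".to_string()).or_insert(0) += 5;
}

/// Two rules declared out of order; sorting by priority decides who runs first.
fn demo() -> (Vec<&'static str>, Fields) {
    let mut rules = vec![
        ToyRule {
            name: "flag_large",
            priority: 10,
            condition: is_large,
            actions: vec![flag_review as fn(&mut Fields)],
        },
        ToyRule {
            name: "add_fee",
            priority: 0,
            condition: always,
            actions: vec![add_fee as fn(&mut Fields)],
        },
    ];
    let mut fields = Fields::new();
    fields.insert("amount".to_string(), 98);
    let fired = run_rules(&mut rules, &mut fields);
    (fired, fields)
}
```

In `demo`, the fee rule runs first and pushes `amount` from 98 to 103, which is what lets the later `flag_large` rule match. Chaining rules by priority means earlier rules can change what later conditions see.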
§Key Components
| Rules Engine | Workflow Engine | Description |
|---|---|---|
| RulesEngine | Engine | Central async component that evaluates rules and executes actions |
| Rule | Workflow | A condition + actions bundle — IF condition THEN execute actions |
| Action | Task | An individual processing step that performs a function on a message |
- AsyncFunctionHandler: A trait implemented by action handlers to define custom async processing logic
- TaskContext: Per-call context handed to handlers, with typed data accessors and audit-trail-aware setters
- TaskOutcome: Return value of a handler, one of Success, Status(code), Skip, or Halt
- Message: The data structure that flows through the engine, containing payload, metadata, and processing results
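The four handler outcomes listed above can be pictured with a small std-only sketch. Everything here is hypothetical: `ToyOutcome` only mirrors the variant names, the `u16` status code type is a guess, and the control-flow semantics (Skip continues, Halt stops the remaining handlers) are an assumption rather than documented behavior of the crate.

```rust
/// Hypothetical mirror of the four outcomes named above.
#[derive(Debug, Clone, PartialEq)]
enum ToyOutcome {
    Success,
    Status(u16), // assumed code type
    Skip,
    Halt,
}

/// Run handlers in order and record what each one returned.
/// Assumed semantics (not confirmed by the text above):
/// - Success / Status: continue with the next handler
/// - Skip: the handler did nothing; continue
/// - Halt: stop, so later handlers never run
fn run_handlers(handlers: &[fn(i64) -> ToyOutcome], input: i64) -> Vec<ToyOutcome> {
    let mut log = Vec::new();
    for handler in handlers {
        let outcome = handler(input);
        let stop = outcome == ToyOutcome::Halt;
        log.push(outcome);
        if stop {
            break;
        }
    }
    log
}

fn skip_if_even(n: i64) -> ToyOutcome {
    if n % 2 == 0 {
        ToyOutcome::Skip
    } else {
        ToyOutcome::Status(200)
    }
}

fn halt_if_negative(n: i64) -> ToyOutcome {
    if n < 0 {
        ToyOutcome::Halt
    } else {
        ToyOutcome::Success
    }
}

fn accept(_: i64) -> ToyOutcome {
    ToyOutcome::Success
}

/// Run the same three-handler chain against one input.
fn run_demo(input: i64) -> Vec<ToyOutcome> {
    let handlers: [fn(i64) -> ToyOutcome; 3] = [skip_if_even, halt_if_negative, accept];
    run_handlers(&handlers, input)
}
```

With these assumptions, `run_demo(-4)` records a Skip followed by a Halt and never reaches `accept`, while `run_demo(3)` runs all three handlers.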
§Built-in Functions
The engine ships with the following pre-registered functions, available to any workflow without further setup:
| Category | Function | Purpose |
|---|---|---|
| Parse | parse_json | Deserialize a JSON payload string into data |
| Parse | parse_xml | Deserialize an XML payload string into data |
| Transform | map | Assign JSONLogic-derived values to dot-paths within the message |
| Validate | validation | Apply JSONLogic rules with custom error messages |
| Routing | filter | Skip or halt processing based on a JSONLogic predicate |
| Routing | log | Emit a log entry at a configurable level |
| Publish | publish_json | Render data back out as a JSON payload |
| Publish | publish_xml | Render data back out as an XML payload |
In addition, dataflow-rs provides typed config schemas for three common
service-layer integrations — http_call, enrich, and publish_kafka.
These are not pre-registered: register an AsyncFunctionHandler under the
matching name and the engine handles config validation and JSONLogic
pre-compilation for you. See HttpCallConfig, EnrichConfig, and
PublishKafkaConfig.
Custom functions are registered through Engine::builder().register(...);
see the Extending with Custom Functions section below.
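The `map` function in the table above assigns values to dot-paths such as `data.result`. As a rough illustration of what a dot-path write involves, here is a std-only sketch. `Node`, `set_path`, and `get_path` are hypothetical helpers, not part of dataflow-rs, and the choice to create missing intermediate objects (and to overwrite non-object values found along the path) is an assumption about reasonable behavior, not a statement of what the crate does.

```rust
use std::collections::BTreeMap;

/// Hypothetical minimal JSON-like value; the crate uses its own value types.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Num(i64),
    Text(String),
    Object(BTreeMap<String, Node>),
}

/// Write `value` at a dot-separated `path`, creating intermediate objects
/// as needed and replacing any non-object found along the way (assumed policy).
fn set_path(node: &mut Node, path: &str, value: Node) {
    if !matches!(*node, Node::Object(_)) {
        *node = Node::Object(BTreeMap::new());
    }
    if let Node::Object(map) = node {
        match path.split_once('.') {
            // Last segment: store the value here.
            None => {
                map.insert(path.to_string(), value);
            }
            // More segments remain: descend, creating the child if missing.
            Some((head, rest)) => {
                let child = map
                    .entry(head.to_string())
                    .or_insert_with(|| Node::Object(BTreeMap::new()));
                set_path(child, rest, value);
            }
        }
    }
}

/// Read the value at a dot-separated `path`, if every segment exists.
fn get_path<'a>(node: &'a Node, path: &str) -> Option<&'a Node> {
    path.split('.').try_fold(node, |current, segment| match current {
        Node::Object(map) => map.get(segment),
        _ => None,
    })
}
```

A call such as `set_path(&mut root, "data.result.total", Node::Num(42))` builds the `data` and `result` objects on the way down, and `get_path(&root, "data.result.total")` then returns the stored value.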
§Usage Example
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine. The builder is the recommended path; built-in
    // functions are auto-registered.
    let engine = Engine::builder().with_workflow(workflow).build()?;

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
§Error Handling
The library provides a comprehensive error handling system:
use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::builder().build()?;
    let mut message = Message::from_value(&json!({}));

    // Process the message; errors are collected and do not halt execution
    engine.process_message(&mut message).await?;

    // Check whether any errors were recorded during processing
    if message.has_errors() {
        for error in message.errors() {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}
§Extending with Custom Functions
Implement AsyncFunctionHandler with a typed Input so the engine deserializes
your config once at startup; handlers then receive typed input and a
TaskContext that records audit-trail changes automatically.
use dataflow_rs::{
    AsyncFunctionHandler, Engine, Result, TaskContext, TaskOutcome, Workflow,
};
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;
use async_trait::async_trait;

#[derive(Deserialize)]
struct StatsInput {
    /// Path inside `data` whose array of numbers to summarize.
    source: String,
    /// Path inside `data` to write the result to.
    target: String,
}

struct Statistics;

#[async_trait]
impl AsyncFunctionHandler for Statistics {
    type Input = StatsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatsInput,
    ) -> Result<TaskOutcome> {
        let count = ctx.data()
            .get(input.source.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.len())
            .unwrap_or(0);
        ctx.set(
            &format!("data.{}", input.target),
            OwnedDataValue::from(&json!({ "count": count })),
        );
        Ok(TaskOutcome::Success)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let engine = Engine::builder()
        .register("statistics", Statistics)
        // .with_workflow(workflow)
        .build()?;
    // ...
    Ok(())
}
§Ecosystem
Dataflow-rs is part of a small family of crates that share the same workflow and JSONLogic shape:
| Crate | Purpose |
|---|---|
| dataflow-rs | This crate: async workflow engine in Rust |
| @goplasmatic/dataflow-wasm | WebAssembly bindings for running workflows in the browser or Node |
| @goplasmatic/dataflow-ui | React components for visualizing and debugging workflows |
| datalogic-rs | The JSONLogic compiler/evaluator used internally |
Source for all four lives under https://github.com/GoPlasmatic.
Re-exports§
pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::functions::AsyncFunctionHandler;
pub use engine::functions::BoxedFunctionHandler;
pub use engine::functions::EnrichConfig;
pub use engine::functions::FilterConfig;
pub use engine::functions::FunctionConfig;
pub use engine::functions::HttpCallConfig;
pub use engine::functions::LogConfig;
pub use engine::functions::MapConfig;
pub use engine::functions::MapMapping;
pub use engine::functions::PublishKafkaConfig;
pub use engine::functions::ValidationConfig;
pub use engine::functions::ValidationRule;
pub use engine::message::AuditTrail;
pub use engine::message::Change;
pub use engine::message::Message;
pub use engine::message::MessageBuilder;
pub use engine::task_context::TaskContext;
pub use engine::task_outcome::TaskOutcome;
pub use engine::trace::ExecutionStep;
pub use engine::trace::ExecutionTrace;
pub use engine::trace::StepResult;
pub use engine::Engine;
pub use engine::EngineBuilder;
pub use engine::Task;
pub use engine::Workflow;
pub use engine::WorkflowStatus;
Modules§
Type Aliases§
- Action: Type alias for Task. An Action is an individual processing step within a rule.
- Rule: Type alias for Workflow. A Rule represents an IF-THEN unit: IF condition THEN execute actions.
- RulesEngine: Type alias for Engine. The RulesEngine evaluates rules and executes their actions.