Dataflow-rs
A high-performance rules engine for IFTTT-style automation in Rust with zero-overhead JSONLogic evaluation.
Dataflow-rs is a lightweight rules engine that lets you define IF → THEN → THAT automation in JSON. Rules are evaluated using pre-compiled JSONLogic for zero runtime overhead, and actions execute asynchronously for high throughput. Whether you're routing events, validating data, or building complex automation pipelines, Dataflow-rs gives you enterprise-grade performance with minimal complexity.
How It Works: IF → THEN → THAT
┌─────────────────────────────────────────────────────────────────┐
│ Rule (Workflow) │
│ │
│ IF condition matches → JSONLogic against any field │
│ THEN execute actions (tasks) → map, validate, custom logic │
│ THAT chain more rules → priority-ordered execution │
└─────────────────────────────────────────────────────────────────┘
Example: IF order.total > 1000 THEN apply_discount AND notify_manager
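The IF → THEN → THAT flow can be sketched with a small standard-library model. This is not the dataflow-rs API: `ToyMessage`, `ToyRule`, and `run_rules` are hypothetical names, conditions are plain function pointers rather than JSONLogic, and the sketch assumes that a lower priority number runs first.

```rust
use std::collections::HashMap;

/// Toy message: numeric fields plus a record of the actions that ran.
#[derive(Debug, Default)]
struct ToyMessage {
    fields: HashMap<String, f64>,
    applied: Vec<String>,
}

/// Toy rule: a priority, a condition (IF), and named actions (THEN).
struct ToyRule {
    priority: u32,
    condition: fn(&ToyMessage) -> bool,
    actions: Vec<&'static str>,
}

/// Evaluates rules in ascending priority order (THAT).
/// Assumption for this sketch: a lower number means "runs earlier".
fn run_rules(rules: &mut [ToyRule], msg: &mut ToyMessage) {
    rules.sort_by_key(|rule| rule.priority);
    for rule in rules.iter() {
        if (rule.condition)(msg) {
            for action in &rule.actions {
                msg.applied.push(action.to_string());
            }
        }
    }
}

/// IF order.total > 1000
fn order_total_over_1000(msg: &ToyMessage) -> bool {
    msg.fields.get("order.total").copied().unwrap_or(0.0) > 1000.0
}

/// A condition that always matches.
fn always(_msg: &ToyMessage) -> bool {
    true
}

/// Builds two rules out of order and returns the actions that ran.
fn example_run(total: f64) -> Vec<String> {
    let mut rules = vec![
        ToyRule { priority: 2, condition: always, actions: vec!["log"] },
        ToyRule {
            priority: 1,
            condition: order_total_over_1000,
            actions: vec!["apply_discount", "notify_manager"],
        },
    ];
    let mut msg = ToyMessage::default();
    msg.fields.insert("order.total".to_string(), total);
    run_rules(&mut rules, &mut msg);
    msg.applied
}
```

Calling `example_run(1500.0)` runs the discount rule before the logging rule, while `example_run(500.0)` only runs the logging rule.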
Core Concepts
| Rules Engine | Workflow Engine | Description |
|---|---|---|
| Rule | Workflow | A condition + actions bundle — IF condition THEN execute actions |
| Action | Task | An individual processing step (map, validate, or custom function) |
| RulesEngine | Engine | Evaluates rules against messages and executes matching actions |
Both naming conventions are fully supported — use whichever fits your mental model.
Getting Started
1. Add to Cargo.toml
[dependencies]
dataflow-rs = "2.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde_json = "1.0"
2. Define Rules in JSON
3. Run the Engine
use ;
use Message;
use serde_json::json;
use std::sync::Arc;
async
Using Rules Engine Aliases
use ;
// These are type aliases — same types, rules-engine terminology
let rule = from_json?;
let engine = new;
Key Features
- IF → THEN → THAT Model: Define rules with JSONLogic conditions, execute actions, chain with priority ordering.
- Zero Runtime Compilation: All JSONLogic expressions pre-compiled at startup for optimal performance.
- Full Context Access: Conditions can access any field (data, metadata, temp_data).
- Async-First Architecture: Native async/await support with Tokio for high-throughput processing.
- Execution Tracing: Step-by-step debugging with message snapshots after each action.
- Built-in Functions: Parse, Map, Validate, Filter, Log, and Publish for complete data pipelines.
- Pipeline Control Flow: Filter/gate function to halt workflows or skip tasks based on conditions.
- Channel Routing: Route messages to specific workflow channels with O(1) lookup.
- Workflow Lifecycle: Manage workflow status (active/paused/archived), versioning, and tagging.
- Hot Reload: Swap workflows at runtime without re-registering custom functions.
- Extensible: Add custom async actions by implementing the AsyncFunctionHandler trait.
- Typed Integration Configs: Pre-validated configs for HTTP, Enrich, and Kafka integrations.
- WebAssembly Support: Run rules in the browser with @goplasmatic/dataflow-wasm.
- React UI Components: Visualize and debug rules with @goplasmatic/dataflow-ui.
- Auditing: Full audit trail of all changes as data flows through the pipeline.
Architecture
Compilation Phase (Startup)
- All JSONLogic expressions compiled once when the Engine is created
- Compiled logic cached with Arc for zero-copy sharing
- Validates all expressions early, failing fast on errors
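The compile-once, fail-fast idea can be illustrated with a toy engine. This is a sketch under stated assumptions, not the library's implementation: the expression syntax (`field > number`), `Compiled`, `compile`, and `ToyEngine` are all hypothetical stand-ins for compiled JSONLogic.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// A pre-parsed comparison; stands in for a compiled JSONLogic expression.
#[derive(Debug, PartialEq)]
struct Compiled {
    field: String,
    threshold: f64,
}

/// Parses "<field> > <number>" once. Any other shape is rejected.
fn compile(source: &str) -> Result<Compiled, String> {
    let (field, number) = source
        .split_once('>')
        .ok_or_else(|| format!("unsupported expression: {source}"))?;
    let threshold = number
        .trim()
        .parse::<f64>()
        .map_err(|e| format!("bad number in '{source}': {e}"))?;
    Ok(Compiled { field: field.trim().to_string(), threshold })
}

struct ToyEngine {
    /// Each compiled expression sits behind an Arc so clones share it.
    compiled: Vec<Arc<Compiled>>,
}

impl ToyEngine {
    /// Compiles every expression up front; the first invalid one aborts construction.
    fn new(sources: &[&str]) -> Result<Self, String> {
        let compiled = sources
            .iter()
            .map(|source| compile(source).map(Arc::new))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Self { compiled })
    }

    /// Evaluation only reads the pre-parsed form; nothing is parsed here.
    fn matches(&self, index: usize, data: &HashMap<String, f64>) -> bool {
        let c = &self.compiled[index];
        data.get(&c.field).map_or(false, |value| *value > c.threshold)
    }
}
```

Cloning one of the `Arc` handles only bumps a reference count, which is the sense in which compiled logic is shared without copying.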
Execution Phase (Runtime)
- Engine evaluates each rule's condition against the message context
- Matching rules execute their actions with pre-compiled logic (zero compilation overhead)
- process_message() for normal execution, process_message_with_trace() for debugging
- Each action can be async, enabling I/O operations without blocking
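The difference between plain execution and traced execution can be sketched as follows. The names here (`ToyAction`, `TraceStep`, `run`, `run_with_trace`) are hypothetical, the state is a single integer instead of a message, and the actions are synchronous; the only point is that the traced variant records a snapshot after each action.

```rust
/// Hypothetical action type for this sketch: transforms a running value.
type ToyAction = fn(i64) -> i64;

/// One trace entry: which action ran and the state right after it.
#[derive(Debug, Clone, PartialEq)]
struct TraceStep {
    action: &'static str,
    snapshot: i64,
}

/// Plain execution: apply every action in order and return the final state.
fn run(actions: &[(&'static str, ToyAction)], input: i64) -> i64 {
    actions.iter().fold(input, |state, &(_, action)| action(state))
}

/// Traced execution: same result, plus a snapshot after each action.
fn run_with_trace(
    actions: &[(&'static str, ToyAction)],
    input: i64,
) -> (i64, Vec<TraceStep>) {
    let mut state = input;
    let mut trace = Vec::with_capacity(actions.len());
    for &(name, action) in actions {
        state = action(state);
        trace.push(TraceStep { action: name, snapshot: state });
    }
    (state, trace)
}

fn double(x: i64) -> i64 {
    x * 2
}

fn add_ten(x: i64) -> i64 {
    x + 10
}
```

With the actions `double` then `add_ten` and an input of 5, both functions end at 20, and the trace holds the intermediate value 10 followed by 20.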
Performance
- Pre-Compilation: All JSONLogic is compiled at startup, so no parsing or compilation happens at runtime
- Arc-Wrapped Logic: Zero-copy sharing of compiled expressions
- Context Arc Caching: 50% improvement via cached Arc context
- Async I/O: Non-blocking operations for external services
- Predictable Latency: No runtime allocations for logic evaluation
Custom Functions
Extend the engine with your own async actions:
use async_trait::async_trait;
use ;
use DataLogic;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
;
// Register when creating the engine:
let mut custom_functions: = new;
custom_functions.insert;
let engine = new;
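The registration pattern (a handler trait plus a name-to-handler map) can be sketched with the standard library alone. This is a synchronous stand-in, not the AsyncFunctionHandler trait itself: `ToyFunctionHandler`, `ApplyDiscount`, `Registry`, and `call` are hypothetical names, and the real trait's method signature is not reproduced here.

```rust
use std::collections::HashMap;

/// Toy data context: a flat map of numeric fields.
type Data = HashMap<String, f64>;

/// Synchronous stand-in for a custom function handler trait.
trait ToyFunctionHandler: Send + Sync {
    fn execute(&self, data: &mut Data) -> Result<(), String>;
}

/// Example handler: reduces "total" by a fixed percentage.
struct ApplyDiscount {
    percent: f64,
}

impl ToyFunctionHandler for ApplyDiscount {
    fn execute(&self, data: &mut Data) -> Result<(), String> {
        let total = data
            .get_mut("total")
            .ok_or_else(|| "missing field: total".to_string())?;
        *total *= 1.0 - self.percent / 100.0;
        Ok(())
    }
}

/// Handlers are stored as trait objects, keyed by the name rules refer to.
type Registry = HashMap<String, Box<dyn ToyFunctionHandler>>;

/// Looks up a handler by name and runs it against the data.
fn call(registry: &Registry, name: &str, data: &mut Data) -> Result<(), String> {
    registry
        .get(name)
        .ok_or_else(|| format!("unknown function: {name}"))?
        .execute(data)
}
```

Storing boxed trait objects keeps the registry open to any handler type, at the cost of one dynamic dispatch per call.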
Built-in Functions
| Function | Purpose | Modifies Data |
|---|---|---|
| parse_json | Parse JSON from payload into data context | Yes |
| parse_xml | Parse XML string into JSON data structure | Yes |
| map | Data transformation using JSONLogic | Yes |
| validation | Rule-based data validation | No (read-only) |
| filter | Pipeline control flow: halt workflow or skip task | No |
| log | Structured logging with JSONLogic expressions | No |
| publish_json | Serialize data to JSON string | Yes |
| publish_xml | Serialize data to XML string | Yes |
Filter (Pipeline Control Flow)
The filter function evaluates a JSONLogic condition and controls pipeline execution:
- on_reject: "halt" stops the entire workflow when the condition is false
- on_reject: "skip" skips just the current task and continues
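The two rejection modes can be modeled in a few lines. This sketch uses hypothetical types (`OnReject`, `ToyTask`, `run_pipeline`), replaces the JSONLogic condition with a precomputed boolean, and assumes that "skip" means the filter task itself is passed over while later tasks still run.

```rust
/// What a filter does when its condition is false.
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnReject {
    Halt,
    Skip,
}

enum ToyTask {
    /// Gate: `passes` plays the role of the evaluated JSONLogic condition.
    Filter { passes: bool, on_reject: OnReject },
    /// Any other step; its name is recorded when it runs.
    Step(&'static str),
}

/// Runs tasks in order and returns the names of the steps that executed.
fn run_pipeline(tasks: &[ToyTask]) -> Vec<&'static str> {
    let mut ran = Vec::new();
    for task in tasks {
        match task {
            // Rejected with "halt": stop the whole workflow here.
            ToyTask::Filter { passes: false, on_reject: OnReject::Halt } => break,
            // Passed, or rejected with "skip": move on to the next task.
            ToyTask::Filter { .. } => continue,
            ToyTask::Step(name) => ran.push(*name),
        }
    }
    ran
}
```

For the sequence `parse`, filter, `map`, a rejecting filter with `Halt` yields only `parse`, while the same filter with `Skip` yields `parse` and `map`.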
Log (Structured Logging)
The log function outputs structured log messages using the log crate:
Log levels: trace, debug, info, warn, error. Messages and fields support JSONLogic expressions.
Channel Routing
Route messages to specific workflow channels for efficient O(1) dispatch:
// Workflows define their channel
// { "id": "order_rule", "channel": "orders", "status": "active", ... }
// Process only workflows on a specific channel
engine.process_message_for_channel.await?;
Only active workflows are included in channel routing. Workflows default to the "default" channel.
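One way to get constant-time dispatch is to index workflows by channel ahead of time. The following is a minimal sketch of that idea, assuming a hash map keyed by channel name; `ToyWorkflow`, `build_channel_index`, and `workflows_for` are hypothetical and do not describe the engine's internals.

```rust
use std::collections::HashMap;

/// Toy workflow: an id, an optional channel, and whether it is active.
#[derive(Debug, Clone)]
struct ToyWorkflow {
    id: &'static str,
    channel: Option<&'static str>,
    active: bool,
}

/// Groups active workflow ids by channel so dispatch is one hash lookup.
/// Workflows without a channel land on "default"; inactive ones are left out.
fn build_channel_index(
    workflows: &[ToyWorkflow],
) -> HashMap<&'static str, Vec<&'static str>> {
    let mut index: HashMap<&'static str, Vec<&'static str>> = HashMap::new();
    for wf in workflows.iter().filter(|wf| wf.active) {
        index
            .entry(wf.channel.unwrap_or("default"))
            .or_default()
            .push(wf.id);
    }
    index
}

/// Returns the workflow ids for a channel, or an empty slice if none exist.
fn workflows_for<'a>(
    index: &'a HashMap<&'static str, Vec<&'static str>>,
    channel: &str,
) -> &'a [&'static str] {
    index.get(channel).map(Vec::as_slice).unwrap_or(&[])
}
```

The index is built once, so the per-message cost is a single average-case O(1) map lookup rather than a scan over every workflow.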
Workflow Lifecycle
Workflows support lifecycle management fields:
| Field | Type | Default | Description |
|---|---|---|---|
| channel | string | "default" | Channel for message routing |
| version | number | 1 | Workflow version |
| status | string | "active" | active, paused, or archived |
| tags | array | [] | Arbitrary tags for organization |
| created_at | datetime | null | Creation timestamp (ISO 8601) |
| updated_at | datetime | null | Last update timestamp (ISO 8601) |
All fields are optional and backward-compatible with existing configurations.
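The defaults in the table can be mirrored in a plain struct. This is an illustrative model only: `Lifecycle`, `Status`, and `is_routable` are hypothetical names, timestamps are kept as optional strings, and the real workflow type may be laid out differently.

```rust
/// Lifecycle status values listed in the table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Active,
    Paused,
    Archived,
}

/// Toy model of the lifecycle fields, with the documented defaults.
#[derive(Debug, Clone, PartialEq)]
struct Lifecycle {
    channel: String,
    version: u32,
    status: Status,
    tags: Vec<String>,
    created_at: Option<String>, // ISO 8601 text when present
    updated_at: Option<String>, // ISO 8601 text when present
}

impl Default for Lifecycle {
    fn default() -> Self {
        Self {
            channel: "default".to_string(),
            version: 1,
            status: Status::Active,
            tags: Vec::new(),
            created_at: None,
            updated_at: None,
        }
    }
}

/// Only active workflows take part in channel routing.
fn is_routable(lifecycle: &Lifecycle) -> bool {
    lifecycle.status == Status::Active
}
```

Struct update syntax, such as `Lifecycle { status: Status::Paused, ..Lifecycle::default() }`, overrides one field and keeps the remaining defaults, which matches the "all fields are optional" behavior.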
Engine Hot Reload
Swap workflows at runtime without losing custom function registrations:
let new_workflows = vec!;
let new_engine = engine.with_new_workflows;
// Old engine remains valid for in-flight messages
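The reload behavior described above can be approximated with reference-counted state. In this sketch, `ReloadableEngine` and `ToyHandler` are hypothetical, workflows are plain strings, and the method is named after `with_new_workflows` purely for readability; it assumes the new engine shares the function registry through an `Arc` while the old engine keeps its own workflow list.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical handler type for this sketch.
type ToyHandler = fn(&str) -> String;

/// Toy engine: workflows and custom functions live behind separate Arcs.
#[derive(Clone)]
struct ReloadableEngine {
    workflows: Arc<Vec<String>>,
    functions: Arc<HashMap<String, ToyHandler>>,
}

impl ReloadableEngine {
    fn new(workflows: Vec<String>, functions: HashMap<String, ToyHandler>) -> Self {
        Self {
            workflows: Arc::new(workflows),
            functions: Arc::new(functions),
        }
    }

    /// Builds a new engine with fresh workflows that reuses the same
    /// function registry. `self` is untouched, so in-flight work can finish.
    fn with_new_workflows(&self, workflows: Vec<String>) -> Self {
        Self {
            workflows: Arc::new(workflows),
            functions: Arc::clone(&self.functions),
        }
    }
}

/// Example custom function.
fn shout(input: &str) -> String {
    input.to_uppercase()
}
```

Because the old engine is only borrowed, callers holding it keep a consistent view of the previous workflows until they drop it.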
Related Packages
| Package | Description |
|---|---|
| @goplasmatic/dataflow-wasm | WebAssembly bindings for browser execution |
| @goplasmatic/dataflow-ui | React components for rule visualization and debugging |
Contributing
We welcome contributions! Feel free to fork the repository, make your changes, and submit a pull request. Please make sure to add tests for any new features.
About Plasmatic
Dataflow-rs is developed by the team at Plasmatic. We're passionate about building open-source tools for data processing and automation.
License
This project is licensed under the Apache License, Version 2.0. See the LICENSE file for more details.