§Dataflow-rs
A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.
§Overview
Dataflow-rs provides a high-performance rules engine that follows the IF → THEN → THAT model:
- IF — Define conditions using JSONLogic expressions (evaluated against `data`, `metadata`, and `temp_data`)
- THEN — Execute actions: data transformation, validation, or custom async logic
- THAT — Chain multiple actions and rules with priority ordering
Rules are defined declaratively in JSON and compiled once at startup for zero-overhead evaluation at runtime.
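For instance, a rule's IF condition is ordinary JSONLogic; the field paths in this sketch are illustrative, not part of the library:

```json
{
  "and": [
    { "==": [{ "var": "data.status" }, "active"] },
    { ">": [{ "var": "temp_data.score" }, 10] }
  ]
}
```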
§Key Components
| Rules Engine | Workflow Engine | Description |
|---|---|---|
| RulesEngine | Engine | Central async component that evaluates rules and executes actions |
| Rule | Workflow | A condition + actions bundle — IF condition THEN execute actions |
| Action | Task | An individual processing step that performs a function on a message |
- AsyncFunctionHandler: A trait implemented by action handlers to define custom async processing logic
- Message: The data structure that flows through the engine, containing payload, metadata, and processing results
§Built-in Functions
The engine comes with several pre-registered functions:
- map: Maps and transforms data between different parts of a message
- validate: Validates message data against rules using JSONLogic expressions
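As a sketch, the input for a map task pairs destination paths with JSONLogic expressions; the schema mirrors the usage example below, while the specific paths and the second mapping are illustrative:

```json
{
  "mappings": [
    { "path": "data.result", "logic": { "var": "temp_data.value" } },
    { "path": "data.doubled", "logic": { "*": [{ "var": "temp_data.value" }, 2] } }
  ]
}
```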
§Usage Example
```rust
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine (built-in functions are auto-registered by default)
    let engine = Engine::new(vec![workflow], None);

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
```

§Error Handling
The library provides a comprehensive error handling system:
```rust
use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::new(vec![/* workflows */], None);
    let mut message = Message::from_value(&json!({}));

    // Process the message; errors are collected but do not halt execution
    engine.process_message(&mut message).await?;

    // Check whether any errors occurred during processing
    if message.has_errors() {
        for error in &message.errors {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}
```

§Extending with Custom Functions
You can extend the engine with your own custom function handlers:
```rust
use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;

struct CustomFunction;

#[async_trait]
impl AsyncFunctionHandler for CustomFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Implement your custom logic here

        // Extract the custom configuration from config
        let input = match config {
            FunctionConfig::Custom { input, .. } => input,
            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
        };

        // Validate input
        let required_field = input.get("field")
            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
            .as_str()
            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;

        // Record changes for the audit trail
        let changes = vec![
            Change {
                path: Arc::from("data.custom_field"),
                old_value: Arc::new(Value::Null),
                new_value: Arc::new(json!("custom value")),
            }
        ];

        // Return success code (200) and changes
        Ok((200, changes))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Create custom functions
    let mut custom_functions = HashMap::new();
    custom_functions.insert(
        "custom".to_string(),
        Box::new(CustomFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>,
    );

    // Create the engine with workflows and custom functions
    let engine = Engine::new(vec![/* workflows */], Some(custom_functions));

    // Now "custom" can be referenced by name in workflow task definitions...
    Ok(())
}
```

§Re-exports
pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::functions::AsyncFunctionHandler;
pub use engine::functions::EnrichConfig;
pub use engine::functions::FilterConfig;
pub use engine::functions::FunctionConfig;
pub use engine::functions::HttpCallConfig;
pub use engine::functions::LogConfig;
pub use engine::functions::MapConfig;
pub use engine::functions::MapMapping;
pub use engine::functions::PublishKafkaConfig;
pub use engine::functions::ValidationConfig;
pub use engine::functions::ValidationRule;
pub use engine::message::AuditTrail;
pub use engine::message::Change;
pub use engine::message::Message;
pub use engine::trace::ExecutionStep;
pub use engine::trace::ExecutionTrace;
pub use engine::trace::StepResult;
pub use engine::Engine;
pub use engine::Task;
pub use engine::Workflow;
pub use engine::WorkflowStatus;
§Modules
- engine — Engine Module
§Type Aliases
- Action — Type alias for Task — an Action is an individual processing step within a rule.
- Rule — Type alias for Workflow — a Rule represents an IF-THEN unit: IF condition THEN execute actions.
- RulesEngine — Type alias for Engine — the RulesEngine evaluates rules and executes their actions.