Dataflow-rs
Dataflow-rs is a lightweight, rule-driven async workflow engine designed for building powerful data processing pipelines and nanoservices in Rust. Extend it with your custom async tasks to create robust, maintainable services with proper concurrency and performance.
Features
- Async-First Design: Built from the ground up with Tokio for high-performance async processing
- Rule-Based Workflow Selection: Dynamically select workflows using JSONLogic expressions
- Task Orchestration: Compose sequences of async tasks for complex data processing
- Message Transformation: Seamlessly modify message data via specialized async tasks
- Comprehensive Error Handling: Detailed error types and recovery mechanisms
- Retry Capabilities: Configurable retry policies for transient failures
- Audit Trails: Automatically record changes for debugging and monitoring
- Pluggable Architecture: Easily extend the framework by registering custom async tasks
- Thread-Safety: Properly handles concurrent execution with thread-safe patterns
- Custom Functions: Implement domain-specific async functions with full engine integration
Table of Contents
- Overview
- Installation
- Quick Start
- Async Architecture
- Built-in Functions
- Custom Functions
- Advanced Examples
- Error Handling
- Performance & Benchmarking
- Contributing
- License
Overview
Dataflow-rs empowers developers to build scalable async nanoservices and data pipelines with ease. Its core design focuses on asynchronous processing, flexibility, extensibility, and resilience, allowing you to integrate your custom business logic into robust workflows with proper error handling and performance optimization.
Key Components
- Engine: The central async component that processes messages through workflows
- Workflow: A collection of tasks with conditions that determine when they should be applied (Note: workflow conditions can only access metadata fields)
- Task: An individual async processing unit that performs a specific function on a message
- AsyncFunctionHandler: A trait implemented by task handlers to define custom async processing logic
- Message: The data structure that flows through the engine, containing payload, metadata, and processing results
Installation
To incorporate Dataflow-rs into your project, add the following to your Cargo.toml:
[dependencies]
dataflow-rs = "0.1.6"
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"
Quick Start
Below is a simple example demonstrating how to set up an async workflow that processes data:
use ;
use Message;
use json;
async
Async Architecture
Dataflow-rs is built with async-first principles using Tokio:
Sequential Workflow Processing
Workflows are processed sequentially to ensure that later workflows can depend on the results of earlier workflows:
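// The engine processes messages asynchronously
engine.process_message(&mut message).await?;
// Each workflow's condition is evaluated just before execution
// using the current message state, allowing workflows to depend
// on results from previous workflows
// Multiple messages can still be processed concurrently
// (assumes join_all is futures::future::join_all and engine is a shared reference)
let futures: Vec<_> = messages.into_iter()
    .map(|mut message| async move { engine.process_message(&mut message).await })
    .collect();
let results = join_all(futures).await;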
Workflow Dependencies
Since workflows are executed sequentially and conditions are evaluated just before execution, you can create workflows that depend on each other. However, workflow conditions can only access metadata fields, not data fields:
In this example, the first workflow sets metadata flags that the second workflow's condition can evaluate.
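The dependency between workflows can be modeled without the engine at all. The following is a minimal, dependency-free sketch, not the crate's implementation: `SketchMessage`, `SketchWorkflow`, `run_sequentially`, and `example_workflows` are hypothetical names, and plain Rust closures stand in for JSONLogic conditions. It shows only the two rules stated above: conditions read metadata alone, and each condition is evaluated just before its workflow runs.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the engine's message (assumption: flat string maps).
#[derive(Debug, Default)]
pub struct SketchMessage {
    pub data: HashMap<String, String>,
    pub metadata: HashMap<String, String>,
}

/// Hypothetical workflow: a metadata-only condition plus a single action.
pub struct SketchWorkflow {
    pub id: &'static str,
    /// The condition receives only the metadata map, never `data`.
    pub condition: fn(&HashMap<String, String>) -> bool,
    pub action: fn(&mut SketchMessage),
}

/// Runs workflows in order. Each condition is evaluated just before execution,
/// so it sees metadata written by earlier workflows. Returns the ids that ran.
pub fn run_sequentially(
    workflows: &[SketchWorkflow],
    message: &mut SketchMessage,
) -> Vec<&'static str> {
    let mut executed = Vec::new();
    for workflow in workflows {
        if (workflow.condition)(&message.metadata) {
            (workflow.action)(message);
            executed.push(workflow.id);
        }
    }
    executed
}

/// Two dependent workflows: the first sets a metadata flag, the second requires it.
pub fn example_workflows() -> Vec<SketchWorkflow> {
    vec![
        SketchWorkflow {
            id: "fetch",
            condition: |_metadata| true,
            action: |message| {
                message.data.insert("user".to_string(), "alice".to_string());
                message.metadata.insert("fetched".to_string(), "true".to_string());
            },
        },
        SketchWorkflow {
            id: "enrich",
            condition: |metadata| metadata.get("fetched").map(String::as_str) == Some("true"),
            action: |message| {
                message.data.insert("tier".to_string(), "gold".to_string());
            },
        },
    ]
}
```

If the two workflows are registered in the opposite order, the second one's condition is checked before the flag exists and it is skipped, which is why ordering matters when workflows depend on each other.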
Async Task Execution
Within each workflow, tasks are executed sequentially but asynchronously with proper error handling and audit trails maintained throughout the async execution chain.
Built-in Functions
The engine comes with several pre-registered async functions:
HTTP Function
Fetches data from external HTTP APIs asynchronously:
Map Function
Maps and transforms data between different parts of a message using JSONLogic with support for both object and array notation:
The Map function supports array notation in paths: when numeric indices like 0, 1, 2 are encountered, arrays are automatically created.
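To make the array-notation behavior concrete, here is a small sketch of one way such path handling could work. It is an illustration under assumptions, not the Map function's actual code: `Node` is a hypothetical stand-in for a JSON value, `set_path` is a hypothetical helper, and the choice to pad missing array slots with null and to overwrite a node of the wrong kind is assumed rather than taken from the crate.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Null,
    Text(String),
    Array(Vec<Node>),
    Object(BTreeMap<String, Node>),
}

/// Sets `value` at a dot-separated `path`. Numeric segments index into arrays,
/// which are created and padded with `Null` as needed; all other segments are
/// treated as object keys. A node of the wrong kind is replaced.
pub fn set_path(root: &mut Node, path: &str, value: Node) {
    let mut current = root;
    for segment in path.split('.') {
        current = match segment.parse::<usize>() {
            Ok(index) => {
                // Numeric segment: make sure the current node is an array.
                if !matches!(current, Node::Array(_)) {
                    *current = Node::Array(Vec::new());
                }
                let Node::Array(items) = current else { unreachable!() };
                if items.len() <= index {
                    items.resize(index + 1, Node::Null);
                }
                &mut items[index]
            }
            Err(_) => {
                // Non-numeric segment: make sure the current node is an object.
                if !matches!(current, Node::Object(_)) {
                    *current = Node::Object(BTreeMap::new());
                }
                let Node::Object(fields) = current else { unreachable!() };
                fields.entry(segment.to_string()).or_insert(Node::Null)
            }
        };
    }
    *current = value;
}
```

With this sketch, writing to the path `items.2.name` on an empty value produces an object whose `items` field is a three-element array: two nulls followed by an object holding `name`.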
Validate Function
Validates message data against rules using JSONLogic expressions. Unlike workflow conditions, validation rules can access all message fields (data, metadata, temp_data):
Custom Functions
One of the most powerful features of dataflow-rs is the ability to implement custom async functions that integrate seamlessly with the workflow engine.
Basic Structure
To create a custom async function, implement the AsyncFunctionHandler trait:
use ;
use async_trait;
use Value;
;
Key Components
Input Parameters
- message: Mutable reference to the message being processed
- input: JSON configuration from the workflow definition
Return Value
- Result<(usize, Vec<Change>)>: Status code and list of changes made to the message
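The input and return shapes listed above can be illustrated with a synchronous, dependency-free sketch. Everything here is a stand-in: `TaskMessage`, `SketchChange`, and `uppercase_field` are hypothetical, the real trait method is async and works on JSON values, the crate's `Change` type may have different fields, and the status code 200 is an assumed convention for success.

```rust
use std::collections::HashMap;

/// Hypothetical record of one modification (the crate's `Change` may differ).
#[derive(Debug, Clone, PartialEq)]
pub struct SketchChange {
    pub path: String,
    pub old_value: Option<String>,
    pub new_value: String,
}

/// Simplified message holding only a flat `data` map.
#[derive(Debug, Default)]
pub struct TaskMessage {
    pub data: HashMap<String, String>,
}

/// Mirrors the documented shape: take the message mutably, read the task's
/// input configuration, and return a status code plus the list of changes.
pub fn uppercase_field(
    message: &mut TaskMessage,
    input: &HashMap<String, String>,
) -> Result<(usize, Vec<SketchChange>), String> {
    // Required configuration: fail with a descriptive error if it is absent.
    let field = input
        .get("field")
        .ok_or_else(|| "missing required input 'field'".to_string())?;

    // Capture the old value before mutating so the change can be audited.
    let old_value = message.data.get(field).cloned();
    let new_value = old_value.clone().unwrap_or_default().to_uppercase();
    message.data.insert(field.clone(), new_value.clone());

    let changes = vec![SketchChange {
        path: format!("data.{}", field),
        old_value,
        new_value,
    }];
    // 200 is an assumed "success" status for this sketch.
    Ok((200, changes))
}
```

Recording the old value alongside the new one is what makes an audit trail possible: a reader of the change list can see both what the task wrote and what it replaced.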
Message Structure
Example: Statistics Function
Here's a comprehensive example of a custom function that calculates statistical measures:
use ;
use async_trait;
use ;
;
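As a rough illustration of the numeric core such a statistics function might wrap, the sketch below computes a few common measures over a slice of numbers. `Stats` and `compute_stats` are hypothetical names, the choice of measures is an assumption, and the standard deviation shown is the population form; the handler in the crate's example may compute something different.

```rust
/// Summary statistics for a slice of numbers (illustrative selection).
#[derive(Debug, Clone, PartialEq)]
pub struct Stats {
    pub count: usize,
    pub mean: f64,
    pub median: f64,
    /// Population standard deviation (divides by `count`, not `count - 1`).
    pub std_dev: f64,
    pub min: f64,
    pub max: f64,
}

/// Returns `None` for empty input or input containing NaN.
pub fn compute_stats(values: &[f64]) -> Option<Stats> {
    if values.is_empty() || values.iter().any(|v| v.is_nan()) {
        return None;
    }
    let count = values.len();
    let mean = values.iter().sum::<f64>() / count as f64;
    let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / count as f64;

    // Sort a copy to find the median, minimum, and maximum.
    let mut sorted = values.to_vec();
    sorted.sort_by(|a, b| a.partial_cmp(b).expect("NaN was rejected above"));
    let mid = count / 2;
    let median = if count % 2 == 0 {
        (sorted[mid - 1] + sorted[mid]) / 2.0
    } else {
        sorted[mid]
    };

    Some(Stats {
        count,
        mean,
        median,
        std_dev: variance.sqrt(),
        min: sorted[0],
        max: sorted[count - 1],
    })
}
```

Keeping the calculation in a plain function like this makes it easy to unit test separately from the async handler that reads the numbers from the message and writes the results back.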
Example: Data Enrichment Function
Here's an example that demonstrates async external data lookup:
Registering Custom Functions
async
Best Practices for Custom Functions
1. Async Operations
// Good: Non-blocking async operation
let response = get.await?;
// Good: Simulated async delay
sleep.await;
2. Error Handling
let required_field = input.get
.ok_or_else?;
3. Change Tracking
let changes = vec!;
Ok
Advanced Examples
Concurrent Message Processing
While workflows within a single message are processed sequentially, you can still process multiple messages concurrently:
use ;
async
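The split between sequential steps within one message and concurrency across messages can be sketched with the standard library alone. Note the substitution: this sketch uses scoped OS threads from `std::thread` instead of Tokio tasks so that it has no dependencies, and `process_one` and `process_all` are hypothetical helpers rather than engine methods.

```rust
use std::thread;

/// Applies each step to one message in order (sequential within a message).
pub fn process_one(message: String, steps: &[fn(String) -> String]) -> String {
    steps.iter().fold(message, |current, step| step(current))
}

/// Processes independent messages on separate scoped threads and returns the
/// results in input order. One thread per message keeps the sketch short; a
/// real service would bound the concurrency.
pub fn process_all(messages: Vec<String>, steps: &[fn(String) -> String]) -> Vec<String> {
    thread::scope(|scope| {
        // Spawn all workers first so they run concurrently.
        let handles: Vec<_> = messages
            .into_iter()
            .map(|message| scope.spawn(move || process_one(message, steps)))
            .collect();
        // Join in spawn order so the output order matches the input order.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    })
}
```

Because each message owns its own state, no locking is needed here; shared state would only come into play if the steps themselves held something like a shared counter or cache.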
Custom Function Handler with State
use Arc;
use Mutex;
Error Handling
Dataflow-rs provides comprehensive async error handling with dedicated error types:
use ;
use Message;
use json;
async
Retry Configuration
Configure retry behavior for transient failures:
use ;
async
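For readers unfamiliar with retry policies, the following sketch shows the general technique of retrying a failing operation with an optional exponential backoff. It is not the crate's retry API: `RetryPolicy`, its field names, and `run_with_retry` are hypothetical, and the sketch blocks the thread with `std::thread::sleep` where an async engine would await a timer instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical retry policy; field names are illustrative only.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    /// Number of retries allowed after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// When true, the delay doubles after every failed attempt.
    pub use_backoff: bool,
}

/// Calls `operation` (passing the zero-based attempt index) until it succeeds
/// or the retries are exhausted. Returns the final result together with the
/// total number of attempts made.
pub fn run_with_retry<T, E>(
    policy: RetryPolicy,
    mut operation: impl FnMut(u32) -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut attempt = 0;
    loop {
        let result = operation(attempt);
        attempt += 1;
        if result.is_ok() || attempt > policy.max_retries {
            return (result, attempt);
        }
        // Backoff schedule: base, 2 * base, 4 * base, ... (shift capped at 16).
        let factor = if policy.use_backoff {
            1u32 << (attempt - 1).min(16)
        } else {
            1
        };
        sleep(policy.base_delay * factor);
    }
}
```

A policy with `max_retries` set to 3 therefore allows up to four attempts in total. Retrying only makes sense for transient failures; an error such as invalid input should be returned immediately rather than retried.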
Performance & Benchmarking
Running Benchmarks
To test the async performance of the workflow engine:
This benchmark demonstrates:
- Async vs sync performance comparison
- Proper async function execution timing
- Realistic workflow processing scenarios
- Statistical analysis of processing times
Running Custom Function Examples
To see custom async functions in action:
Example output:
Advanced Features
Engine Variants
// Full engine with all built-in functions
let engine = Engine::new();
// Empty engine for custom functions only
let engine = Engine::new_empty();
// Engine with specific functions
let mut engine = Engine::new_empty();
engine.register_task_function(/* function name, handler */);
Workflow Conditions
Use JSONLogic for dynamic workflow selection. Important: Workflow conditions can only access metadata fields:
To make data available for workflow conditions, set metadata fields in earlier workflows:
Extending the Framework
Dataflow-rs is highly extensible for building nanoservices:
- Implement custom async tasks by creating structs that implement AsyncFunctionHandler
- Create your own error types by extending from the base DataflowError
- Build nanoservices by integrating multiple async workflows
- Leverage the built-in HTTP, validation, and mapping functions
- Integrate with external databases, APIs, and services asynchronously
Documentation
For detailed API documentation and additional examples:
Contributing
We welcome contributions! Check out our CONTRIBUTING.md for guidelines on how to help improve Dataflow-rs.
License
This project is licensed under the Apache License 2.0. See the LICENSE file for details.
Built with ❤️ for the Rust async ecosystem