§Dataflow-rs
A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.
§Overview
Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows. The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can contain multiple tasks that transform, validate, or enrich the data.
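The routing described above can be pictured with a small model that uses only the standard library. This is a sketch under stated assumptions, not the dataflow-rs implementation: `ToyMessage`, `ToyWorkflow`, `route`, and `routing_demo` are hypothetical names, conditions are plain closures standing in for configurable rules, and lower `priority` values are assumed to run first.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a message: a flat key/value payload plus a
/// log of which workflows were applied to it.
#[derive(Debug, Default)]
pub struct ToyMessage {
    pub data: BTreeMap<String, String>,
    pub visited: Vec<String>,
}

/// Hypothetical stand-in for a workflow: a condition that decides whether
/// it applies, and a list of tasks that mutate the message.
pub struct ToyWorkflow {
    pub id: &'static str,
    pub priority: u32,
    pub condition: Box<dyn Fn(&ToyMessage) -> bool>,
    pub tasks: Vec<Box<dyn Fn(&mut ToyMessage)>>,
}

/// Runs every workflow whose condition matches, in ascending priority
/// order (an assumption made for this sketch).
pub fn route(workflows: &mut [ToyWorkflow], message: &mut ToyMessage) {
    workflows.sort_by_key(|w| w.priority);
    for workflow in workflows.iter() {
        if (workflow.condition)(&*message) {
            for task in &workflow.tasks {
                task(&mut *message);
            }
            message.visited.push(workflow.id.to_string());
        }
    }
}

/// Builds two workflows and routes an "order" message through them.
pub fn routing_demo() -> ToyMessage {
    let mut workflows = vec![
        ToyWorkflow {
            id: "audit",
            priority: 1,
            // Applies to every message.
            condition: Box::new(|_: &ToyMessage| true),
            tasks: vec![Box::new(|m: &mut ToyMessage| {
                m.data.insert("audited".to_string(), "yes".to_string());
            })],
        },
        ToyWorkflow {
            id: "orders",
            priority: 0,
            // Applies only when the payload says kind == "order".
            condition: Box::new(|m: &ToyMessage| {
                m.data.get("kind").map(String::as_str) == Some("order")
            }),
            tasks: vec![Box::new(|m: &mut ToyMessage| {
                m.data.insert("status".to_string(), "accepted".to_string());
            })],
        },
    ];
    let mut message = ToyMessage::default();
    message.data.insert("kind".to_string(), "order".to_string());
    route(&mut workflows, &mut message);
    message
}
```

Calling `routing_demo()` applies `orders` first and `audit` second, because the sketch sorts by priority before evaluating conditions. A message without `kind == "order"` would only pass through `audit`.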
§Key Components
- Engine: The central component that processes messages through workflows
- Workflow: A collection of tasks with conditions that determine when they should be applied
- Task: An individual processing unit that performs a specific function on a message
- FunctionHandler: A trait implemented by task handlers to define custom processing logic
- Message: The data structure that flows through the engine, containing payload, metadata, and processing results
§Built-in Functions
The engine comes with several pre-registered functions:
- http: Fetches data from external HTTP APIs
- map: Maps and transforms data between different parts of a message
- validate: Validates message data against rules using JSONLogic expressions
§Usage Example
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "fetch_data",
                "name": "Fetch Data",
                "function": {
                    "name": "http",
                    "input": { "url": "https://api.example.com/data" }
                }
            },
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.body.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
    let engine = Engine::new(vec![workflow], None, None, None, None);

    // Create a message to process
    let mut message = Message::new(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message) {
        Ok(_) => {
            println!("Processed result: {}", message.data["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
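The `map` task in the example copies the value found at `temp_data.body.value` into `data.result`. A minimal sketch of that dotted-path copy, using only the standard library, might look like the following. `Node`, `get_path`, `set_path`, `apply_mapping`, and `mapping_demo` are hypothetical helpers written for this illustration and are not part of dataflow-rs; a plain source path stands in for the JSONLogic `var` expression.

```rust
use std::collections::BTreeMap;

/// Hypothetical tree value: either a leaf string or a nested object.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    Leaf(String),
    Object(BTreeMap<String, Node>),
}

/// Follows a dotted path such as "temp_data.body.value" down the tree.
pub fn get_path<'a>(root: &'a Node, path: &str) -> Option<&'a Node> {
    path.split('.').try_fold(root, |node, key| match node {
        Node::Object(map) => map.get(key),
        Node::Leaf(_) => None,
    })
}

/// Writes `value` at a dotted path, creating intermediate objects as
/// needed. A leaf found in the middle of the path is replaced by an object.
pub fn set_path(node: &mut Node, path: &str, value: Node) {
    let (key, rest) = match path.split_once('.') {
        Some((key, rest)) => (key, Some(rest)),
        None => (path, None),
    };
    if !matches!(*node, Node::Object(_)) {
        *node = Node::Object(BTreeMap::new());
    }
    if let Node::Object(map) = node {
        match rest {
            Some(rest) => {
                let child = map
                    .entry(key.to_string())
                    .or_insert_with(|| Node::Object(BTreeMap::new()));
                set_path(child, rest, value);
            }
            None => {
                map.insert(key.to_string(), value);
            }
        }
    }
}

/// Copies the value at `source` to `target`.
/// Returns false, leaving the tree untouched, if `source` is absent.
pub fn apply_mapping(root: &mut Node, source: &str, target: &str) -> bool {
    match get_path(root, source).cloned() {
        Some(value) => {
            set_path(root, target, value);
            true
        }
        None => false,
    }
}

/// Seeds a tree with a fetched value, then maps it into `data.result`.
pub fn mapping_demo() -> Node {
    let mut root = Node::Object(BTreeMap::new());
    set_path(&mut root, "temp_data.body.value", Node::Leaf("42".to_string()));
    apply_mapping(&mut root, "temp_data.body.value", "data.result");
    root
}
```

The sketch copies rather than moves the value, so the source path stays populated after the mapping is applied.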
§Error Handling
Errors raised while tasks run are collected on the message instead of halting execution, so they can be inspected after processing:
use dataflow_rs::{Engine, Result};
use dataflow_rs::engine::message::Message;
use serde_json::json;

fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::new(vec![/* workflows */], None, None, None, None);
    let mut message = Message::new(&json!({}));

    // Process the message; task errors are collected and do not halt execution
    engine.process_message(&mut message)?;

    // Check whether any errors were recorded during processing
    if message.has_errors() {
        for error in &message.errors {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                error.workflow_id, error.task_id, error.error_message);
        }
    }

    Ok(())
}
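The collect-and-continue behaviour used in this example can be sketched without the crate. In the standard-library-only illustration below, `ToyError`, `ToyMessage`, `ToyTask`, `run_tasks`, and `error_demo` are hypothetical names. Only the field names `workflow_id`, `task_id`, and `error_message` mirror the example, and their types are simplified to `String`.

```rust
/// Hypothetical error record with the fields printed in the example.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyError {
    pub workflow_id: String,
    pub task_id: String,
    pub error_message: String,
}

/// Hypothetical message that tracks completed tasks and collected errors.
#[derive(Debug, Default)]
pub struct ToyMessage {
    pub completed: Vec<String>,
    pub errors: Vec<ToyError>,
}

impl ToyMessage {
    pub fn has_errors(&self) -> bool {
        !self.errors.is_empty()
    }
}

/// A task is an id paired with a fallible step.
pub type ToyTask = (&'static str, fn(&mut ToyMessage) -> Result<(), String>);

/// Runs every task in order. A failing task is recorded on the message
/// and the remaining tasks still run.
pub fn run_tasks(workflow_id: &str, tasks: &[ToyTask], message: &mut ToyMessage) {
    for (task_id, step) in tasks {
        match step(&mut *message) {
            Ok(()) => message.completed.push(task_id.to_string()),
            Err(reason) => message.errors.push(ToyError {
                workflow_id: workflow_id.to_string(),
                task_id: task_id.to_string(),
                error_message: reason,
            }),
        }
    }
}

fn parse(_message: &mut ToyMessage) -> Result<(), String> {
    Ok(())
}

fn enrich(_message: &mut ToyMessage) -> Result<(), String> {
    Err("upstream unavailable".to_string())
}

fn store(_message: &mut ToyMessage) -> Result<(), String> {
    Ok(())
}

/// Runs three tasks where the middle one fails.
pub fn error_demo() -> ToyMessage {
    let tasks: [ToyTask; 3] = [("parse", parse), ("enrich", enrich), ("store", store)];
    let mut message = ToyMessage::default();
    run_tasks("data_processor", &tasks, &mut message);
    message
}
```

In `error_demo()`, the failing `enrich` step does not prevent `store` from running. The caller decides afterwards, via `has_errors()`, whether the partial result is acceptable.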
§Extending with Custom Functions
You can extend the engine by implementing the FunctionHandler trait and registering the handler under a name when constructing the engine:
use dataflow_rs::{Engine, FunctionHandler, Result, Workflow};
use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::collections::HashMap;

struct CustomFunction;

impl FunctionHandler for CustomFunction {
    fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: &DataLogic,
    ) -> Result<(usize, Vec<Change>)> {
        // Implement your custom logic here

        // Extract the custom configuration from config
        let input = match config {
            FunctionConfig::Custom { input, .. } => input,
            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
        };

        // Validate input
        let required_field = input.get("field")
            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
            .as_str()
            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;

        // Record changes for audit trail
        let changes = vec![
            Change {
                path: "data.custom_field".to_string(),
                old_value: Value::Null,
                new_value: json!("custom value"),
            }
        ];

        // Return success code (200) and changes
        Ok((200, changes))
    }
}

fn main() -> Result<()> {
    // Create custom functions
    let mut custom_functions = HashMap::new();
    custom_functions.insert(
        "custom".to_string(),
        Box::new(CustomFunction) as Box<dyn FunctionHandler + Send + Sync>
    );

    // Create engine with workflows and custom functions
    let engine = Engine::new(vec![/* workflows */], Some(custom_functions), None, None, None);

    // Now it can be used in workflows...
    Ok(())
}
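The example registers the handler in a `HashMap` keyed by function name. How a task's function name could then be resolved to a handler is sketched below using only the standard library. `Handler`, `Uppercase`, `Registry`, `dispatch`, and `registry_demo` are hypothetical names, and the trait is deliberately simpler than `FunctionHandler`; this illustrates name-based dispatch in general, not the engine's internals.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified handler trait: takes an input string and
/// returns a status code plus the transformed output.
pub trait Handler {
    fn execute(&self, input: &str) -> Result<(usize, String), String>;
}

/// Example handler that upper-cases its input.
pub struct Uppercase;

impl Handler for Uppercase {
    fn execute(&self, input: &str) -> Result<(usize, String), String> {
        Ok((200, input.to_uppercase()))
    }
}

/// Handlers are stored as trait objects keyed by function name.
pub type Registry = HashMap<String, Box<dyn Handler + Send + Sync>>;

/// Looks up the handler registered under `name` and runs it.
/// An unknown name is reported as an error instead of panicking.
pub fn dispatch(registry: &Registry, name: &str, input: &str) -> Result<(usize, String), String> {
    registry
        .get(name)
        .ok_or_else(|| format!("Function not found: {}", name))?
        .execute(input)
}

/// Builds a registry containing a single handler named "uppercase".
pub fn registry_demo() -> Registry {
    let mut registry: Registry = HashMap::new();
    registry.insert("uppercase".to_string(), Box::new(Uppercase));
    registry
}
```

Storing `Box<dyn Handler + Send + Sync>` lets handlers of different concrete types share one map, and the `Send + Sync` bounds keep the registry usable from multiple threads.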
Re-exports§
pub use engine::RetryConfig;
pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::functions::MapConfig;
pub use engine::functions::MapMapping;
pub use engine::functions::ValidationConfig;
pub use engine::functions::ValidationRule;
pub use engine::message::AuditTrail;
pub use engine::message::Change;
pub use engine::message::Message;
pub use engine::Engine;
pub use engine::FunctionConfig;
pub use engine::FunctionHandler;
pub use engine::Task;
pub use engine::Workflow;
Modules§
- engine: Engine Module