
Crate dataflow_rs


§Dataflow-rs

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.

§Overview

Dataflow-rs provides a high-performance rules engine that follows the IF → THEN → THAT model:

  • IF — Define conditions using JSONLogic expressions (evaluated against data, metadata, temp_data)
  • THEN — Execute actions: data transformation, validation, or custom async logic
  • THAT — Chain multiple actions and rules with priority ordering

Rules are defined declaratively in JSON and compiled once at startup for zero-overhead evaluation at runtime.
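As a sketch of the IF part, a rule's condition could be expressed as a JSONLogic expression alongside its actions. Note this is illustrative only: the `condition` field name and its exact placement are assumptions based on the model above, not verified against the crate's workflow schema (the Usage Example below shows a workflow without a condition):

```json
{
    "id": "high_value_orders",
    "name": "High Value Orders",
    "priority": 1,
    "condition": { ">": [{ "var": "data.order.total" }, 100] },
    "tasks": []
}
```

When the condition evaluates to true against the message's data, metadata, and temp_data, the rule's tasks run in order; otherwise the rule is skipped.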

§Key Components

The rules-engine vocabulary maps directly onto the underlying workflow-engine types:

  • RulesEngine (alias for Engine): The central async component that evaluates rules and executes actions
  • Rule (alias for Workflow): A condition + actions bundle, i.e. IF condition THEN execute actions
  • Action (alias for Task): An individual processing step that performs a function on a message
  • AsyncFunctionHandler: A trait implemented by action handlers to define custom async processing logic
  • Message: The data structure that flows through the engine, containing payload, metadata, and processing results

§Built-in Functions

The engine comes with pre-registered built-in functions, including:

  • map: Maps and transforms data between different parts of a message
  • validate: Validates message data against rules using JSONLogic expressions
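As a sketch, a validate task might be configured as below. The field names (`rules`, `path`, `logic`, `message`) are inferred from the re-exported ValidationConfig and ValidationRule types and from the map task's shape in the Usage Example; treat the exact schema as an assumption:

```json
{
    "id": "check_input",
    "name": "Check Input",
    "function": {
        "name": "validate",
        "input": {
            "rules": [
                {
                    "path": "data",
                    "logic": { "!!": { "var": "data.email" } },
                    "message": "email is required"
                }
            ]
        }
    }
}
```

A failing rule would be recorded on the message (see Error Handling below) rather than aborting the whole pipeline.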

§Usage Example

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine with the workflow (built-in functions are auto-registered by default)
    let engine = Engine::new(vec![workflow], None);

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}

§Error Handling

The library provides a comprehensive error handling system:

use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::new(vec![/* workflows */], None);

    let mut message = Message::from_value(&json!({}));

    // Process the message; errors are collected rather than halting execution
    engine.process_message(&mut message).await?;

    // Check if there were any errors during processing
    if message.has_errors() {
        for error in &message.errors {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                     error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}

§Extending with Custom Functions

You can extend the engine with your own custom function handlers:

use dataflow_rs::{Engine, AsyncFunctionHandler, Result, Workflow};
use dataflow_rs::engine::{FunctionConfig, message::{Change, Message}, error::DataflowError};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;

struct CustomFunction;

#[async_trait]
impl AsyncFunctionHandler for CustomFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Implement your custom logic here

        // Extract the custom configuration from config
        let input = match config {
            FunctionConfig::Custom { input, .. } => input,
            _ => return Err(DataflowError::Validation("Invalid configuration type".to_string())),
        };

        // Validate input
        let required_field = input.get("field")
            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
            .as_str()
            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;

        // Record changes for audit trail
        let changes = vec![
            Change {
                path: Arc::from("data.custom_field"),
                old_value: Arc::new(Value::Null),
                new_value: Arc::new(json!("custom value")),
            }
        ];

        // Return success code (200) and changes
        Ok((200, changes))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Create custom functions
    let mut custom_functions = HashMap::new();
    custom_functions.insert(
        "custom".to_string(),
        Box::new(CustomFunction) as Box<dyn AsyncFunctionHandler + Send + Sync>
    );

    // Create engine with workflows and custom functions
    let engine = Engine::new(vec![/* workflows */], Some(custom_functions));

    // Now it can be used in workflows...
    Ok(())
}

Re-exports§

pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::functions::AsyncFunctionHandler;
pub use engine::functions::EnrichConfig;
pub use engine::functions::FilterConfig;
pub use engine::functions::FunctionConfig;
pub use engine::functions::HttpCallConfig;
pub use engine::functions::LogConfig;
pub use engine::functions::MapConfig;
pub use engine::functions::MapMapping;
pub use engine::functions::PublishKafkaConfig;
pub use engine::functions::ValidationConfig;
pub use engine::functions::ValidationRule;
pub use engine::message::AuditTrail;
pub use engine::message::Change;
pub use engine::message::Message;
pub use engine::trace::ExecutionStep;
pub use engine::trace::ExecutionTrace;
pub use engine::trace::StepResult;
pub use engine::Engine;
pub use engine::Task;
pub use engine::Workflow;
pub use engine::WorkflowStatus;

Modules§

engine
Engine Module

Type Aliases§

Action
Type alias for Task — an Action is an individual processing step within a rule.
Rule
Type alias for Workflow — a Rule represents an IF-THEN unit: IF condition THEN execute actions.
RulesEngine
Type alias for Engine — the RulesEngine evaluates rules and executes their actions.