Crate dataflow_rs


§Dataflow-rs

A lightweight, rule-driven workflow engine for building powerful data processing pipelines and nanoservices in Rust.

§Overview

Dataflow-rs provides a flexible and extensible framework for processing data through a series of tasks organized in workflows. The engine automatically routes messages through appropriate workflows based on configurable rules, and each workflow can contain multiple tasks that transform, validate, or enrich the data.
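The routing idea can be modeled in a few lines of plain Rust. This is a minimal sketch, not the crate's API. SketchWorkflow, route, and the helper functions are hypothetical names. Conditions are ordinary Rust functions standing in for the crate's configurable rules, and a message is reduced to a map of string fields. Each workflow whose condition matches runs its tasks in order.

```rust
use std::collections::HashMap;

/// A message reduced to flat string fields (hypothetical simplification).
type Msg = HashMap<String, String>;

/// A task transforms the message in place.
type TaskFn = fn(&mut Msg);

/// Hypothetical workflow: a condition that decides whether it applies, plus ordered tasks.
struct SketchWorkflow {
    id: &'static str,
    condition: fn(&Msg) -> bool,
    tasks: Vec<TaskFn>,
}

/// Runs the tasks of every workflow whose condition matches, in order,
/// and returns the IDs of the workflows that were applied.
fn route(workflows: &[SketchWorkflow], msg: &mut Msg) -> Vec<&'static str> {
    let mut applied = Vec::new();
    for wf in workflows {
        if (wf.condition)(msg) {
            for &task in &wf.tasks {
                task(msg);
            }
            applied.push(wf.id);
        }
    }
    applied
}

// Example conditions and tasks (hypothetical).
fn is_invoice(msg: &Msg) -> bool {
    msg.get("type").map(String::as_str) == Some("invoice")
}

fn is_not_invoice(msg: &Msg) -> bool {
    !is_invoice(msg)
}

fn tag_billing(msg: &mut Msg) {
    msg.insert("category".to_string(), "billing".to_string());
}

fn tag_general(msg: &mut Msg) {
    msg.insert("category".to_string(), "general".to_string());
}
```

The sketch applies every matching workflow, not only the first. This document does not say which policy the real engine uses, so treat that choice as part of the sketch.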

§Key Components

  • Engine: The central component that processes messages through workflows
  • Workflow: A collection of tasks with conditions that determine when they should be applied
  • Task: An individual processing unit that performs a specific function on a message
  • AsyncFunctionHandler: A trait implemented by task handlers to define custom async processing logic
  • Message: The data structure that flows through the engine, containing payload, metadata, and processing results
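The Message component can be pictured as the input payload and metadata plus the results tasks produce. The sketch below models this with a hypothetical, std-only SketchMessage. It is not the crate's Message type. Its SketchChange borrows the path/old value/new value shape of the Change records used later in this page, and every write through set appends one to an audit trail.

```rust
use std::collections::HashMap;

/// One recorded modification, shaped like the path/old/new fields of the crate's Change.
#[derive(Debug, Clone, PartialEq)]
struct SketchChange {
    path: String,
    old_value: Option<String>,
    new_value: String,
}

/// Hypothetical, simplified message: the input payload and metadata are kept
/// alongside the processing results in `data` and an audit trail of changes.
#[derive(Debug, Default)]
struct SketchMessage {
    payload: String,
    metadata: HashMap<String, String>,
    data: HashMap<String, String>,
    audit: Vec<SketchChange>,
}

impl SketchMessage {
    fn new(payload: &str) -> Self {
        SketchMessage { payload: payload.to_string(), ..Default::default() }
    }

    /// Writes `value` under `path` in `data` and appends the change to the audit trail.
    fn set(&mut self, path: &str, value: &str) {
        let old_value = self.data.insert(path.to_string(), value.to_string());
        self.audit.push(SketchChange {
            path: path.to_string(),
            old_value,
            new_value: value.to_string(),
        });
    }
}
```

In this sketch the payload is never modified by set. Results accumulate in data, and the audit trail records how each value got there.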

§Built-in Functions

The engine comes with several pre-registered functions:

  • http: Fetches data from external HTTP APIs
  • map: Maps and transforms data between different parts of a message
  • validate: Validates message data against rules using JSONLogic expressions
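The validate function checks data against JSONLogic rules. To show the idea without a JSON library, the sketch below hand-rolls a tiny, simplified subset of JSONLogic: var, a numeric ">", and "and". Logic, Val, gt, eval, and validate are hypothetical names. Unlike full JSONLogic, a comparison involving a missing field evaluates to false, and "and" returns a plain boolean.

```rust
use std::collections::HashMap;

/// Numeric input fields, keyed by name (hypothetical simplification of message data).
type Fields = HashMap<&'static str, f64>;

/// A tiny, simplified subset of JSONLogic-style expressions.
enum Logic {
    Var(&'static str),          // like {"var": "age"}
    Num(f64),                   // a literal number
    Gt(Box<Logic>, Box<Logic>), // like {">": [a, b]}
    And(Vec<Logic>),            // like {"and": [...]}, simplified to return a bool
}

#[derive(Debug, Clone, PartialEq)]
enum Val {
    Null,
    Bool(bool),
    Num(f64),
}

/// Convenience constructor for a ">" comparison.
fn gt(a: Logic, b: Logic) -> Logic {
    Logic::Gt(Box::new(a), Box::new(b))
}

/// Evaluates an expression against the input fields.
fn eval(expr: &Logic, fields: &Fields) -> Val {
    match expr {
        Logic::Var(name) => fields.get(name).map_or(Val::Null, |n| Val::Num(*n)),
        Logic::Num(n) => Val::Num(*n),
        // Simplification: a missing or non-numeric operand makes the comparison false.
        Logic::Gt(a, b) => match (eval(a, fields), eval(b, fields)) {
            (Val::Num(x), Val::Num(y)) => Val::Bool(x > y),
            _ => Val::Bool(false),
        },
        Logic::And(items) => Val::Bool(items.iter().all(|e| eval(e, fields) == Val::Bool(true))),
    }
}

/// Returns the message of every rule that does not evaluate to true.
fn validate(rules: &[(Logic, &'static str)], fields: &Fields) -> Vec<&'static str> {
    let mut failures = Vec::new();
    for (rule, message) in rules {
        if eval(rule, fields) != Val::Bool(true) {
            failures.push(*message);
        }
    }
    failures
}
```

Collecting every failing rule, rather than stopping at the first, lets a caller report all problems with the data at once.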

§Async Support

The engine is fully asynchronous and built on Tokio. Because process_message is awaited, I/O-bound tasks such as HTTP requests can wait for their responses without blocking the runtime's threads:

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the async workflow engine
    let mut engine = Engine::new();

    // Define and add a workflow
    let workflow_json = r#"{
        "id": "data_processor",
        "name": "Data Processor",
        "tasks": [
            {
                "id": "fetch_data",
                "name": "Fetch Data",
                "function": {
                    "name": "http",
                    "input": { "url": "https://api.example.com/data" }
                }
            }
        ]
    }"#;

    let workflow = Workflow::from_json(workflow_json)?;
    engine.add_workflow(&workflow);

    // Create and process a message
    let mut message = Message::new(&json!({}));

    // Process the message asynchronously
    engine.process_message(&mut message).await?;

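    // data.result is only set if some task writes it (the usage example below
    // adds a map task for that); otherwise this prints null.
    println!("Processed result: {}", message.data["result"]);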
    Ok(())
}

§Usage Example

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the workflow engine (built-in functions are auto-registered)
    let mut engine = Engine::new();

    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "tasks": [
            {
                "id": "fetch_data",
                "name": "Fetch Data",
                "function": {
                    "name": "http",
                    "input": { "url": "https://api.example.com/data" }
                }
            },
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.body.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse and add the workflow to the engine
    let workflow = Workflow::from_json(workflow_json)?;
    engine.add_workflow(&workflow);

    // Create a message to process
    let mut message = Message::new(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.data["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
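The map task in this example copies the value found at temp_data.body.value to data.result. The dotted-path handling can be sketched with a small hand-rolled tree in place of serde_json's Value. Node, get_path, set_path, and apply_mapping are hypothetical names, not the crate's code. Skipping the write when the source path is missing is a choice made for this sketch and is not documented behavior of the map function.

```rust
use std::collections::BTreeMap;

/// Hypothetical minimal JSON-like value used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Null,
    Str(String),
    Obj(BTreeMap<String, Node>),
}

/// Follows a dotted path such as "temp_data.body.value"; None if any segment is missing.
fn get_path<'a>(root: &'a Node, path: &str) -> Option<&'a Node> {
    path.split('.').try_fold(root, |node, key| match node {
        Node::Obj(map) => map.get(key),
        _ => None,
    })
}

/// Writes `value` at a dotted path, creating intermediate objects as needed.
/// A non-object node found along the way is replaced by an empty object.
fn set_path(root: &mut Node, path: &str, value: Node) {
    let mut current = root;
    for key in path.split('.') {
        if !matches!(current, Node::Obj(_)) {
            *current = Node::Obj(BTreeMap::new());
        }
        current = match current {
            Node::Obj(map) => map.entry(key.to_string()).or_insert(Node::Null),
            _ => unreachable!("replaced with an object just above"),
        };
    }
    *current = value;
}

/// Copies the value at `from` to `to`, like one entry of a mappings list.
/// Returns false and writes nothing when the source path does not exist.
fn apply_mapping(msg: &mut Node, to: &str, from: &str) -> bool {
    let source = get_path(msg, from).cloned();
    match source {
        Some(value) => {
            set_path(msg, to, value);
            true
        }
        None => false,
    }
}
```

The source value is cloned before writing so that the read borrow ends before the tree is mutated.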

§Error Handling

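Task failures do not halt processing. Each one is recorded in message.errors along with the IDs of the workflow and task that raised it, and message.has_errors() reports whether any were recorded. An error returned by process_message itself is still propagated with the ? operator, as in this example: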

use dataflow_rs::{Engine, Result};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    let mut engine = Engine::new();
    // ... setup workflows ...

    let mut message = Message::new(&json!({}));

    // Process the message; task errors are collected on the message instead of halting execution
    engine.process_message(&mut message).await?;

    // Check if there were any errors during processing
    if message.has_errors() {
        for error in &message.errors {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                     error.workflow_id, error.task_id, error.error);
        }
    }

    Ok(())
}
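The collect-and-continue behavior can be sketched without the crate. SketchErrorInfo mirrors the workflow_id, task_id, and error fields printed above. SketchTask, run_collecting, and the example tasks are hypothetical. In this sketch the remaining tasks of a workflow still run after one fails. The engine's exact continuation rules are not spelled out in this document.

```rust
/// Hypothetical error record with the same fields the example prints.
#[derive(Debug, PartialEq)]
struct SketchErrorInfo {
    workflow_id: String,
    task_id: String,
    error: String,
}

/// A task updates shared state or fails with a message (simplified).
type SketchTask = fn(&mut Vec<String>) -> Result<(), String>;

/// Runs every task of one workflow. A failing task is recorded in `errors`
/// and the remaining tasks still run, so one failure does not stop processing.
fn run_collecting(
    workflow_id: &str,
    tasks: &[(&str, SketchTask)],
    state: &mut Vec<String>,
    errors: &mut Vec<SketchErrorInfo>,
) {
    for &(task_id, task) in tasks {
        if let Err(error) = task(state) {
            errors.push(SketchErrorInfo {
                workflow_id: workflow_id.to_string(),
                task_id: task_id.to_string(),
                error,
            });
        }
    }
}

// Example tasks (hypothetical).
fn record_ok(state: &mut Vec<String>) -> Result<(), String> {
    state.push("ok".to_string());
    Ok(())
}

fn always_fails(_state: &mut Vec<String>) -> Result<(), String> {
    Err("upstream returned status 500".to_string())
}
```

Recording errors next to the data, instead of returning early, is what lets the caller inspect every failure after a single processing pass.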

§Extending with Custom Functions

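You can extend the engine with your own function handlers: implement AsyncFunctionHandler for a type, then register it under a name that workflow tasks can reference: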

use dataflow_rs::{Engine, AsyncFunctionHandler, Result};
use dataflow_rs::engine::message::{Change, Message};
use dataflow_rs::engine::error::DataflowError;
use serde_json::{json, Value};
use async_trait::async_trait;

struct CustomFunction;

#[async_trait]
impl AsyncFunctionHandler for CustomFunction {
    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
        // Validate input: the task's "input" must contain a string "field"
        let required_field = input.get("field")
            .ok_or_else(|| DataflowError::Validation("Missing required field".to_string()))?
            .as_str()
            .ok_or_else(|| DataflowError::Validation("Field must be a string".to_string()))?;

        // Apply the change to the message, keeping the previous value
        let new_value = json!(required_field);
        let old_value = message.data.get("custom_field").cloned().unwrap_or(Value::Null);
        message.data["custom_field"] = new_value.clone();

        // Record the change for the audit trail
        let changes = vec![Change {
            path: "data.custom_field".to_string(),
            old_value,
            new_value,
        }];

        // Return a success status code (200) and the recorded changes
        Ok((200, changes))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mut engine = Engine::new();

    // Register the custom function under the name "custom"
    engine.register_task_function("custom".to_string(), Box::new(CustomFunction));

    // Workflow tasks can now use it via "function": { "name": "custom", "input": { "field": "..." } }
    Ok(())
}
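Registration by name can be sketched as a map from function names to boxed trait objects. Handler, Uppercase, and Registry are hypothetical, synchronous stand-ins, not the crate's AsyncFunctionHandler or Engine. Reporting an unknown name as an error is a choice made for this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical synchronous stand-in for a function-handler trait.
trait Handler {
    fn execute(&self, input: &str) -> Result<String, String>;
}

/// Example handler: upper-cases its input.
struct Uppercase;

impl Handler for Uppercase {
    fn execute(&self, input: &str) -> Result<String, String> {
        Ok(input.to_uppercase())
    }
}

/// Maps function names (as a task's "function.name" would use them) to boxed handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Box<dyn Handler>>,
}

impl Registry {
    fn register(&mut self, name: &str, handler: Box<dyn Handler>) {
        self.handlers.insert(name.to_string(), handler);
    }

    /// Looks up the handler by name; an unknown name is reported as an error.
    fn dispatch(&self, name: &str, input: &str) -> Result<String, String> {
        match self.handlers.get(name) {
            Some(handler) => handler.execute(input),
            None => Err(format!("unknown function: {name}")),
        }
    }
}
```

In this sketch, boxing lets handlers of different concrete types share one map. That is one plausible reason a registration call takes a Box, as register_task_function does with Box::new(CustomFunction).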

Re-exports§

pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::RetryConfig;
pub use engine::AsyncFunctionHandler;
pub use engine::Engine;
pub use engine::Task;
pub use engine::Workflow;

Modules§

engine
Engine Module

Attribute Macros§

async_trait