Dataflow-rs

A lightweight, rule-driven workflow engine for building high-performance data processing pipelines and nanoservices in Rust, with zero-overhead JSONLogic evaluation. Extend it with your own custom tasks to create robust, maintainable services.


Dataflow-rs is a Rust library for creating high-performance data processing pipelines with pre-compiled JSONLogic and zero runtime overhead. It features a modular architecture that separates compilation from execution, ensuring predictable low-latency performance. With built-in multi-threading support through ThreadedEngine, it provides excellent vertical scaling capabilities. Whether you're building REST APIs, processing Kafka streams, or creating sophisticated data transformation pipelines, Dataflow-rs provides enterprise-grade performance with minimal complexity.

🚀 Key Features

  • Zero Runtime Compilation: All JSONLogic expressions pre-compiled at startup for optimal performance.
  • Multi-Threading Support: Built-in ThreadedEngine with configurable thread pools for vertical scaling.
  • Modular Architecture: Clear separation between compilation (LogicCompiler) and execution (InternalExecutor).
  • Direct DataLogic Instantiation: Each engine has its own DataLogic instance for zero contention.
  • Immutable Workflows: Workflows compiled once at initialization for predictable performance.
  • Dynamic Workflows: Use JSONLogic to control workflow execution based on your data.
  • Extensible: Easily add your own custom processing steps (tasks) to the engine.
  • Built-in Functions: Comes with thread-safe implementations of data mapping and validation.
  • Resilient: Built-in error handling and retry mechanisms to handle transient failures.
  • Auditing: Track every change made to your data as it moves through the pipeline.

🏁 Getting Started

Here's a quick example to get you up and running.

1. Add to Cargo.toml

[dependencies]
dataflow-rs = "1.0.7"
serde_json = "1.0"

2. Create a Workflow

Workflows are defined in JSON and consist of a series of tasks.

{
    "id": "data_processor",
    "name": "Data Processor",
    "tasks": [
        {
            "id": "transform_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {
                            "path": "data.user_name",
                            "logic": { "var": "temp_data.name" }
                        },
                        {
                            "path": "data.user_email",
                            "logic": { "var": "temp_data.email" }
                        }
                    ]
                }
            }
        }
    ]
}

3. Run the Engine

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflow_json = r#"{ ... }"#; // Your workflow JSON from above
    let workflow = Workflow::from_json(workflow_json)?;
    
    // Create engine with workflows (immutable after creation)
    let mut engine = Engine::new(
        vec![workflow],  // Workflows to compile and cache
        None,           // Custom functions (optional)
        None,           // Retry config (optional)
    );

    // Process a single message
    let mut message = Message::new(&json!({}));
    engine.process_message(&mut message)?;

    println!("✅ Processed result: {}", serde_json::to_string_pretty(&message.data)?);

    Ok(())
}

4. Multi-Threading with ThreadedEngine

For high-performance multi-threaded processing, use the built-in ThreadedEngine:

use dataflow_rs::{ThreadedEngine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;
use std::sync::Arc;
use std::thread;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define your workflows
    let workflow_json = r#"{ ... }"#; // Your workflow JSON
    let workflow = Workflow::from_json(workflow_json)?;
    
    // Create ThreadedEngine with 4 worker threads
    let engine = Arc::new(ThreadedEngine::new(
        vec![workflow],  // Workflows
        None,           // Custom functions (optional)
        None,           // Retry config (optional)
        4,              // Number of worker threads
    ));

    // Process messages concurrently from multiple client threads
    let mut handles = Vec::new();
    
    for i in 0..1000 {
        let engine = Arc::clone(&engine);
        let handle = thread::spawn(move || {
            let message = Message::new(&json!({"id": i}));
            engine.process_message_sync(message)
        });
        handles.push(handle);
    }

    // Wait for all messages to complete
    for handle in handles {
        handle.join().unwrap()?;
    }

    println!("✅ Processed 1000 messages with ThreadedEngine!");
    Ok(())
}

The ThreadedEngine provides:

  • Configurable thread pool for vertical scaling
  • Work queue distribution for efficient load balancing
  • Graceful shutdown support
  • Both sync and async APIs for flexible integration
  • Health monitoring and worker restart capabilities

✨ Core Concepts

  • Engine: High-performance engine with pre-compiled logic and immutable workflows.
  • ThreadedEngine: Multi-threaded variant with configurable thread pool for vertical scaling.
  • LogicCompiler: Compiles all JSONLogic expressions at initialization for zero runtime overhead.
  • InternalExecutor: Executes built-in functions using pre-compiled logic from the cache.
  • Workflow: A sequence of tasks executed in order, with conditions accessing only metadata.
  • Task: A single processing step with optional JSONLogic conditions.
  • Message: The data structure flowing through workflows with audit trail support.

🔧 Choosing Between Engine and ThreadedEngine

Use Engine when:

  • Processing messages sequentially in a single thread
  • Embedding in async runtimes (tokio, async-std)
  • Maximum performance for individual message processing
  • Simple integration without thread management

Use ThreadedEngine when:

  • Need to process multiple messages concurrently
  • Want vertical scaling on multi-core systems
  • Have CPU-bound workloads that benefit from parallelism
  • Need a built-in thread pool with work distribution

🏗️ Architecture

The architecture focuses on simplicity and performance through a clear separation of concerns:

Compilation Phase (Startup)

  1. LogicCompiler compiles all JSONLogic expressions from workflows and tasks
  2. Creates an indexed cache of compiled logic for O(1) runtime access
  3. Validates all logic expressions early, failing fast on errors
  4. Stores compiled logic in contiguous memory for cache efficiency

Execution Phase (Runtime)

  1. Engine orchestrates message processing through immutable workflows
  2. InternalExecutor evaluates conditions and executes built-in functions
  3. Uses compiled logic from the cache, with zero compilation overhead at runtime
  4. Direct DataLogic instantiation eliminates any locking or contention
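
The two phases above can be illustrated with a self-contained sketch (plain std Rust, not the crate's actual types): expressions are parsed once at startup into a contiguous `Vec`, and tasks hold indices into that cache for O(1) lookup on the hot path.

```rust
// Illustrative compile-once / execute-many pattern; the real crate uses
// datalogic-rs compiled expressions, not strings.
struct CompiledLogic(String); // stand-in for a parsed JSONLogic expression

struct Compiler {
    // Contiguous storage: the index returned by `compile` is the runtime handle.
    cache: Vec<CompiledLogic>,
}

impl Compiler {
    fn compile(&mut self, raw: &str) -> usize {
        // All parsing/validation happens here, at startup; fail fast on errors.
        self.cache.push(CompiledLogic(raw.to_string()));
        self.cache.len() - 1
    }

    fn evaluate(&self, idx: usize) -> &str {
        // Runtime path: O(1) indexed access, no parsing, no allocation.
        &self.cache[idx].0
    }
}

fn main() {
    let mut compiler = Compiler { cache: Vec::new() };
    let handle = compiler.compile(r#"{"var":"temp_data.name"}"#);
    // The handle is stable for the engine's lifetime, so tasks can store it.
    assert_eq!(compiler.evaluate(handle), r#"{"var":"temp_data.name"}"#);
    println!("lookup ok");
}
```

Because the cache is never mutated after startup, it can be shared across worker threads without locking, which is the property the `ThreadedEngine` relies on.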

Key Design Decisions

  • Immutable Workflows: All workflows defined at engine creation, cannot be modified
  • Pre-compilation: All expensive parsing/compilation done once at startup
  • Direct Instantiation: Each engine owns its DataLogic instance directly
  • Modular Design: Clear boundaries between compilation, execution, and orchestration

⚡ Performance

Dataflow-rs achieves predictable, low-latency performance through several architectural choices:

  • Pre-Compilation: All JSONLogic compiled at startup, zero runtime overhead
  • Multi-Threading: Built-in ThreadedEngine for vertical scaling with configurable worker threads
  • Cache-Friendly: Compiled logic stored contiguously in memory
  • Direct Instantiation: DataLogic instances created directly without locking
  • Predictable Latency: No runtime allocations for logic evaluation
  • Modular Design: Clear separation of compilation and execution phases

Run the included benchmark to test performance on your hardware:

cargo run --release --example benchmark

The benchmark compares single-threaded Engine vs multi-threaded ThreadedEngine with various worker and client configurations, providing detailed performance metrics and scaling analysis.

🛠️ Custom Functions

You can extend the engine with your own custom logic by implementing the FunctionHandler trait:

use dataflow_rs::engine::{FunctionHandler, FunctionConfig, error::Result, message::{Change, Message}};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::collections::HashMap;

pub struct MyCustomFunction;

impl FunctionHandler for MyCustomFunction {
    fn execute(
        &self, 
        message: &mut Message, 
        config: &FunctionConfig,
        datalogic: &DataLogic,
    ) -> Result<(usize, Vec<Change>)> {
        // Your custom logic here
        println!("Hello from a custom function!");
        
        // Modify message data
        message.data["processed"] = json!(true);
        
        // Return status code and changes for audit trail
        Ok((200, vec![Change {
            path: "data.processed".to_string(),
            old_value: json!(null),
            new_value: json!(true),
            operation: "set".to_string(),
        }]))
    }
}

// Register when creating the engine:
let mut custom_functions = HashMap::new();
custom_functions.insert(
    "my_custom_function".to_string(),
    Box::new(MyCustomFunction) as Box<dyn FunctionHandler + Send + Sync>
);

let mut engine = Engine::new(
    workflows,
    Some(custom_functions),  // Custom functions
    None,  // Use default retry config
);
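
Once registered, a custom function can be invoked from a workflow task by name, mirroring the built-in `map` task shown earlier. The `input` payload below is a hypothetical placeholder; its shape depends entirely on what your handler reads from `FunctionConfig`:

```json
{
    "id": "custom_step",
    "function": {
        "name": "my_custom_function",
        "input": {}
    }
}
```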

🤝 Contributing

We welcome contributions! Feel free to fork the repository, make your changes, and submit a pull request. Please make sure to add tests for any new features.

🏢 About Plasmatic

Dataflow-rs is developed by the team at Plasmatic. We're passionate about building open-source tools for data processing.

📄 License

This project is licensed under the Apache License, Version 2.0. See the LICENSE file for more details.