Module engine


§Engine Module

This module implements the core async workflow engine for dataflow-rs. The engine provides high-performance, asynchronous message processing through workflows composed of tasks.

§Architecture

The engine features a clean async-first architecture with DataLogic v4:

  • Compiler: Pre-compiles JSONLogic expressions into DataLogic v4's Arc-wrapped compiled logic
  • Executor: Handles internal function execution (map, validation) with async support
  • Engine: Orchestrates workflow processing with shared compiled logic
  • Thread-Safe: Single DataLogic instance with Arc-wrapped compiled logic for zero-copy sharing
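The Arc-based sharing described above can be illustrated with the standard library alone. This is a minimal sketch, not the engine's implementation: `CompiledRule` and `evaluate_in_parallel` are hypothetical names standing in for DataLogic's compiled logic, and plain threads stand in for async tasks. It only shows that cloning an `Arc` hands each worker a pointer to the same compiled value instead of a copy.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a compiled JSONLogic expression.
/// (Assumption: the real compiled type lives in DataLogic and is not shown here.)
#[derive(Debug)]
struct CompiledRule {
    threshold: i64,
}

impl CompiledRule {
    /// Read-only evaluation, so no locking is needed when the rule is shared.
    fn evaluate(&self, value: i64) -> bool {
        value > self.threshold
    }
}

/// Evaluates each input on its own thread against the same compiled rule.
fn evaluate_in_parallel(rule: Arc<CompiledRule>, inputs: Vec<i64>) -> Vec<bool> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|value| {
            // Arc::clone only bumps a reference count; the rule itself is not copied.
            let rule = Arc::clone(&rule);
            thread::spawn(move || rule.evaluate(value))
        })
        .collect();

    // Join in spawn order so results line up with the inputs.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Because evaluation takes `&self`, the shared value needs no `Mutex`; this is the property that makes a single compiled instance safe to use from many workers.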

§Key Components

  • Engine: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
  • LogicCompiler: Compiles and caches JSONLogic expressions during initialization
  • InternalExecutor: Executes built-in map and validation functions with compiled logic
  • Workflow: Collection of tasks with JSONLogic conditions (metadata-only access)
  • Task: Individual processing unit that performs a specific function on a message
  • AsyncFunctionHandler: Trait for custom async processing logic
  • Message: Data structure flowing through the engine with audit trail

§Performance Optimizations

  • Pre-compilation: All JSONLogic expressions compiled at startup
  • Arc-wrapped Logic: Zero-copy sharing of compiled logic across async tasks
  • Spawn Blocking: CPU-intensive JSONLogic evaluation runs in blocking tasks, off the async worker threads
  • True Async: I/O operations remain fully async
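The pre-compilation idea from the list above can be sketched as a compile-once cache. Everything here is an assumption made for illustration: `Compiled`, `compile`, and `LogicCache` are hypothetical names, and the tiny `field > N` syntax replaces real JSONLogic. The point is only the shape of the pattern: parse every expression once at startup, fail early on invalid input, and look rules up by index on the hot path.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical compiled form of a "field > N" expression (not real JSONLogic).
#[derive(Debug, PartialEq)]
struct Compiled {
    field: String,
    min: i64,
}

/// Parses "field > N" once; returns None for anything else.
fn compile(expr: &str) -> Option<Compiled> {
    let (field, min) = expr.split_once('>')?;
    Some(Compiled {
        field: field.trim().to_string(),
        min: min.trim().parse().ok()?,
    })
}

/// Holds every expression in compiled form, built once at startup.
struct LogicCache {
    compiled: Vec<Arc<Compiled>>,
}

impl LogicCache {
    /// Compiles all expressions up front; any invalid expression aborts the build.
    fn build(exprs: &[&str]) -> Option<Self> {
        let compiled = exprs
            .iter()
            .map(|expr| compile(expr).map(Arc::new))
            .collect::<Option<Vec<_>>>()?;
        Some(Self { compiled })
    }

    /// Evaluates a pre-compiled rule by index; no parsing happens here.
    /// Returns None if the index is unknown; a missing field evaluates to false.
    fn evaluate(&self, index: usize, data: &HashMap<String, i64>) -> Option<bool> {
        let rule = self.compiled.get(index)?;
        Some(data.get(&rule.field).map_or(false, |value| *value > rule.min))
    }
}
```

Failing in `build` rather than in `evaluate` means a malformed expression surfaces at initialization instead of while a message is being processed.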

§Usage

use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
    ];

    // Create engine with defaults
    let engine = Engine::new(workflows, None);

    // Process messages asynchronously
    let mut message = Message::from_value(&json!({}));
    engine.process_message(&mut message).await?;

    Ok(())
}

Re-exports§

pub use error::DataflowError;
pub use error::ErrorInfo;
pub use error::Result;
pub use functions::AsyncFunctionHandler;
pub use functions::FunctionConfig;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;

Modules§

  • compiler: Workflow Compilation Module
  • error
  • executor: Internal Function Execution Module
  • functions
  • message
  • task
  • task_executor: Task Execution Module
  • utils: Utility Functions Module
  • workflow
  • workflow_executor: Workflow Execution Module

Structs§

  • Engine: High-performance async workflow engine for message processing.