Skip to main content

Module engine

Module engine 

Source
Expand description

§Engine Module

This module implements the core async workflow engine for dataflow-rs. The engine provides high-performance, asynchronous message processing through workflows composed of tasks.

§Architecture

The engine features a clean async-first architecture built on datalogic v5:

  • Compiler: Pre-compiles JSONLogic expressions into Arc<Logic> via Engine::compile_arc
  • Executor: Handles internal function execution (map, validation) with async support
  • Engine: Orchestrates workflow processing with shared compiled logic
  • Thread-Safe: Single datalogic_rs::Engine shared via Arc, with Arc<Logic> entries for zero-copy sharing

§Key Components

  • Engine: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
  • LogicCompiler: Compiles and caches JSONLogic expressions during initialization
  • InternalExecutor: Executes built-in map and validation functions with compiled logic
  • Workflow: Collection of tasks with JSONLogic conditions (can access data, metadata, temp_data)
  • Task: Individual processing unit that performs a specific function on a message
  • AsyncFunctionHandler: Trait for custom async processing logic
  • Message: Data structure flowing through the engine with audit trail

§Performance Optimizations

  • Pre-compilation: All JSONLogic expressions compiled at startup
  • Arc-wrapped Logic: Zero-copy sharing of compiled logic across async tasks
  • Bump-arena evaluation: Per-worker thread-local Bump is rewound (not freed) between evals
  • True Async: I/O operations remain fully async

§Usage

use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
    ];

    // Create engine with defaults
    let engine = Engine::builder().with_workflows(workflows).build()?;

    // Process messages asynchronously
    let mut message = Message::from_value(&json!({}));
    engine.process_message(&mut message).await?;

    Ok(())
}

Re-exports§

pub use error::DataflowError;
pub use error::ErrorInfo;
pub use error::Result;
pub use functions::AsyncFunctionHandler;
pub use functions::BoxedFunctionHandler;
pub use functions::CompiledCustomInput;
pub use functions::FunctionConfig;
pub use message::Message;
pub use task::Task;
pub use task_context::TaskContext;
pub use task_outcome::TaskOutcome;
pub use trace::ExecutionStep;
pub use trace::ExecutionTrace;
pub use trace::StepResult;
pub use workflow::Workflow;
pub use workflow::WorkflowStatus;

Modules§

compiler
Workflow Compilation Module
error
executor
Evaluation Primitives
functions
message
task
Task Module
task_context
Task context
task_executor
Task Execution Module
task_outcome
Task outcome
trace
Execution Trace Module
utils
Utility Functions Module
workflow
workflow_executor
Workflow Execution Module

Structs§

Engine
High-performance async workflow engine for message processing.
EngineBuilder
Builder for Engine. The recommended construction path — chain register("name", handler) and with_workflow(workflow) calls, then build() to produce a Result<Engine>. Empty registration is fine; an engine with no custom handlers still resolves the built-in functions.