Expand description
§Engine Module
This module implements the core async workflow engine for dataflow-rs. The engine provides high-performance, asynchronous message processing through workflows composed of tasks.
§Architecture
The engine features a clean async-first architecture built on datalogic v5:
- Compiler: Pre-compiles JSONLogic expressions into
Arc<Logic>viaEngine::compile_arc - Executor: Handles internal function execution (map, validation) with async support
- Engine: Orchestrates workflow processing with shared compiled logic
- Thread-Safe: Single
datalogic_rs::Engineshared viaArc, withArc<Logic>entries for zero-copy sharing
§Key Components
- Engine: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
- LogicCompiler: Compiles and caches JSONLogic expressions during initialization
- InternalExecutor: Executes built-in map and validation functions with compiled logic
- Workflow: Collection of tasks with JSONLogic conditions (can access data, metadata, temp_data)
- Task: Individual processing unit that performs a specific function on a message
- AsyncFunctionHandler: Trait for custom async processing logic
- Message: Data structure flowing through the engine with audit trail
§Performance Optimizations
- Pre-compilation: All JSONLogic expressions compiled at startup
- Arc-wrapped Logic: Zero-copy sharing of compiled logic across async tasks
- Bump-arena evaluation: Per-worker thread-local
Bumpis rewound (not freed) between evals - True Async: I/O operations remain fully async
§Usage
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Define workflows
let workflows = vec![
Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
];
// Create engine with defaults
let engine = Engine::builder().with_workflows(workflows).build()?;
// Process messages asynchronously
let mut message = Message::from_value(&json!({}));
engine.process_message(&mut message).await?;
Ok(())
}Re-exports§
pub use error::DataflowError;pub use error::ErrorInfo;pub use error::Result;pub use functions::AsyncFunctionHandler;pub use functions::BoxedFunctionHandler;pub use functions::CompiledCustomInput;pub use functions::FunctionConfig;pub use message::Message;pub use task::Task;pub use task_context::TaskContext;pub use task_outcome::TaskOutcome;pub use trace::ExecutionStep;pub use trace::ExecutionTrace;pub use trace::StepResult;pub use workflow::Workflow;pub use workflow::WorkflowStatus;
Modules§
- compiler
- Workflow Compilation Module
- error
- executor
- Evaluation Primitives
- functions
- message
- task
- Task Module
- task_
context - Task context
- task_
executor - Task Execution Module
- task_
outcome - Task outcome
- trace
- Execution Trace Module
- utils
- Utility Functions Module
- workflow
- workflow_
executor - Workflow Execution Module
Structs§
- Engine
- High-performance async workflow engine for message processing.
- Engine
Builder - Builder for
Engine. The recommended construction path — chainregister("name", handler)andwith_workflow(workflow)calls, thenbuild()to produce aResult<Engine>. Empty registration is fine; an engine with no custom handlers still resolves the built-in functions.