§Engine Module
This module implements the core async workflow engine for dataflow-rs. The engine provides high-performance, asynchronous message processing through workflows composed of tasks.
§Architecture
The engine features a clean async-first architecture with DataLogic v4:
- Compiler: Pre-compiles JSONLogic expressions into DataLogic v4’s Arc-wrapped compiled logic
- Executor: Handles internal function execution (map, validation) with async support
- Engine: Orchestrates workflow processing with shared compiled logic
- Thread-Safe: Single DataLogic instance with Arc-wrapped compiled logic for zero-copy sharing
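The Arc-based sharing described above can be illustrated with a minimal sketch that uses only the standard library. `CompiledRule` and `evaluate_all` are hypothetical stand-ins introduced for illustration; they are not DataLogic or dataflow-rs types, and plain OS threads are used here in place of Tokio tasks.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a pre-compiled expression (not the DataLogic type).
struct CompiledRule {
    threshold: i64,
}

impl CompiledRule {
    /// "Compile" once. In this sketch that only stores a threshold.
    fn compile(threshold: i64) -> Self {
        CompiledRule { threshold }
    }

    /// Evaluate against an input through a shared reference; the rule is never mutated.
    fn evaluate(&self, input: i64) -> bool {
        input > self.threshold
    }
}

/// Evaluate each input on its own thread while sharing a single compiled rule.
fn evaluate_all(rule: Arc<CompiledRule>, inputs: Vec<i64>) -> Vec<bool> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|input| {
            // Cloning the Arc bumps a reference count; the rule itself is not copied.
            let rule = Arc::clone(&rule);
            thread::spawn(move || rule.evaluate(input))
        })
        .collect();

    // Join in spawn order so results line up with the inputs.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let rule = Arc::new(CompiledRule::compile(10));
    let results = evaluate_all(Arc::clone(&rule), vec![5, 15, 10]);
    println!("{:?}", results); // prints [false, true, false]
}
```

Because evaluation only needs `&self`, no lock is required: every worker reads the same immutable value through its own `Arc` handle.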
§Key Components
- Engine: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
- LogicCompiler: Compiles and caches JSONLogic expressions during initialization
- InternalExecutor: Executes built-in map and validation functions with compiled logic
- Workflow: Collection of tasks with JSONLogic conditions (metadata-only access)
- Task: Individual processing unit that performs a specific function on a message
- AsyncFunctionHandler: Trait for custom async processing logic
- Message: Data structure flowing through the engine with audit trail
§Performance Optimizations
- Pre-compilation: All JSONLogic expressions compiled at startup
- Arc-wrapped Logic: Zero-copy sharing of compiled logic across async tasks
- Spawn Blocking: CPU-intensive JSONLogic evaluation in blocking tasks
- True Async: I/O operations remain fully async
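As a rough illustration of the pre-compilation idea, the sketch below parses a set of expressions once at startup and leaves only a lookup and a comparison on the hot path. The toy expression syntax (`"> 10"`), the `Compiled` enum, and the functions `compile`, `compile_all`, and `evaluate` are all assumptions made for this example; they do not reflect how the LogicCompiler or JSONLogic actually work.

```rust
use std::collections::HashMap;

/// Hypothetical compiled form of a tiny comparison expression such as "> 10".
#[derive(Debug, Clone, Copy, PartialEq)]
enum Compiled {
    GreaterThan(i64),
    LessThan(i64),
}

/// Parse one expression. Returns None for input this toy parser does not understand.
fn compile(expr: &str) -> Option<Compiled> {
    let mut parts = expr.split_whitespace();
    let op = parts.next()?;
    let value = parts.next()?.parse::<i64>().ok()?;
    match op {
        ">" => Some(Compiled::GreaterThan(value)),
        "<" => Some(Compiled::LessThan(value)),
        _ => None,
    }
}

/// Startup step: parse every (id, expression) pair once and cache the result by id.
/// Expressions that fail to parse are skipped in this sketch.
fn compile_all(sources: &[(&str, &str)]) -> HashMap<String, Compiled> {
    sources
        .iter()
        .filter_map(|(id, expr)| compile(expr).map(|compiled| (id.to_string(), compiled)))
        .collect()
}

/// Hot path: a map lookup and a comparison, with no parsing.
fn evaluate(cache: &HashMap<String, Compiled>, id: &str, input: i64) -> Option<bool> {
    cache.get(id).map(|compiled| match compiled {
        Compiled::GreaterThan(n) => input > *n,
        Compiled::LessThan(n) => input < *n,
    })
}

fn main() {
    let cache = compile_all(&[("high", "> 10"), ("low", "< 3")]);
    println!("{:?}", evaluate(&cache, "high", 11)); // prints Some(true)
    println!("{:?}", evaluate(&cache, "low", 5)); // prints Some(false)
}
```

Paying the parsing cost once up front means repeated evaluations of the same expression never re-parse it, which is the trade-off the pre-compilation bullet describes.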
§Usage
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
    ];

    // Create engine with defaults
    let engine = Engine::new(workflows, None);

    // Process messages asynchronously
    let mut message = Message::from_value(&json!({}));
    engine.process_message(&mut message).await?;

    Ok(())
}
Re-exports§
pub use error::DataflowError;
pub use error::ErrorInfo;
pub use error::Result;
pub use functions::AsyncFunctionHandler;
pub use functions::FunctionConfig;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;
Modules§
- compiler: Workflow Compilation Module
- error
- executor: Internal Function Execution Module
- functions
- message
- task
- task_executor: Task Execution Module
- utils: Utility Functions Module
- workflow
- workflow_executor: Workflow Execution Module
Structs§
- Engine: High-performance async workflow engine for message processing.