pub struct Engine { /* private fields */ }Expand description
High-performance async workflow engine for message processing.
§Architecture
The engine is designed for async-first operation with Tokio:
- Separation of Concerns: Distinct executors for workflows and tasks
- Shared DataLogic: Single DataLogic instance with Arc for thread-safe sharing
- Arc
: Pre-compiled logic shared across all async tasks - Async Functions: Native async support for I/O-bound operations
§Performance Characteristics
- Zero Runtime Compilation: All logic compiled during initialization
- Zero-Copy Sharing: Arc-wrapped compiled logic shared without cloning
- Optimal for Mixed Workloads: Async I/O with blocking CPU evaluation
- Thread-Safe by Design: All components safe to share across Tokio tasks
Implementations§
Source§impl Engine
impl Engine
Sourcepub fn new(
workflows: Vec<Workflow>,
custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
) -> Self
pub fn new( workflows: Vec<Workflow>, custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>, ) -> Self
Creates a new Engine instance with configurable parameters.
§Arguments
workflows- The workflows to use for processing messagescustom_functions- Optional custom async function handlers
§Example
use dataflow_rs::{Engine, Workflow};
let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
// Simple usage with defaults
let engine = Engine::new(workflows, None);Sourcepub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self
Creates a new Engine with different workflows but the same custom function handlers.
This is the hot-reload path. The existing engine remains valid for any
in-flight process_message calls. The returned engine shares the same
function registry (zero-copy Arc bump) but has freshly compiled logic
for the new workflow set.
§Arguments
workflows- The new set of workflows to compile and use
Sourcepub async fn process_message(&self, message: &mut Message) -> Result<()>
pub async fn process_message(&self, message: &mut Message) -> Result<()>
Processes a message through workflows that match their conditions.
This async method:
- Iterates through workflows sequentially in priority order (pre-sorted at construction)
- Delegates workflow execution to the WorkflowExecutor
- Updates message metadata
§Arguments
message- The message to process through workflows
§Returns
Result<()>- Ok(()) if processing succeeded, Err if a fatal error occurred
Sourcepub async fn process_message_with_trace(
&self,
message: &mut Message,
) -> Result<ExecutionTrace>
pub async fn process_message_with_trace( &self, message: &mut Message, ) -> Result<ExecutionTrace>
Processes a message through workflows with step-by-step tracing.
This method is similar to process_message but captures an execution trace
that can be used for debugging and step-by-step visualization.
§Arguments
message- The message to process through workflows
§Returns
Result<ExecutionTrace>- The execution trace with message snapshots
Sourcepub async fn process_message_for_channel(
&self,
channel: &str,
message: &mut Message,
) -> Result<()>
pub async fn process_message_for_channel( &self, channel: &str, message: &mut Message, ) -> Result<()>
Processes a message through only the Active workflows registered for a given channel.
Workflows are processed in priority order (lowest first), same as process_message(). If the channel does not exist or has no Active workflows, this is a no-op.
§Arguments
channel- The channel name to route the message throughmessage- The message to process
Sourcepub async fn process_message_for_channel_with_trace(
&self,
channel: &str,
message: &mut Message,
) -> Result<ExecutionTrace>
pub async fn process_message_for_channel_with_trace( &self, channel: &str, message: &mut Message, ) -> Result<ExecutionTrace>
Processes a message through a channel with step-by-step tracing.
§Arguments
channel- The channel name to route the message throughmessage- The message to process
Sourcepub fn workflows(&self) -> &Arc<Vec<Workflow>>
pub fn workflows(&self) -> &Arc<Vec<Workflow>>
Get a reference to the workflows (pre-sorted by priority)
Sourcepub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>
Look up a workflow by its ID
Sourcepub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>>
pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>>
Get a reference to the compiled logic cache