pub struct Engine { /* private fields */ }
High-performance async workflow engine for message processing.
§Architecture
The engine is designed for async-first operation with Tokio:
- Separation of Concerns: Distinct executors for workflows and tasks
- Shared datalogic engine: Single datalogic_rs::Engine wrapped in Arc for thread-safe sharing
- Arc: Pre-compiled logic shared across all async tasks
- Async Functions: Native async support for I/O-bound operations
§Performance Characteristics
- Zero Runtime Compilation: All logic compiled during initialization
- Zero-Copy Sharing: Arc-wrapped compiled logic shared without cloning
- Optimal for Mixed Workloads: Async I/O with blocking CPU evaluation
- Thread-Safe by Design: All components safe to share across Tokio tasks
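The sharing model described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `CompiledLogic` and `evaluate_on_workers` are hypothetical names, and OS threads stand in for Tokio tasks so that the sketch has no external dependencies.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for logic that was compiled once at initialization.
struct CompiledLogic {
    multiplier: i64,
}

impl CompiledLogic {
    fn evaluate(&self, input: i64) -> i64 {
        input * self.multiplier
    }
}

/// Evaluates each input on its own worker; all workers share one compiled value.
/// Cloning the `Arc` only bumps a reference count, the logic itself is never copied.
fn evaluate_on_workers(logic: Arc<CompiledLogic>, inputs: Vec<i64>) -> Vec<i64> {
    let handles: Vec<_> = inputs
        .into_iter()
        .map(|input| {
            let shared = Arc::clone(&logic);
            thread::spawn(move || shared.evaluate(input))
        })
        .collect();
    // Join in spawn order so results line up with the inputs.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Because the compiled value is immutable after construction, no lock is needed: shared read-only access through `Arc` is enough.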
§Implementations
impl Engine
pub fn new(
    workflows: Vec<Workflow>,
    custom_functions: HashMap<String, BoxedFunctionHandler>,
) -> Result<Self>
Creates a new Engine instance.
Compiles every workflow / task / function-config JSONLogic expression
up-front. Returns Err(DataflowError) if any required expression
fails to compile — fail-loud at construction time instead of silently
dropping broken workflows at runtime.
§Arguments
- workflows - The workflows to use for processing messages
- custom_functions - Custom async function handlers (use HashMap::new() for none, or prefer Engine::builder)
§Example
use dataflow_rs::{Engine, Workflow};
let workflows = vec![Workflow::from_json(
    r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#,
)
.unwrap()];
let engine = Engine::builder().with_workflows(workflows).build().unwrap();

The recommended construction path is Engine::builder. Engine::new
is the lower-level escape hatch: it accepts handlers as a plain
HashMap (use HashMap::new() for the no-handler case).
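The fail-loud construction contract can be illustrated with a small stand-alone sketch. Everything here is hypothetical (`RawWorkflow`, `CompiledWorkflow`, `SketchEngine`, `CompileError`), and parsing an integer stands in for compiling a JSONLogic expression; the point is only that one bad expression rejects the whole engine at construction instead of being skipped later.

```rust
/// Hypothetical error type naming the workflow whose expression failed.
#[derive(Debug, PartialEq)]
struct CompileError {
    workflow_id: String,
    reason: String,
}

/// Hypothetical uncompiled workflow: the condition is still source text.
struct RawWorkflow {
    id: String,
    condition: String,
}

/// Hypothetical compiled workflow: the condition is ready to evaluate.
struct CompiledWorkflow {
    id: String,
    threshold: i64,
}

struct SketchEngine {
    workflows: Vec<CompiledWorkflow>,
}

impl SketchEngine {
    /// Compiles every workflow up-front; the first failure aborts construction.
    fn new(raw: Vec<RawWorkflow>) -> Result<Self, CompileError> {
        let workflows = raw
            .into_iter()
            .map(|w| -> Result<CompiledWorkflow, CompileError> {
                // Assumption: integer parsing plays the role of expression compilation.
                let threshold = w.condition.trim().parse::<i64>().map_err(|e| CompileError {
                    workflow_id: w.id.clone(),
                    reason: e.to_string(),
                })?;
                Ok(CompiledWorkflow { id: w.id, threshold })
            })
            .collect::<Result<Vec<_>, CompileError>>()?;
        Ok(Self { workflows })
    }
}
```

Collecting into `Result<Vec<_>, _>` stops at the first error, so a caller never receives an engine with a partially compiled workflow set.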
pub fn builder() -> EngineBuilder
Starts building an engine. This is the recommended construction path:
chain register("name", handler) and with_workflow(w) calls,
then call build() to produce a Result<Engine>.
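use dataflow_rs::{Engine, Workflow};
// Same workflow definition as in the Engine::new example.
let workflow = Workflow::from_json(
    r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#,
)
.unwrap();
let engine = Engine::builder()
    .with_workflow(workflow)
    // .register("my_handler", MyHandler) // any AsyncFunctionHandler
    .build()
    .unwrap();

pub fn engine_version_value(&self) -> &OwnedDataValue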
Returns the engine version as a cached OwnedDataValue::String.
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Result<Self>
Creates a new Engine with different workflows but the same custom function handlers.
This is the hot-reload path. The existing engine remains valid for any
in-flight process_message calls. The returned engine shares the same
function registry (zero-copy Arc bump) but has freshly compiled logic
for the new workflow set.
§Arguments
- workflows - The new set of workflows to compile and use
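The hot-reload behaviour described for with_new_workflows can be sketched as follows. This is an illustration under assumptions, not the crate's code: `SketchEngine`, `Handler`, and `double` are hypothetical, and plain strings stand in for compiled workflows. It shows the one property the text promises: the new engine reuses the same handler registry allocation while holding a fresh workflow set, and the old engine stays usable.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical handler signature; the real crate uses async handlers.
type Handler = fn(i64) -> i64;

/// Hypothetical example handler.
fn double(x: i64) -> i64 {
    x * 2
}

struct SketchEngine {
    handlers: Arc<HashMap<String, Handler>>,
    workflows: Arc<Vec<String>>,
}

impl SketchEngine {
    fn new(handlers: HashMap<String, Handler>, workflows: Vec<String>) -> Self {
        Self {
            handlers: Arc::new(handlers),
            workflows: Arc::new(workflows),
        }
    }

    /// Builds a second engine: same registry (reference-count bump only),
    /// new workflow set. `self` is borrowed, so the old engine remains valid.
    fn with_new_workflows(&self, workflows: Vec<String>) -> Self {
        Self {
            handlers: Arc::clone(&self.handlers),
            workflows: Arc::new(workflows),
        }
    }
}
```

In a long-running service the caller would typically swap which engine new requests see, while requests already holding the old engine finish against it.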
pub async fn process_message(&self, message: &mut Message) -> Result<()>
Processes a message through workflows that match their conditions.
This async method:
- Iterates through workflows sequentially in priority order (pre-sorted at construction)
- Delegates workflow execution to the WorkflowExecutor
- Updates message metadata
§Error contract
Errors flow through two complementary channels:
- message.errors(): always contains every error encountered (validation failures, task panics, 5xx-status outcomes, workflow wrappers). Callers that want a uniform view inspect this list.
- Result::Err: signals only that the engine stopped before processing every workflow. Callers that want fail-fast match on this. The error pushed to message.errors for the same failure carries the workflow context (id) that the bare Err doesn’t.
In particular: a workflow with continue_on_error: true records its
errors to message.errors and returns Ok(()) here. A workflow
with continue_on_error: false records to message.errors and
returns Result::Err (which short-circuits the rest of this call).
§Arguments
- message - The message to process through workflows
§Returns
Result<()>: Ok(()) if every workflow completed (each may have pushed errors to message.errors); Err(e) if the engine stopped early on a hard failure.
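The two error channels of process_message can be modelled in a few lines. The sketch below is synchronous and uses hypothetical types (`SketchMessage`, `SketchWorkflow`, `process`); it only mirrors the contract stated above: every failure is recorded on the message with workflow context, and the returned Err appears only when a workflow with continue_on_error set to false stops the run.

```rust
/// Hypothetical message carrying the accumulated error list.
#[derive(Default)]
struct SketchMessage {
    errors: Vec<String>,
}

/// Hypothetical workflow: `fails` simulates a task error.
struct SketchWorkflow {
    id: &'static str,
    fails: bool,
    continue_on_error: bool,
}

fn process(workflows: &[SketchWorkflow], message: &mut SketchMessage) -> Result<(), String> {
    for wf in workflows {
        if wf.fails {
            // Channel 1: always record the error, with the workflow id attached.
            message.errors.push(format!("workflow {}: task failed", wf.id));
            if !wf.continue_on_error {
                // Channel 2: a bare error that short-circuits the remaining workflows.
                return Err("task failed".to_string());
            }
        }
    }
    Ok(())
}
```

A caller that only needs fail-fast behaviour matches on the return value; a caller that wants the full picture reads `message.errors` regardless of the return value.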
pub async fn process_message_with_trace(
    &self,
    message: &mut Message,
) -> Result<ExecutionTrace>
Processes a message through workflows with step-by-step tracing.
This method is similar to process_message but captures an execution trace
that can be used for debugging and step-by-step visualization.
§Arguments
- message - The message to process through workflows
§Returns
Result<ExecutionTrace> - The execution trace with message snapshots
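As a rough picture of what "step-by-step tracing with message snapshots" means, the sketch below runs the same steps as an untraced variant but records the state after each one. The types (`SketchTask`, `SketchStep`) and both functions are hypothetical and far simpler than the crate's ExecutionTrace, whose actual fields are not shown on this page.

```rust
/// Hypothetical task kinds operating on a single integer "message".
enum SketchTask {
    Add(i64),
    Double,
}

/// Hypothetical trace entry: which task ran and the state right after it.
#[derive(Debug, PartialEq)]
struct SketchStep {
    task_index: usize,
    snapshot: i64,
}

fn apply(task: &SketchTask, value: &mut i64) {
    match task {
        SketchTask::Add(n) => *value += n,
        SketchTask::Double => *value *= 2,
    }
}

/// Untraced processing: only the final state is observable.
fn process(tasks: &[SketchTask], value: &mut i64) {
    for task in tasks {
        apply(task, value);
    }
}

/// Traced processing: identical mutations, plus one snapshot per step.
fn process_with_trace(tasks: &[SketchTask], value: &mut i64) -> Vec<SketchStep> {
    let mut trace = Vec::with_capacity(tasks.len());
    for (task_index, task) in tasks.iter().enumerate() {
        apply(task, value);
        trace.push(SketchStep { task_index, snapshot: *value });
    }
    trace
}
```

Snapshotting after every step costs a copy per step, which is why a traced variant is usually kept separate from the hot path.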
pub async fn process_message_for_channel(
    &self,
    channel: &str,
    message: &mut Message,
) -> Result<()>
Processes a message through only the Active workflows registered for a given channel.
Workflows are processed in priority order (lowest first), same as process_message(). If the channel does not exist or has no Active workflows, this is a no-op.
§Arguments
- channel - The channel name to route the message through
- message - The message to process
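One plausible way to picture channel routing is an index built once at construction, mapping each channel to its Active workflows in priority order. The sketch below is an assumption about structure, not the crate's implementation: `SketchStatus` (including the `Inactive` variant), `SketchWorkflow`, `ChannelIndex`, and `route` are all hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical status flag; only `Active` is named in the text above.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SketchStatus {
    Active,
    Inactive,
}

/// Hypothetical, simplified workflow record.
struct SketchWorkflow {
    id: &'static str,
    channel: &'static str,
    priority: u32,
    status: SketchStatus,
}

/// Index built once: channel name -> positions of Active workflows.
struct ChannelIndex {
    workflows: Vec<SketchWorkflow>,
    by_channel: HashMap<&'static str, Vec<usize>>,
}

impl ChannelIndex {
    fn new(mut workflows: Vec<SketchWorkflow>) -> Self {
        // Sort once so every per-channel list inherits priority order (lowest first).
        workflows.sort_by_key(|w| w.priority);
        let mut by_channel: HashMap<&'static str, Vec<usize>> = HashMap::new();
        for (pos, w) in workflows.iter().enumerate() {
            if w.status == SketchStatus::Active {
                by_channel.entry(w.channel).or_default().push(pos);
            }
        }
        Self { workflows, by_channel }
    }

    /// Returns the ids that would run for `channel`, in execution order.
    /// An unknown channel, or one with no Active workflows, yields an empty list (a no-op).
    fn route(&self, channel: &str) -> Vec<&'static str> {
        self.by_channel
            .get(channel)
            .map(|positions| positions.iter().map(|&p| self.workflows[p].id).collect())
            .unwrap_or_default()
    }
}
```

Doing the sort and the status filter at construction keeps the per-message path to a single map lookup.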
pub async fn process_message_for_channel_with_trace(
    &self,
    channel: &str,
    message: &mut Message,
) -> Result<ExecutionTrace>
Processes a message through a channel with step-by-step tracing.
§Arguments
- channel - The channel name to route the message through
- message - The message to process
pub fn workflows(&self) -> &Arc<Vec<Workflow>>
Get a reference to the workflows (pre-sorted by priority)
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>
Look up a workflow by its ID
pub fn datalogic(&self) -> &Arc<DatalogicEngine>
Get a reference to the underlying datalogic v5 engine.