Struct Engine 

pub struct Engine { /* private fields */ }

High-performance async workflow engine for message processing.

§Architecture

The engine is designed for async-first operation with Tokio:

  • Separation of Concerns: Distinct executors for workflows and tasks
  • Shared datalogic engine: Single datalogic_rs::Engine wrapped in Arc for thread-safe sharing
  • Arc-shared compiled logic: Pre-compiled logic is held behind an Arc and shared across all async tasks
  • Async Functions: Native async support for I/O-bound operations

§Performance Characteristics

  • Zero Runtime Compilation: All logic compiled during initialization
  • Zero-Copy Sharing: Arc-wrapped compiled logic shared without cloning
  • Optimal for Mixed Workloads: Async I/O with blocking CPU evaluation
  • Thread-Safe by Design: All components safe to share across Tokio tasks
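
The "compile once, share through Arc" idea above can be illustrated with a small std-only model. This is a hedged sketch, not the engine's implementation: `CompiledRule`, `compile_all`, and `evaluate_shared` are hypothetical names, a plain integer threshold stands in for a compiled JSONLogic expression, and OS threads stand in for Tokio tasks.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for one pre-compiled expression: "input > threshold".
struct CompiledRule {
    threshold: i64,
}

impl CompiledRule {
    fn evaluate(&self, input: i64) -> bool {
        input > self.threshold
    }
}

/// "Compiles" every rule once, up front, and wraps the result in an Arc.
fn compile_all(thresholds: &[i64]) -> Arc<Vec<CompiledRule>> {
    Arc::new(
        thresholds
            .iter()
            .map(|&threshold| CompiledRule { threshold })
            .collect(),
    )
}

/// Evaluates each input on its own thread against the same shared rules.
/// Returns, per input, how many rules matched.
fn evaluate_shared(rules: &Arc<Vec<CompiledRule>>, inputs: Vec<i64>) -> Vec<usize> {
    let handles: Vec<thread::JoinHandle<usize>> = inputs
        .into_iter()
        .map(|input| {
            // Arc::clone bumps a reference count; the rules themselves are not copied.
            let rules = Arc::clone(rules);
            thread::spawn(move || rules.iter().filter(|rule| rule.evaluate(input)).count())
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Calling `evaluate_shared(&compile_all(&[10, 20]), vec![5, 15, 25])` evaluates three inputs concurrently against one rule set that was built exactly once.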

Implementations§

impl Engine

pub fn new( workflows: Vec<Workflow>, custom_functions: HashMap<String, BoxedFunctionHandler>, ) -> Result<Self>

Creates a new Engine instance.

Compiles every workflow / task / function-config JSONLogic expression up-front. Returns Err(DataflowError) if any required expression fails to compile — fail-loud at construction time instead of silently dropping broken workflows at runtime.
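
The fail-loud behaviour described above can be modelled without the crate. The following is a minimal sketch under stated assumptions: `WorkflowSpec`, `CompiledWorkflow`, `CompileError`, and `SketchEngine` are hypothetical types, and parsing an integer stands in for compiling a JSONLogic expression. It only shows the shape of "compile everything in the constructor and return Err on the first failure".

```rust
/// Hypothetical uncompiled workflow: the condition is still source text.
struct WorkflowSpec {
    id: String,
    condition: String,
}

/// Hypothetical compiled form, ready to evaluate with no further parsing.
struct CompiledWorkflow {
    id: String,
    min_value: i64,
}

#[derive(Debug)]
struct CompileError {
    workflow_id: String,
    reason: String,
}

impl std::fmt::Display for CompileError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "workflow {}: {}", self.workflow_id, self.reason)
    }
}

struct SketchEngine {
    workflows: Vec<CompiledWorkflow>,
}

impl SketchEngine {
    /// Compiles every spec up front; any failure aborts construction.
    fn new(specs: Vec<WorkflowSpec>) -> Result<Self, CompileError> {
        let workflows = specs
            .into_iter()
            .map(|spec| -> Result<CompiledWorkflow, CompileError> {
                let min_value = spec.condition.trim().parse::<i64>().map_err(|e| CompileError {
                    workflow_id: spec.id.clone(),
                    reason: e.to_string(),
                })?;
                Ok(CompiledWorkflow { id: spec.id, min_value })
            })
            // Collecting into Result stops at the first error and returns it.
            .collect::<Result<Vec<_>, _>>()?;
        Ok(SketchEngine { workflows })
    }

    /// Ids of the workflows whose (already compiled) condition accepts `value`.
    fn matching_ids(&self, value: i64) -> Vec<&str> {
        self.workflows
            .iter()
            .filter(|wf| value >= wf.min_value)
            .map(|wf| wf.id.as_str())
            .collect()
    }
}
```

Because no broken workflow can survive construction in this model, evaluation code such as `matching_ids` never has to handle a compile error.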

§Arguments
  • workflows - The workflows to use for processing messages
  • custom_functions - Custom async function handlers (use HashMap::new() for none, or prefer Engine::builder)
§Example
use dataflow_rs::{Engine, Workflow};

let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];

let engine = Engine::builder().with_workflows(workflows).build().unwrap();

The example above uses Engine::builder, the recommended construction path. Engine::new is the lower-level escape hatch: it takes handlers as a plain HashMap (pass HashMap::new() when there are none).

pub fn builder() -> EngineBuilder

Start building an engine. The recommended construction path — chains register("name", handler) and with_workflow(w) calls, then build() to produce a Result<Engine>.

use dataflow_rs::{Engine, Workflow};

// `workflow` is a Workflow built beforehand, e.g. with Workflow::from_json.
let engine = Engine::builder()
    .with_workflow(workflow)
    // .register("my_handler", MyHandler)  // any AsyncFunctionHandler
    .build()
    .unwrap();

pub fn engine_version_value(&self) -> &OwnedDataValue

Cached OwnedDataValue::String of the engine version.

pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Result<Self>

Creates a new Engine with different workflows but the same custom function handlers.

This is the hot-reload path. The existing engine remains valid for any in-flight process_message calls. The returned engine shares the same function registry (zero-copy Arc bump) but has freshly compiled logic for the new workflow set.

§Arguments
  • workflows - The new set of workflows to compile and use
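
The hot-reload relationship between the old and new engine can be pictured with a std-only model. This is a sketch under assumptions, not the crate's code: `ReloadableEngine`, `Handler`, and `increment` are hypothetical, workflows are reduced to plain names, and the compilation step is elided.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical handler type; the real crate uses async handler objects.
type Handler = fn(i64) -> i64;

struct ReloadableEngine {
    workflows: Arc<Vec<String>>,
    handlers: Arc<HashMap<String, Handler>>,
}

impl ReloadableEngine {
    fn new(workflows: Vec<String>, handlers: HashMap<String, Handler>) -> Self {
        ReloadableEngine {
            workflows: Arc::new(workflows),
            handlers: Arc::new(handlers),
        }
    }

    /// Builds a second engine: fresh workflows, same handler registry.
    /// `&self` is only borrowed, so the old engine stays fully usable.
    fn with_new_workflows(&self, workflows: Vec<String>) -> Self {
        ReloadableEngine {
            workflows: Arc::new(workflows),
            // Reference-count bump only; the registry is not copied.
            handlers: Arc::clone(&self.handlers),
        }
    }
}

/// Example handler used to populate the registry.
fn increment(value: i64) -> i64 {
    value + 1
}
```

In this model, `Arc::ptr_eq(&old.handlers, &reloaded.handlers)` holds after a reload, while `old.workflows` and `reloaded.workflows` point at different allocations, so work running against the old engine is unaffected.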

pub async fn process_message(&self, message: &mut Message) -> Result<()>

Processes a message through workflows that match their conditions.

This async method:

  1. Iterates through workflows sequentially in priority order (pre-sorted at construction)
  2. Delegates workflow execution to the WorkflowExecutor
  3. Updates message metadata
§Error contract

Errors flow through two complementary channels:

  • message.errors() always contains every error encountered (validation failures, task panics, 5xx-status outcomes, workflow wrappers). Callers that want a uniform view inspect this list.
  • Result::Err — signals only that the engine stopped before processing every workflow. Callers that want fail-fast match on this. The error pushed to message.errors for the same failure carries the workflow context (id) that the bare Err doesn’t.

In particular: a workflow with continue_on_error: true records its errors to message.errors and returns Ok(()) here. A workflow with continue_on_error: false records to message.errors and returns Result::Err (which short-circuits the rest of this call).
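
The two channels can be seen side by side in a small model. This is a hedged, std-only sketch: `SketchWorkflow`, `SketchMessage`, and the free function `process_message` are hypothetical simplifications (synchronous, with a `fails` flag standing in for real task execution), not the crate's types.

```rust
/// Hypothetical workflow: `fails` simulates a task error during execution.
struct SketchWorkflow {
    id: &'static str,
    continue_on_error: bool,
    fails: bool,
}

/// Hypothetical message that accumulates errors and completed workflow ids.
#[derive(Default)]
struct SketchMessage {
    errors: Vec<String>,
    completed: Vec<&'static str>,
}

fn process_message(
    workflows: &[SketchWorkflow],
    message: &mut SketchMessage,
) -> Result<(), String> {
    for workflow in workflows {
        if !workflow.fails {
            message.completed.push(workflow.id);
            continue;
        }
        // Channel 1: every failure is recorded on the message, with workflow context.
        message.errors.push(format!("workflow '{}': task failed", workflow.id));
        if !workflow.continue_on_error {
            // Channel 2: a hard failure also stops the run early, without the id.
            return Err("task failed".to_string());
        }
    }
    Ok(())
}
```

A caller that wants fail-fast behaviour matches on the returned Result; a caller that wants the full picture reads `message.errors` regardless of whether the call returned Ok or Err.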

§Arguments
  • message - The message to process through workflows
§Returns
  • Result<()> - Ok(()) if every workflow completed (each may have pushed errors to message.errors); Err(e) if the engine stopped early on a hard failure.

pub async fn process_message_with_trace( &self, message: &mut Message, ) -> Result<ExecutionTrace>

Processes a message through workflows with step-by-step tracing.

This method is similar to process_message but captures an execution trace that can be used for debugging and step-by-step visualization.

§Arguments
  • message - The message to process through workflows
§Returns
  • Result<ExecutionTrace> - The execution trace with message snapshots
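
To make "execution trace with message snapshots" concrete, here is a minimal std-only model. Everything in it is an assumption for illustration: `TracedMessage`, `TraceStep`, `process_with_trace`, and the two step functions are hypothetical, and the real ExecutionTrace type may record more or different data.

```rust
/// Hypothetical message with a single field that steps transform.
#[derive(Clone, Debug, PartialEq)]
struct TracedMessage {
    value: i64,
}

/// One trace entry: which step ran and what the message looked like afterwards.
struct TraceStep {
    step: &'static str,
    snapshot: TracedMessage,
}

/// Runs every step in order, cloning the message after each one.
fn process_with_trace(
    steps: &[(&'static str, fn(i64) -> i64)],
    message: &mut TracedMessage,
) -> Vec<TraceStep> {
    let mut trace = Vec::with_capacity(steps.len());
    for &(name, apply) in steps {
        message.value = apply(message.value);
        // The snapshot is a copy, so later steps cannot alter earlier entries.
        trace.push(TraceStep { step: name, snapshot: message.clone() });
    }
    trace
}

fn double(value: i64) -> i64 {
    value * 2
}

fn add_one(value: i64) -> i64 {
    value + 1
}
```

The cost of tracing in this model is one message clone per step, which is why a separate non-tracing entry point is useful on the hot path.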

pub async fn process_message_for_channel( &self, channel: &str, message: &mut Message, ) -> Result<()>

Processes a message through only the Active workflows registered for a given channel.

Workflows are processed in priority order (lowest first), same as process_message(). If the channel does not exist or has no Active workflows, this is a no-op.

§Arguments
  • channel - The channel name to route the message through
  • message - The message to process
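
The routing rules above (Active workflows only, lowest priority first, unknown channel is a no-op) can be sketched with a pre-built index. This is an illustrative std-only model: `SketchStatus`, `ChannelWorkflow`, and `ChannelRouter` are hypothetical, and the `Paused` variant is an assumed example of a non-Active status.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq)]
enum SketchStatus {
    Active,
    Paused, // assumed example of a non-Active status
}

struct ChannelWorkflow {
    id: String,
    channel: String,
    priority: u32,
    status: SketchStatus,
}

/// Index from channel name to its Active workflows, sorted by priority.
struct ChannelRouter {
    by_channel: HashMap<String, Vec<ChannelWorkflow>>,
}

impl ChannelRouter {
    fn new(workflows: Vec<ChannelWorkflow>) -> Self {
        let mut by_channel: HashMap<String, Vec<ChannelWorkflow>> = HashMap::new();
        for workflow in workflows {
            // Non-Active workflows never enter the index.
            if workflow.status == SketchStatus::Active {
                by_channel
                    .entry(workflow.channel.clone())
                    .or_insert_with(Vec::new)
                    .push(workflow);
            }
        }
        for list in by_channel.values_mut() {
            // Lowest priority value runs first.
            list.sort_by_key(|workflow| workflow.priority);
        }
        ChannelRouter { by_channel }
    }

    /// Ids in execution order; empty when the channel is unknown.
    fn run_order(&self, channel: &str) -> Vec<&str> {
        self.by_channel
            .get(channel)
            .map(|list| list.iter().map(|workflow| workflow.id.as_str()).collect())
            .unwrap_or_default()
    }
}
```

Sorting once at construction keeps the per-message path to a single map lookup followed by an in-order walk.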

pub async fn process_message_for_channel_with_trace( &self, channel: &str, message: &mut Message, ) -> Result<ExecutionTrace>

Processes a message through a channel with step-by-step tracing.

§Arguments
  • channel - The channel name to route the message through
  • message - The message to process

pub fn workflows(&self) -> &Arc<Vec<Workflow>>

Get a reference to the workflows (pre-sorted by priority)

pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>

Look up a workflow by its ID

pub fn datalogic(&self) -> &Arc<DatalogicEngine>

Get a reference to the underlying datalogic v5 engine.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.