
Crate dataflow_rs


§Dataflow-rs

A lightweight rules engine for building IFTTT-style automation and data processing pipelines in Rust.

§Overview

Dataflow-rs provides a high-performance rules engine that follows the IF → THEN → THAT model:

  • IF — Define conditions using JSONLogic expressions (evaluated against data, metadata, temp_data)
  • THEN — Execute actions: data transformation, validation, or custom async logic
  • THAT — Chain multiple actions and rules with priority ordering

Rules are defined declaratively in JSON and compiled once at startup, so their JSONLogic expressions are not re-parsed for each message at runtime.
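
The IF → THEN → THAT flow can be pictured with a small, std-only model. This is a sketch under stated assumptions, not the crate's API: ToyRule, ToyEngine and the flat numeric Fields map are hypothetical, plain Rust functions stand in for compiled JSONLogic conditions, and lower priority values are assumed to run first.

```rust
use std::collections::HashMap;

/// Hypothetical message: a flat map of numeric fields.
type Fields = HashMap<String, f64>;

/// One rule: an IF condition plus the THEN actions to run when it holds.
struct ToyRule {
    id: &'static str,
    priority: i32,
    condition: fn(&Fields) -> bool,
    actions: Vec<fn(&mut Fields)>,
}

/// Holds rules already sorted by priority, so the ordering work happens once.
struct ToyEngine {
    rules: Vec<ToyRule>,
}

impl ToyEngine {
    /// Assumption: lower `priority` values run first.
    fn new(mut rules: Vec<ToyRule>) -> Self {
        rules.sort_by_key(|r| r.priority);
        ToyEngine { rules }
    }

    /// Evaluate every rule in order (THAT); when a rule's condition holds (IF),
    /// run its actions (THEN). Later rules see changes made by earlier ones.
    /// Returns the ids of the rules that fired.
    fn process(&self, msg: &mut Fields) -> Vec<&'static str> {
        let mut fired = Vec::new();
        for rule in &self.rules {
            if (rule.condition)(msg) {
                for &action in &rule.actions {
                    action(msg);
                }
                fired.push(rule.id);
            }
        }
        fired
    }
}
```

Sorting once in ToyEngine::new is the toy counterpart of doing preparation work at startup rather than per message.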

§Key Components

| Rules-engine name | Workflow-engine name | Description |
|---|---|---|
| RulesEngine | Engine | Central async component that evaluates rules and executes actions |
| Rule | Workflow | A condition + actions bundle: IF condition THEN execute actions |
| Action | Task | An individual processing step that performs a function on a message |
  • AsyncFunctionHandler: A trait implemented by action handlers to define custom async processing logic
  • TaskContext: Per-call context handed to handlers — typed data accessors, audit-trail-aware setters
  • TaskOutcome: Return value of a handler — Success, Status(code), Skip, or Halt
  • Message: The data structure that flows through the engine, containing payload, metadata, and processing results
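
How a driver loop might react to the four TaskOutcome variants is sketched below. The Outcome enum mirrors the variant names listed above, but the control flow attached to each one is an assumption made for illustration: here Skip abandons the rest of the current workflow, Halt stops all remaining workflows, and Status(code) simply records the code and continues.

```rust
/// Mirrors the variant names of TaskOutcome; the semantics below are assumed.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Success,
    Status(u16),
    Skip,
    Halt,
}

/// What happened while driving a batch of workflows.
#[derive(Debug, PartialEq)]
struct RunSummary {
    tasks_run: usize,
    last_status: Option<u16>,
    halted: bool,
}

/// Each workflow is modeled as the list of outcomes its tasks return, in order.
fn drive(workflows: &[Vec<Outcome>]) -> RunSummary {
    let mut summary = RunSummary { tasks_run: 0, last_status: None, halted: false };
    'workflows: for tasks in workflows {
        for outcome in tasks {
            summary.tasks_run += 1;
            match *outcome {
                Outcome::Success => {}
                // Assumed: a status code is recorded and processing continues.
                Outcome::Status(code) => summary.last_status = Some(code),
                // Assumed: Skip abandons the rest of this workflow only.
                Outcome::Skip => continue 'workflows,
                // Assumed: Halt stops every remaining workflow.
                Outcome::Halt => {
                    summary.halted = true;
                    break 'workflows;
                }
            }
        }
    }
    summary
}
```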

§Built-in Functions

The engine ships with the following pre-registered functions, available to any workflow without further setup:

| Category | Function | Purpose |
|---|---|---|
| Parse | parse_json | Deserialize a JSON payload string into data |
| Parse | parse_xml | Deserialize an XML payload string into data |
| Transform | map | Assign JSONLogic-derived values to dot-paths within the message |
| Validate | validation | Apply JSONLogic rules with custom error messages |
| Routing | filter | Skip or halt processing based on a JSONLogic predicate |
| Routing | log | Emit a log entry at a configurable level |
| Publish | publish_json | Render data back out as a JSON payload |
| Publish | publish_xml | Render data back out as an XML payload |
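
To make the map function's dot-paths concrete, here is a minimal sketch of writing a value at a path such as data.result. The Node tree, as_obj and set_path are hypothetical stand-ins for the engine's own value type; creating missing intermediate objects (and replacing non-object values on the way) is this sketch's choice, not documented engine behavior.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like tree (hypothetical stand-in for the engine's value type).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Num(f64),
    Text(String),
    Obj(BTreeMap<String, Node>),
}

/// Turn `node` into an object if it is not one already, then borrow its map.
fn as_obj(node: &mut Node) -> &mut BTreeMap<String, Node> {
    if !matches!(node, Node::Obj(_)) {
        *node = Node::Obj(BTreeMap::new());
    }
    match node {
        Node::Obj(map) => map,
        _ => unreachable!("node was just replaced with an object"),
    }
}

/// Write `value` at a dot-separated path, creating intermediate objects.
fn set_path(root: &mut Node, path: &str, value: Node) {
    let keys: Vec<&str> = path.split('.').collect();
    // `split` always yields at least one item, so this cannot fail.
    let (last, parents) = keys.split_last().expect("at least one key");
    let mut cur = root;
    for key in parents {
        cur = as_obj(cur)
            .entry(key.to_string())
            .or_insert_with(|| Node::Obj(BTreeMap::new()));
    }
    as_obj(cur).insert(last.to_string(), value);
}
```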

In addition, dataflow-rs provides typed config schemas for three common service-layer integrations — http_call, enrich, and publish_kafka. These are not pre-registered: register an AsyncFunctionHandler under the matching name and the engine handles config validation and JSONLogic pre-compilation for you. See HttpCallConfig, EnrichConfig, and PublishKafkaConfig.

Custom functions are registered through Engine::builder().register(...); see the Extending with Custom Functions section below.
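
Name-based registration, as used for http_call, enrich and publish_kafka, can be modeled as a map from function name to boxed handler. This is a simplified, synchronous sketch: the Handler trait, Upper and Registry are hypothetical, whereas the real AsyncFunctionHandler is async and receives a typed Input. Treating an unregistered name as an error is also an assumption of the sketch.

```rust
use std::collections::HashMap;

/// Simplified, synchronous stand-in for a handler trait (hypothetical).
trait Handler {
    fn run(&self, input: &str) -> String;
}

/// Example handler: upper-cases its input.
struct Upper;

impl Handler for Upper {
    fn run(&self, input: &str) -> String {
        input.to_uppercase()
    }
}

/// Hypothetical registry keyed by the function name used in workflow JSON.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Box<dyn Handler>>,
}

impl Registry {
    /// Builder-style registration, echoing Engine::builder().register(...).
    fn register(mut self, name: &str, handler: impl Handler + 'static) -> Self {
        self.handlers.insert(name.to_string(), Box::new(handler));
        self
    }

    /// Dispatch by name; an unknown name is reported as an error.
    fn call(&self, name: &str, input: &str) -> Result<String, String> {
        self.handlers
            .get(name)
            .map(|h| h.run(input))
            .ok_or_else(|| format!("no handler registered for function '{name}'"))
    }
}
```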

§Usage Example

use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow in JSON
    let workflow_json = r#"
    {
        "id": "data_processor",
        "name": "Data Processor",
        "priority": 0,
        "tasks": [
            {
                "id": "transform_data",
                "name": "Transform Data",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.result",
                                "logic": { "var": "temp_data.value" }
                            }
                        ]
                    }
                }
            }
        ]
    }
    "#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the workflow engine — builder is the recommended path; built-in
    // functions are auto-registered.
    let engine = Engine::builder().with_workflow(workflow).build()?;

    // Create a message to process
    let mut message = Message::from_value(&json!({}));

    // Process the message through the workflow
    match engine.process_message(&mut message).await {
        Ok(_) => {
            println!("Processed result: {}", message.context["data"]["result"]);
        }
        Err(e) => {
            println!("Error in workflow: {:?}", e);
        }
    }

    Ok(())
}
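
In the example above, the mapping's logic { "var": "temp_data.value" } reads a dot-path from the message context. Because the message is created from an empty payload, presumably nothing is stored at that path, and in JSONLogic a missing var evaluates to null. The lookup is sketched below with a hypothetical Val tree whose top level is assumed to hold data, metadata and temp_data objects; it ignores JSONLogic features such as array indices and default values.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value (hypothetical, not the crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Val {
    Null,
    Num(f64),
    Obj(BTreeMap<String, Val>),
}

/// Resolve a dot-separated path like "temp_data.value" against a context;
/// any missing segment yields Val::Null, as a JSONLogic var without default does.
fn var(ctx: &Val, path: &str) -> Val {
    let mut cur = ctx;
    for key in path.split('.') {
        match cur {
            Val::Obj(map) => match map.get(key) {
                Some(next) => cur = next,
                None => return Val::Null,
            },
            // Descending into a non-object also counts as "missing".
            _ => return Val::Null,
        }
    }
    cur.clone()
}
```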

§Error Handling

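Errors surface in two places: process_message returns a Result for failures that stop processing, and errors recorded while the workflows run are collected on the message so they can be inspected afterwards: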

use dataflow_rs::{Engine, Result, DataflowError};
use dataflow_rs::engine::message::Message;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<()> {
    // ... setup workflows ...
    let engine = Engine::builder().build()?;

    let mut message = Message::from_value(&json!({}));

    // Task errors are collected on the message instead of halting execution;
    // `?` only propagates an Err returned by process_message itself.
    engine.process_message(&mut message).await?;

    // Check if there were any errors during processing
    if message.has_errors() {
        for error in message.errors() {
            println!("Error in workflow: {:?}, task: {:?}: {:?}",
                     error.workflow_id, error.task_id, error.message);
        }
    }

    Ok(())
}
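
The collect-and-continue behavior described in the example can be modeled as follows. ErrorEntry, ToyMessage and run_collecting are hypothetical; ErrorEntry only mirrors the workflow_id, task_id and message fields printed above, and tasks are plain functions rather than engine tasks.

```rust
/// Mirrors the fields the example prints for each recorded error.
#[derive(Debug, Clone, PartialEq)]
struct ErrorEntry {
    workflow_id: String,
    task_id: String,
    message: String,
}

/// Hypothetical message that only carries collected errors.
#[derive(Default)]
struct ToyMessage {
    errors: Vec<ErrorEntry>,
}

impl ToyMessage {
    fn has_errors(&self) -> bool {
        !self.errors.is_empty()
    }
}

/// A task is an id plus a function that may fail.
type ToyTask = (&'static str, fn() -> Result<(), String>);

/// Run every task; a failure is recorded on the message and the next task
/// still runs. Returns how many tasks succeeded.
fn run_collecting(workflow_id: &str, tasks: &[ToyTask], msg: &mut ToyMessage) -> usize {
    let mut ok = 0;
    for (task_id, task) in tasks {
        match task() {
            Ok(()) => ok += 1,
            Err(e) => msg.errors.push(ErrorEntry {
                workflow_id: workflow_id.to_string(),
                task_id: task_id.to_string(),
                message: e,
            }),
        }
    }
    ok
}
```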

§Extending with Custom Functions

Implement AsyncFunctionHandler with a typed Input so the engine deserializes your config once at startup; handlers then receive typed input and a TaskContext that records audit-trail changes automatically.

use dataflow_rs::{
    AsyncFunctionHandler, Engine, Result, TaskContext, TaskOutcome, Workflow,
};
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;
use async_trait::async_trait;

#[derive(Deserialize)]
struct StatsInput {
    /// Key inside `data` holding the array of numbers to summarize.
    source: String,
    /// Path inside `data` to write the result to.
    target: String,
}

struct Statistics;

#[async_trait]
impl AsyncFunctionHandler for Statistics {
    type Input = StatsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatsInput,
    ) -> Result<TaskOutcome> {
        let count = ctx.data()
            .get(input.source.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.len())
            .unwrap_or(0);

        ctx.set(
            &format!("data.{}", input.target),
            OwnedDataValue::from(&json!({ "count": count })),
        );
        Ok(TaskOutcome::Success)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let engine = Engine::builder()
        .register("statistics", Statistics)
        // .with_workflow(workflow)
        .build()?;
    // ...
    Ok(())
}
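
The automatic audit trail mentioned above can be sketched as a setter that records what each write replaced. AuditedCtx and ChangeRecord are hypothetical and flatten values to strings; the crate's own TaskContext, AuditTrail and Change types are not reproduced here and may record different fields.

```rust
use std::collections::BTreeMap;

/// One recorded mutation (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
struct ChangeRecord {
    path: String,
    old: Option<String>,
    new: String,
}

/// Toy context: a flat path -> value store plus an audit log filled by `set`.
#[derive(Default)]
struct AuditedCtx {
    values: BTreeMap<String, String>,
    changes: Vec<ChangeRecord>,
}

impl AuditedCtx {
    /// Write a value and record what it replaced, so handlers never log by hand.
    fn set(&mut self, path: &str, value: &str) {
        let old = self.values.insert(path.to_string(), value.to_string());
        self.changes.push(ChangeRecord {
            path: path.to_string(),
            old,
            new: value.to_string(),
        });
    }

    fn get(&self, path: &str) -> Option<&str> {
        self.values.get(path).map(String::as_str)
    }
}
```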

§Ecosystem

Dataflow-rs is part of a small family of crates that share the same workflow and JSONLogic shape:

| Crate | Purpose |
|---|---|
| dataflow-rs | This crate: async workflow engine in Rust |
| @goplasmatic/dataflow-wasm | WebAssembly bindings: run workflows in the browser or Node |
| @goplasmatic/dataflow-ui | React components for visualizing and debugging workflows |
| datalogic-rs | The JSONLogic compiler/evaluator used internally |

Source for all four lives under https://github.com/GoPlasmatic.

Re-exports§

pub use engine::error::DataflowError;
pub use engine::error::ErrorInfo;
pub use engine::error::Result;
pub use engine::functions::AsyncFunctionHandler;
pub use engine::functions::BoxedFunctionHandler;
pub use engine::functions::EnrichConfig;
pub use engine::functions::FilterConfig;
pub use engine::functions::FunctionConfig;
pub use engine::functions::HttpCallConfig;
pub use engine::functions::LogConfig;
pub use engine::functions::MapConfig;
pub use engine::functions::MapMapping;
pub use engine::functions::PublishKafkaConfig;
pub use engine::functions::ValidationConfig;
pub use engine::functions::ValidationRule;
pub use engine::message::AuditTrail;
pub use engine::message::Change;
pub use engine::message::Message;
pub use engine::message::MessageBuilder;
pub use engine::task_context::TaskContext;
pub use engine::task_outcome::TaskOutcome;
pub use engine::trace::ExecutionStep;
pub use engine::trace::ExecutionTrace;
pub use engine::trace::StepResult;
pub use engine::Engine;
pub use engine::EngineBuilder;
pub use engine::Task;
pub use engine::Workflow;
pub use engine::WorkflowStatus;

Modules§

engine
Engine Module
prelude
Common imports for building dataflow-rs engines and handlers.

Type Aliases§

Action
Type alias for Task — an Action is an individual processing step within a rule.
Rule
Type alias for Workflow — a Rule represents an IF-THEN unit: IF condition THEN execute actions.
RulesEngine
Type alias for Engine — the RulesEngine evaluates rules and executes their actions.
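
These are ordinary Rust type aliases, so each pair names a single type and values move between the two vocabularies with no conversion. A minimal sketch with a stand-in Workflow struct (hypothetical fields, not the crate's definition):

```rust
/// Stand-in for the crate's Workflow; fields are illustrative only.
#[derive(Debug, Default, PartialEq)]
pub struct Workflow {
    pub id: String,
    pub priority: u32,
}

/// Same shape as the documented alias: a second name, not a new type.
pub type Rule = Workflow;

/// Takes a Workflow, yet a Rule can be passed without conversion.
pub fn describe(w: &Workflow) -> String {
    format!("{} (priority {})", w.id, w.priority)
}
```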