Module engine

§Engine Module

This module implements the core workflow engine for dataflow-rs. The engine provides thread-safe, vertically scalable message processing through workflows, each composed of tasks.

§Thread-Safety & Concurrency (v1.0)

The engine now features a unified concurrency model with:

  • DataLogic Pool: Thread-safe pool of DataLogic instances for JSONLogic evaluation
  • Arc-Swap Workflows: Lock-free reads and atomic updates for workflow management
  • Unified Concurrency: Single parameter controls both pool size and max concurrent messages
  • Zero Contention: Pool size matches the number of concurrent messages, so in-flight messages do not compete for DataLogic instances
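
The ideas in the list above can be illustrated with a minimal, standard-library-only sketch. It is not the crate's implementation: `Evaluator`, `EvaluatorPool`, and `WorkflowStore` are hypothetical names, the pool blocks on a `Condvar` rather than using async primitives, and an `RwLock<Arc<_>>` stands in for the lock-free Arc-Swap the engine is described as using. It only shows the shape of the design: one pooled instance per allowed concurrent message, and workflows published as immutable snapshots that are replaced in a single step.

```rust
use std::sync::{Arc, Condvar, Mutex, RwLock};

/// Hypothetical stand-in for a pooled evaluator (not the real DataLogic type).
pub struct Evaluator {
    pub id: usize,
}

/// Fixed-size pool holding one evaluator per allowed concurrent message.
pub struct EvaluatorPool {
    idle: Mutex<Vec<Evaluator>>,
    available: Condvar,
}

impl EvaluatorPool {
    /// A single `concurrency` value sizes the pool.
    pub fn new(concurrency: usize) -> Self {
        let idle = (0..concurrency).map(|id| Evaluator { id }).collect();
        Self { idle: Mutex::new(idle), available: Condvar::new() }
    }

    /// Blocks until an evaluator is free, then hands it out.
    pub fn acquire(&self) -> Evaluator {
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(evaluator) = idle.pop() {
                return evaluator;
            }
            idle = self.available.wait(idle).unwrap();
        }
    }

    /// Returns an evaluator to the pool and wakes one waiter.
    pub fn release(&self, evaluator: Evaluator) {
        self.idle.lock().unwrap().push(evaluator);
        self.available.notify_one();
    }

    /// Number of evaluators currently not in use.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

/// Workflow list published as an immutable, shareable snapshot.
/// Workflows are plain strings here purely for illustration.
pub struct WorkflowStore {
    current: RwLock<Arc<Vec<String>>>,
}

impl WorkflowStore {
    pub fn new(workflows: Vec<String>) -> Self {
        Self { current: RwLock::new(Arc::new(workflows)) }
    }

    /// Readers clone the Arc (cheap) and keep using their own snapshot.
    pub fn snapshot(&self) -> Arc<Vec<String>> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Writers swap in a whole new list; existing snapshots are unaffected.
    pub fn replace(&self, workflows: Vec<String>) {
        *self.current.write().unwrap() = Arc::new(workflows);
    }
}
```

In this sketch, if at most `concurrency` messages are in flight, `acquire` never has to wait, which is the sense in which matching the pool size to the concurrency limit avoids contention. A snapshot taken before `replace` keeps seeing the old workflow list, so in-flight messages are not disturbed by an update.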

§Key Components

  • Engine: Thread-safe engine with configurable concurrency levels
  • Workflow: Collection of tasks with JSONLogic conditions, stored using Arc-Swap
  • Task: Individual processing unit that performs a specific function on a message
  • AsyncFunctionHandler: Trait for custom async processing logic (now receives a DataLogic parameter)
  • Message: Data structure flowing through the engine, with a dedicated DataLogic instance per workflow
  • DataLogicPool: Pool of DataLogic instances for concurrent message processing
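
To make the relationship between these components concrete, here is a toy model using only the standard library. Every name in it (`ToyMessage`, `ToyTask`, `ToyWorkflow`, `process`, `demo_workflows`) is hypothetical and none of it is the dataflow-rs API: plain function pointers stand in for JSONLogic conditions and for `AsyncFunctionHandler` implementations, everything runs synchronously, and the sketch assumes that each workflow whose condition matches runs its tasks in order.

```rust
use std::collections::HashMap;

/// Hypothetical message: a flat key/value payload (the real Message carries JSON).
#[derive(Debug, Default)]
pub struct ToyMessage {
    pub data: HashMap<String, i64>,
}

/// Hypothetical task: a named function applied to the message.
pub struct ToyTask {
    pub name: &'static str,
    pub run: fn(&mut ToyMessage),
}

/// Hypothetical workflow: a condition plus an ordered list of tasks.
/// The function pointer stands in for a JSONLogic condition.
pub struct ToyWorkflow {
    pub condition: fn(&ToyMessage) -> bool,
    pub tasks: Vec<ToyTask>,
}

/// Runs the tasks of every workflow whose condition matches the message.
/// Returns the names of the tasks that ran, in execution order.
pub fn process(workflows: &[ToyWorkflow], message: &mut ToyMessage) -> Vec<&'static str> {
    let mut executed = Vec::new();
    for workflow in workflows {
        if !(workflow.condition)(message) {
            continue; // condition not met: skip this workflow entirely
        }
        for task in &workflow.tasks {
            (task.run)(message);
            executed.push(task.name);
        }
    }
    executed
}

fn has_amount(m: &ToyMessage) -> bool {
    m.data.contains_key("amount")
}

fn never(_: &ToyMessage) -> bool {
    false
}

fn double_amount(m: &mut ToyMessage) {
    if let Some(value) = m.data.get_mut("amount") {
        *value *= 2;
    }
}

fn mark_seen(m: &mut ToyMessage) {
    m.data.insert("seen".to_string(), 1);
}

/// Two example workflows: one that matches messages with an "amount" key,
/// and one whose condition never matches.
pub fn demo_workflows() -> Vec<ToyWorkflow> {
    vec![
        ToyWorkflow {
            condition: has_amount,
            tasks: vec![
                ToyTask { name: "double_amount", run: double_amount },
                ToyTask { name: "mark_seen", run: mark_seen },
            ],
        },
        ToyWorkflow {
            condition: never,
            tasks: vec![ToyTask { name: "skipped", run: mark_seen }],
        },
    ]
}
```

Calling `process(&demo_workflows(), &mut message)` on a message that contains an `amount` key runs `double_amount` and then `mark_seen`; the second workflow contributes nothing because its condition is false.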

§Usage

use dataflow_rs::{Engine, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create engine with default concurrency (CPU count)
    let engine = Engine::new();

    // Or specify custom concurrency level
    let engine = Engine::with_concurrency(32);

    // Process messages concurrently
    let mut message = Message::new(&json!({}));
    engine.process_message_concurrent(&mut message).await?;

    Ok(())
}

Re-exports§

pub use error::DataflowError;
pub use error::ErrorInfo;
pub use error::Result;
pub use functions::AsyncFunctionHandler;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;
pub use datalogic_rs as jsonlogic;

Modules§

error
functions
message
task
workflow

Structs§

DataLogicPool
DataLogic pool for thread-safe concurrent access
Engine
Thread-safe engine that processes messages through workflows using non-blocking async IO.
RetryConfig
Configuration for retry behavior