§Engine Module
This module implements the core workflow engine for dataflow-rs. The engine provides thread-safe, vertically-scalable message processing through workflows composed of tasks.
§Thread-Safety & Concurrency (v1.0)
The engine now features a unified concurrency model with:
- DataLogic Pool: Thread-safe pool of DataLogic instances for JSONLogic evaluation
- Arc-Swap Workflows: Lock-free reads and atomic updates for workflow management
- Unified Concurrency: Single parameter controls both pool size and max concurrent messages
- Zero Contention: Pool size matches concurrent tasks to eliminate resource competition
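The idea of sizing the pool to match the concurrency level can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `Evaluator`, `Pool`, `checkout`, `checkin`, and `run_workers` are hypothetical names, and `Evaluator` merely stands in for a DataLogic instance. When the pool holds as many instances as there are workers, no worker ever has to wait for an instance to become free.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for an evaluator instance such as DataLogic.
pub struct Evaluator {
    pub id: usize,
}

/// Fixed-size pool (assumed design): `checkout` blocks until an instance is idle.
pub struct Pool {
    idle: Mutex<Vec<Evaluator>>,
    available: Condvar,
}

impl Pool {
    pub fn new(size: usize) -> Self {
        let idle: Vec<Evaluator> = (0..size).map(|id| Evaluator { id }).collect();
        Pool {
            idle: Mutex::new(idle),
            available: Condvar::new(),
        }
    }

    /// Takes an idle instance, waiting on the condition variable if none is free.
    pub fn checkout(&self) -> Evaluator {
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(evaluator) = idle.pop() {
                return evaluator;
            }
            idle = self.available.wait(idle).unwrap();
        }
    }

    /// Returns an instance to the pool and wakes one waiting worker, if any.
    pub fn checkin(&self, evaluator: Evaluator) {
        self.idle.lock().unwrap().push(evaluator);
        self.available.notify_one();
    }

    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

/// Spawns `concurrency` workers against a pool of the same size and
/// returns how many of them finished without panicking.
pub fn run_workers(concurrency: usize) -> usize {
    let pool = Arc::new(Pool::new(concurrency));
    let handles: Vec<_> = (0..concurrency)
        .map(|_| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                let evaluator = pool.checkout();
                // A real worker would evaluate rules with `evaluator` here.
                pool.checkin(evaluator);
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().is_ok())
        .filter(|ok| *ok)
        .count()
}
```

A single concurrency value drives both the pool size and the worker count here, which mirrors the "single parameter" idea described above. The mutex around the idle list is still acquired briefly on every checkout, so this sketch removes waiting for instances rather than all synchronization.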
§Key Components
- Engine: Thread-safe engine with configurable concurrency levels
- Workflow: Collection of tasks with JSONLogic conditions, stored using Arc-Swap
- Task: Individual processing unit that performs a specific function on a message
- AsyncFunctionHandler: Trait for custom async processing logic (now receives a DataLogic parameter)
- Message: Data structure flowing through the engine, with a dedicated DataLogic instance per workflow
- DataLogicPool: Pool of DataLogic instances for concurrent message processing
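The snapshot-and-swap pattern behind the workflow storage can be approximated with the standard library. This sketch is an assumption-laden illustration, not the crate's code: `WorkflowDef`, `WorkflowMap`, and `WorkflowStore` are hypothetical names, and `RwLock<Arc<_>>` is substituted for the arc-swap crate, so reads here take a short lock instead of being truly lock-free. What it does show is that readers keep working on an immutable snapshot while a writer publishes a new map in one pointer replacement.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical workflow record; the real Workflow type carries tasks and conditions.
#[derive(Clone, Debug, PartialEq)]
pub struct WorkflowDef {
    pub id: String,
    pub priority: u32,
}

pub type WorkflowMap = HashMap<String, WorkflowDef>;

/// Holds the current workflow map behind a shared pointer that can be replaced.
pub struct WorkflowStore {
    current: RwLock<Arc<WorkflowMap>>,
}

impl WorkflowStore {
    pub fn new() -> Self {
        WorkflowStore {
            current: RwLock::new(Arc::new(HashMap::new())),
        }
    }

    /// Clones the pointer (not the map) and releases the lock immediately,
    /// so the caller reads from a stable snapshot.
    pub fn snapshot(&self) -> Arc<WorkflowMap> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Copies the current map, inserts the workflow, and publishes the copy.
    /// Snapshots taken earlier are unaffected.
    pub fn add(&self, workflow: WorkflowDef) {
        let mut guard = self.current.write().unwrap();
        let mut next: WorkflowMap = (**guard).clone();
        next.insert(workflow.id.clone(), workflow);
        *guard = Arc::new(next);
    }
}
```

A message that starts processing with one snapshot sees a consistent set of workflows until it finishes, even if workflows are added in the meantime.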
§Usage
use dataflow_rs::{Engine, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create engine with default concurrency (CPU count)
    let engine = Engine::new();

    // Or specify custom concurrency level
    let engine = Engine::with_concurrency(32);

    // Process messages concurrently
    let mut message = Message::new(&json!({}));
    engine.process_message_concurrent(&mut message).await?;

    Ok(())
}
Re-exports§
pub use error::DataflowError;
pub use error::ErrorInfo;
pub use error::Result;
pub use functions::AsyncFunctionHandler;
pub use message::Message;
pub use task::Task;
pub use workflow::Workflow;
pub use datalogic_rs as jsonlogic;
Modules§
Structs§
- DataLogicPool - DataLogic pool for thread-safe concurrent access
- Engine - Thread-safe engine that processes messages through workflows using non-blocking async IO.
- RetryConfig - Configuration for retry behavior