/*!
# Engine Module

This module implements the core async workflow engine for dataflow-rs. The engine provides
high-performance, asynchronous message processing through workflows composed of tasks.

## Architecture

The engine features a clean async-first architecture built on DataLogic v4:
- **Compiler**: Pre-compiles JSONLogic expressions into DataLogic v4's `Arc<CompiledLogic>`
- **Executor**: Handles internal function execution (map, validation) with async support
- **Engine**: Orchestrates workflow processing with shared compiled logic
- **Thread-Safe**: Single DataLogic instance with Arc-wrapped compiled logic for zero-copy sharing

## Key Components

- **Engine**: Async engine optimized for the Tokio runtime with mixed I/O and CPU workloads
- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
- **InternalExecutor**: Executes the built-in map and validation functions with compiled logic
- **Workflow**: Collection of tasks gated by JSONLogic conditions (metadata-only access)
- **Task**: Individual processing unit that performs a specific function on a message
- **AsyncFunctionHandler**: Trait for custom async processing logic (see the sketch below)
- **Message**: Data structure flowing through the engine with an audit trail

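As an illustration of extending the engine, the sketch below registers a custom
handler with `Engine::new`. The registration half mirrors the constructor's
actual `custom_functions` parameter; the trait's `execute` signature is an
assumption for illustration (see the `functions` module for the real
definition), so the block is not compiled as a doctest:

```rust,ignore
use std::collections::HashMap;
use async_trait::async_trait;
use dataflow_rs::{AsyncFunctionHandler, Engine, Result, engine::message::Message};
use serde_json::Value;

struct HttpEnrich;

#[async_trait]
impl AsyncFunctionHandler for HttpEnrich {
    // NOTE: assumed signature for illustration only.
    async fn execute(&self, message: &mut Message, input: &Value) -> Result<()> {
        // Perform async I/O here (e.g. an HTTP call) and write the
        // result back into the message.
        let _ = (message, input);
        Ok(())
    }
}

// Registration mirrors `Engine::new`'s `custom_functions` parameter.
let mut custom: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> =
    HashMap::new();
custom.insert("http_enrich".to_string(), Box::new(HttpEnrich));
let engine = Engine::new(workflows, Some(custom));
```
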
## Performance Optimizations

- **Pre-compilation**: All JSONLogic expressions are compiled at startup
- **Arc-wrapped Logic**: Zero-copy sharing of compiled logic across async tasks
- **Spawn Blocking**: CPU-intensive JSONLogic evaluation runs in blocking tasks (illustrated below)
- **True Async**: I/O operations remain fully async

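To make the spawn-blocking point concrete, the pattern below clones an
`Arc<CompiledLogic>` into a blocking task so CPU-bound evaluation never stalls
the async runtime. This is a sketch of the general pattern rather than the
engine's internal code; the `evaluate` call stands in for whatever the internal
executors actually invoke:

```rust,ignore
use std::sync::Arc;
use datalogic_rs::{CompiledLogic, DataLogic};
use serde_json::Value;

async fn evaluate_off_thread(
    datalogic: Arc<DataLogic>,
    logic: Arc<CompiledLogic>,
    data: Value,
) -> Value {
    tokio::task::spawn_blocking(move || {
        // Arc clones are pointer copies, so the compiled logic is
        // shared, never duplicated ("zero-copy sharing").
        datalogic.evaluate(&logic, &data) // illustrative stand-in call
    })
    .await
    .expect("blocking evaluation task panicked")
}
```
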
## Usage

```rust,no_run
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
    ];

    // Create engine with defaults
    let engine = Engine::new(workflows, None);

    // Process messages asynchronously
    let mut message = Message::from_value(&json!({}));
    engine.process_message(&mut message).await?;

    Ok(())
}
```
*/

pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod task;
pub mod task_executor;
pub mod trace;
pub mod utils;
pub mod workflow;
pub mod workflow_executor;

// Re-export key types for easier access
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{AsyncFunctionHandler, FunctionConfig};
pub use message::Message;
pub use task::Task;
pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
pub use workflow::Workflow;

use chrono::Utc;
use datalogic_rs::{CompiledLogic, DataLogic};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;

use compiler::LogicCompiler;
use executor::InternalExecutor;
use task_executor::TaskExecutor;
use workflow_executor::WorkflowExecutor;

/// High-performance async workflow engine for message processing.
///
/// ## Architecture
///
/// The engine is designed for async-first operation with Tokio:
/// - **Separation of Concerns**: Distinct executors for workflows and tasks
/// - **Shared DataLogic**: Single DataLogic instance with Arc for thread-safe sharing
/// - **Arc<CompiledLogic>**: Pre-compiled logic shared across all async tasks
/// - **Async Functions**: Native async support for I/O-bound operations
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Zero-Copy Sharing**: Arc-wrapped compiled logic shared without cloning
/// - **Optimal for Mixed Workloads**: Async I/O with blocking CPU evaluation
/// - **Thread-Safe by Design**: All components safe to share across Tokio tasks
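///
/// ## Sharing Across Tasks
///
/// Since the engine is immutable after construction and `process_message` takes
/// `&self`, a common pattern is to wrap it in an `Arc` and clone the handle into
/// each Tokio task. A minimal sketch (`workflows` and `payloads` are placeholders):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// let engine = Arc::new(Engine::new(workflows, None));
/// for payload in payloads {
///     let engine = Arc::clone(&engine);
///     tokio::spawn(async move {
///         let mut message = Message::from_value(&payload);
///         // Each task holds a cheap Arc handle; the compiled logic
///         // inside the engine is never duplicated.
///         engine.process_message(&mut message).await
///     });
/// }
/// ```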
pub struct Engine {
    /// Registry of available workflows (immutable after initialization)
    workflows: Arc<HashMap<String, Workflow>>,
    /// Workflow executor for orchestrating workflow execution
    workflow_executor: Arc<WorkflowExecutor>,
    /// Shared DataLogic instance for JSONLogic evaluation
    datalogic: Arc<DataLogic>,
    /// Compiled logic cache with Arc for zero-copy sharing
    logic_cache: Vec<Arc<CompiledLogic>>,
}

impl Engine {
    /// Creates a new Engine instance with configurable parameters.
    ///
    /// # Arguments
    /// * `workflows` - The workflows to use for processing messages
    /// * `custom_functions` - Optional custom async function handlers
    ///
    /// # Example
    ///
    /// ```
    /// use dataflow_rs::{Engine, Workflow};
    ///
    /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
    ///
    /// // Simple usage with defaults
    /// let engine = Engine::new(workflows, None);
    /// ```
    pub fn new(
        workflows: Vec<Workflow>,
        custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
    ) -> Self {
        // Compile workflows with DataLogic v4
        let mut compiler = LogicCompiler::new();
        let workflow_map = compiler.compile_workflows(workflows);
        let (datalogic, logic_cache) = compiler.into_parts();

        let mut task_functions = custom_functions.unwrap_or_default();

        // Add built-in async function handlers
        for (name, handler) in functions::builtins::get_all_functions() {
            task_functions.insert(name, handler);
        }

        // Create internal executor with shared DataLogic and compiled logic
        let internal_executor = Arc::new(InternalExecutor::new(
            Arc::clone(&datalogic),
            logic_cache.clone(),
        ));

        // Create task executor
        let task_executor = Arc::new(TaskExecutor::new(
            Arc::new(task_functions),
            Arc::clone(&internal_executor),
            Arc::clone(&datalogic),
        ));

        // Create workflow executor
        let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));

        Self {
            workflows: Arc::new(workflow_map),
            workflow_executor,
            datalogic,
            logic_cache,
        }
    }

    /// Processes a message through each workflow whose condition matches.
    ///
    /// This async method:
    /// 1. Iterates through workflows sequentially in priority order
    /// 2. Delegates each workflow's execution to the WorkflowExecutor
    /// 3. Updates message metadata
    ///
    /// # Arguments
    /// * `message` - The message to process through workflows
    ///
    /// # Returns
    /// * `Result<()>` - Ok(()) if processing succeeded, Err if a fatal error occurred
    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
        // Set processing metadata
        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
        message.invalidate_context_cache();

        // Sort workflows by priority for proper execution order
        let mut workflows: Vec<_> = self.workflows.values().collect();
        workflows.sort_by_key(|w| w.priority);

        // Process each workflow in priority order
        for workflow in workflows {
            // Execute workflow through the workflow executor
            self.workflow_executor.execute(workflow, message).await?;
        }

        Ok(())
    }

    /// Processes a message through workflows with step-by-step tracing.
    ///
    /// This method is similar to `process_message` but captures an execution trace
    /// that can be used for debugging and step-by-step visualization.
    ///
    /// # Arguments
    /// * `message` - The message to process through workflows
    ///
    /// # Returns
    /// * `Result<ExecutionTrace>` - The execution trace with message snapshots
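    ///
    /// # Example
    ///
    /// A minimal sketch of capturing a trace, assuming an `engine` and `message`
    /// already exist; see the `trace` module (`ExecutionTrace`, `ExecutionStep`,
    /// `StepResult`) for how to inspect the result:
    ///
    /// ```rust,ignore
    /// let trace = engine.process_message_with_trace(&mut message).await?;
    /// // `trace` holds per-step message snapshots for debugging or
    /// // step-by-step visualization.
    /// ```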
    pub async fn process_message_with_trace(
        &self,
        message: &mut Message,
    ) -> Result<ExecutionTrace> {
        // Set processing metadata
        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
        message.invalidate_context_cache();

        let mut trace = ExecutionTrace::new();

        // Sort workflows by priority for proper execution order
        let mut workflows: Vec<_> = self.workflows.values().collect();
        workflows.sort_by_key(|w| w.priority);

        // Process each workflow in priority order
        for workflow in workflows {
            // Execute workflow through the workflow executor, collecting trace steps
            self.workflow_executor
                .execute_with_trace(workflow, message, &mut trace)
                .await?;
        }

        Ok(trace)
    }

    /// Get a reference to the workflows
    pub fn workflows(&self) -> &Arc<HashMap<String, Workflow>> {
        &self.workflows
    }

    /// Get a reference to the DataLogic instance
    pub fn datalogic(&self) -> &Arc<DataLogic> {
        &self.datalogic
    }

    /// Get a reference to the compiled logic cache
    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
        &self.logic_cache
    }
}