// dataflow_rs/engine/mod.rs
1/*!
2# Engine Module
3
4This module implements the core async workflow engine for dataflow-rs. The engine provides
5high-performance, asynchronous message processing through workflows composed of tasks.
6
7## Architecture
8
9The engine features a clean async-first architecture with DataLogic v4:
10- **Compiler**: Pre-compiles JSONLogic expressions using DataLogic v4's Arc<CompiledLogic>
11- **Executor**: Handles internal function execution (map, validation) with async support
12- **Engine**: Orchestrates workflow processing with shared compiled logic
13- **Thread-Safe**: Single DataLogic instance with Arc-wrapped compiled logic for zero-copy sharing
14
15## Key Components
16
17- **Engine**: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
18- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
19- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
20- **Workflow**: Collection of tasks with JSONLogic conditions (metadata-only access)
21- **Task**: Individual processing unit that performs a specific function on a message
22- **AsyncFunctionHandler**: Trait for custom async processing logic
23- **Message**: Data structure flowing through the engine with audit trail
24
25## Performance Optimizations
26
27- **Pre-compilation**: All JSONLogic expressions compiled at startup
28- **Arc-wrapped Logic**: Zero-copy sharing of compiled logic across async tasks
29- **Spawn Blocking**: CPU-intensive JSONLogic evaluation in blocking tasks
30- **True Async**: I/O operations remain fully async
31
32## Usage
33
34```rust,no_run
35use dataflow_rs::{Engine, Workflow, engine::message::Message};
36use serde_json::json;
37
38#[tokio::main]
39async fn main() -> Result<(), Box<dyn std::error::Error>> {
40 // Define workflows
41 let workflows = vec![
42 Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
43 ];
44
45 // Create engine with defaults
46 let engine = Engine::new(workflows, None);
47
48 // Process messages asynchronously
49 let mut message = Message::from_value(&json!({}));
50 engine.process_message(&mut message).await?;
51
52 Ok(())
53}
54```
55*/
56
57pub mod compiler;
58pub mod error;
59pub mod executor;
60pub mod functions;
61pub mod message;
62pub mod task;
63pub mod task_executor;
64pub mod utils;
65pub mod workflow;
66pub mod workflow_executor;
67
68// Re-export key types for easier access
69pub use error::{DataflowError, ErrorInfo, Result};
70pub use functions::{AsyncFunctionHandler, FunctionConfig};
71pub use message::Message;
72pub use task::Task;
73pub use workflow::Workflow;
74
75use chrono::Utc;
76use datalogic_rs::{CompiledLogic, DataLogic};
77use serde_json::json;
78use std::collections::HashMap;
79use std::sync::Arc;
80
81use compiler::LogicCompiler;
82use executor::InternalExecutor;
83use task_executor::TaskExecutor;
84use workflow_executor::WorkflowExecutor;
85
/// High-performance async workflow engine for message processing.
///
/// ## Architecture
///
/// The engine is designed for async-first operation with Tokio:
/// - **Separation of Concerns**: Distinct executors for workflows and tasks
/// - **Shared DataLogic**: Single DataLogic instance with Arc for thread-safe sharing
/// - **Arc<CompiledLogic>**: Pre-compiled logic shared across all async tasks
/// - **Async Functions**: Native async support for I/O-bound operations
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Zero-Copy Sharing**: Arc-wrapped compiled logic shared without cloning
/// - **Optimal for Mixed Workloads**: Async I/O with blocking CPU evaluation
/// - **Thread-Safe by Design**: All components safe to share across Tokio tasks
pub struct Engine {
    /// Registry of available workflows (immutable after initialization).
    /// Keys are presumably workflow IDs as produced by the compiler —
    /// NOTE(review): confirm against `LogicCompiler::compile_workflows`.
    workflows: Arc<HashMap<String, Workflow>>,
    /// Workflow executor for orchestrating workflow execution
    /// (owns the task executor and internal executor chain built in `new`).
    workflow_executor: Arc<WorkflowExecutor>,
    /// Shared DataLogic instance for JSONLogic evaluation;
    /// the same Arc is handed to the internal and task executors.
    datalogic: Arc<DataLogic>,
    /// Compiled logic cache with Arc for zero-copy sharing;
    /// populated once by the compiler in `new` and read-only afterwards.
    logic_cache: Vec<Arc<CompiledLogic>>,
}
112
113impl Engine {
114 /// Creates a new Engine instance with configurable parameters.
115 ///
116 /// # Arguments
117 /// * `workflows` - The workflows to use for processing messages
118 /// * `custom_functions` - Optional custom async function handlers
119 ///
120 /// # Example
121 ///
122 /// ```
123 /// use dataflow_rs::{Engine, Workflow};
124 ///
125 /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
126 ///
127 /// // Simple usage with defaults
128 /// let engine = Engine::new(workflows, None);
129 /// ```
130 pub fn new(
131 workflows: Vec<Workflow>,
132 custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
133 ) -> Self {
134 // Compile workflows with DataLogic v4
135 let mut compiler = LogicCompiler::new();
136 let workflow_map = compiler.compile_workflows(workflows);
137 let (datalogic, logic_cache) = compiler.into_parts();
138
139 let mut task_functions = custom_functions.unwrap_or_default();
140
141 // Add built-in async function handlers
142 for (name, handler) in functions::builtins::get_all_functions() {
143 task_functions.insert(name, handler);
144 }
145
146 // Create internal executor with shared DataLogic and compiled logic
147 let internal_executor = Arc::new(InternalExecutor::new(
148 Arc::clone(&datalogic),
149 logic_cache.clone(),
150 ));
151
152 // Create task executor
153 let task_executor = Arc::new(TaskExecutor::new(
154 Arc::new(task_functions),
155 Arc::clone(&internal_executor),
156 Arc::clone(&datalogic),
157 ));
158
159 // Create workflow executor
160 let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
161
162 Self {
163 workflows: Arc::new(workflow_map),
164 workflow_executor,
165 datalogic,
166 logic_cache,
167 }
168 }
169
170 /// Processes a message through workflows that match their conditions.
171 ///
172 /// This async method:
173 /// 1. Iterates through workflows sequentially in deterministic order (sorted by ID)
174 /// 2. Delegates workflow execution to the WorkflowExecutor
175 /// 3. Updates message metadata
176 ///
177 /// # Arguments
178 /// * `message` - The message to process through workflows
179 ///
180 /// # Returns
181 /// * `Result<()>` - Ok(()) if processing succeeded, Err if a fatal error occurred
182 pub async fn process_message(&self, message: &mut Message) -> Result<()> {
183 // Set processing metadata
184 message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
185 message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
186 message.invalidate_context_cache();
187
188 // Sort workflows by priority for proper execution order
189 let mut workflows: Vec<_> = self.workflows.values().collect();
190 workflows.sort_by_key(|w| w.priority);
191
192 // Process each workflow in priority order
193 for workflow in workflows {
194 // Execute workflow through the workflow executor
195 self.workflow_executor.execute(workflow, message).await?;
196 }
197
198 Ok(())
199 }
200
201 /// Get a reference to the workflows
202 pub fn workflows(&self) -> &Arc<HashMap<String, Workflow>> {
203 &self.workflows
204 }
205
206 /// Get a reference to the DataLogic instance
207 pub fn datalogic(&self) -> &Arc<DataLogic> {
208 &self.datalogic
209 }
210
211 /// Get a reference to the compiled logic cache
212 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
213 &self.logic_cache
214 }
215}