// dataflow_rs/engine/mod.rs
/*!
# Engine Module

This module implements the core async workflow engine for dataflow-rs. The engine provides
high-performance, asynchronous message processing through workflows composed of tasks.

## Architecture

The engine features a clean async-first architecture with DataLogic v4:
- **Compiler**: Pre-compiles JSONLogic expressions using DataLogic v4's Arc<CompiledLogic>
- **Executor**: Handles internal function execution (map, validation) with async support
- **Engine**: Orchestrates workflow processing with shared compiled logic
- **Thread-Safe**: Single DataLogic instance with Arc-wrapped compiled logic for zero-copy sharing

## Key Components

- **Engine**: Async engine optimized for Tokio runtime with mixed I/O and CPU workloads
- **LogicCompiler**: Compiles and caches JSONLogic expressions during initialization
- **InternalExecutor**: Executes built-in map and validation functions with compiled logic
- **Workflow**: Collection of tasks with JSONLogic conditions (can access data, metadata, temp_data)
- **Task**: Individual processing unit that performs a specific function on a message
- **AsyncFunctionHandler**: Trait for custom async processing logic
- **Message**: Data structure flowing through the engine with audit trail

## Performance Optimizations

- **Pre-compilation**: All JSONLogic expressions compiled at startup
- **Arc-wrapped Logic**: Zero-copy sharing of compiled logic across async tasks
- **Spawn Blocking**: CPU-intensive JSONLogic evaluation in blocking tasks
- **True Async**: I/O operations remain fully async

## Usage

```rust,no_run
use dataflow_rs::{Engine, Workflow, engine::message::Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define workflows
    let workflows = vec![
        Workflow::from_json(r#"{"id": "example", "name": "Example", "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#)?
    ];

    // Create engine with defaults
    let engine = Engine::new(workflows, None);

    // Process messages asynchronously
    let mut message = Message::from_value(&json!({}));
    engine.process_message(&mut message).await?;

    Ok(())
}
```
*/
56
// Engine submodules.
pub mod compiler;
pub mod error;
pub mod executor;
pub mod functions;
pub mod message;
pub mod task;
pub mod task_executor;
pub mod trace;
pub mod utils;
pub mod workflow;
pub mod workflow_executor;

// Re-export key types for easier access
pub use error::{DataflowError, ErrorInfo, Result};
pub use functions::{AsyncFunctionHandler, FunctionConfig};
pub use message::Message;
pub use task::Task;
pub use trace::{ExecutionStep, ExecutionTrace, StepResult};
pub use workflow::{Workflow, WorkflowStatus};

// External dependencies.
use chrono::Utc;
use datalogic_rs::{CompiledLogic, DataLogic};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;

// Internal building blocks used by the Engine.
use compiler::LogicCompiler;
use executor::InternalExecutor;
use task_executor::TaskExecutor;
use workflow_executor::WorkflowExecutor;
87
/// High-performance async workflow engine for message processing.
///
/// ## Architecture
///
/// The engine is designed for async-first operation with Tokio:
/// - **Separation of Concerns**: Distinct executors for workflows and tasks
/// - **Shared DataLogic**: Single DataLogic instance with Arc for thread-safe sharing
/// - **Arc<CompiledLogic>**: Pre-compiled logic shared across all async tasks
/// - **Async Functions**: Native async support for I/O-bound operations
///
/// ## Performance Characteristics
///
/// - **Zero Runtime Compilation**: All logic compiled during initialization
/// - **Zero-Copy Sharing**: Arc-wrapped compiled logic shared without cloning
/// - **Optimal for Mixed Workloads**: Async I/O with blocking CPU evaluation
/// - **Thread-Safe by Design**: All components safe to share across Tokio tasks
pub struct Engine {
    /// Registry of available workflows, pre-sorted by priority (immutable after initialization)
    workflows: Arc<Vec<Workflow>>,
    /// Channel index: maps channel name -> indices into `workflows` vec (only Active workflows)
    channel_index: Arc<HashMap<String, Vec<usize>>>,
    /// Workflow executor for orchestrating workflow execution
    workflow_executor: Arc<WorkflowExecutor>,
    /// Shared DataLogic instance for JSONLogic evaluation
    datalogic: Arc<DataLogic>,
    /// Compiled logic cache with Arc for zero-copy sharing
    logic_cache: Vec<Arc<CompiledLogic>>,
}
116
117/// Build a channel index from pre-sorted workflows.
118/// Maps channel name -> indices into workflows vec, only for Active workflows.
119fn build_channel_index(workflows: &[Workflow]) -> HashMap<String, Vec<usize>> {
120    let mut index: HashMap<String, Vec<usize>> = HashMap::new();
121    for (i, workflow) in workflows.iter().enumerate() {
122        if workflow.status == WorkflowStatus::Active {
123            index.entry(workflow.channel.clone()).or_default().push(i);
124        }
125    }
126    index
127}
128
129impl Engine {
130    /// Creates a new Engine instance with configurable parameters.
131    ///
132    /// # Arguments
133    /// * `workflows` - The workflows to use for processing messages
134    /// * `custom_functions` - Optional custom async function handlers
135    ///
136    /// # Example
137    ///
138    /// ```
139    /// use dataflow_rs::{Engine, Workflow};
140    ///
141    /// let workflows = vec![Workflow::from_json(r#"{"id": "test", "name": "Test", "priority": 0, "tasks": [{"id": "task1", "name": "Task 1", "function": {"name": "map", "input": {"mappings": []}}}]}"#).unwrap()];
142    ///
143    /// // Simple usage with defaults
144    /// let engine = Engine::new(workflows, None);
145    /// ```
146    pub fn new(
147        workflows: Vec<Workflow>,
148        custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>,
149    ) -> Self {
150        // Compile workflows with DataLogic v4 (sorted by priority at compile time)
151        let mut compiler = LogicCompiler::new();
152        let sorted_workflows = compiler.compile_workflows(workflows);
153        let (datalogic, logic_cache) = compiler.into_parts();
154
155        let mut task_functions = custom_functions.unwrap_or_default();
156
157        // Add built-in async function handlers
158        for (name, handler) in functions::builtins::get_all_functions() {
159            task_functions.insert(name, handler);
160        }
161
162        // Create internal executor with shared DataLogic and compiled logic
163        let internal_executor = Arc::new(InternalExecutor::new(
164            Arc::clone(&datalogic),
165            logic_cache.clone(),
166        ));
167
168        // Create task executor
169        let task_executor = Arc::new(TaskExecutor::new(
170            Arc::new(task_functions),
171            Arc::clone(&internal_executor),
172            Arc::clone(&datalogic),
173        ));
174
175        // Create workflow executor
176        let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
177
178        // Build channel index for O(1) channel-based routing
179        let channel_index = build_channel_index(&sorted_workflows);
180
181        Self {
182            workflows: Arc::new(sorted_workflows),
183            channel_index: Arc::new(channel_index),
184            workflow_executor,
185            datalogic,
186            logic_cache,
187        }
188    }
189
190    /// Creates a new Engine with different workflows but the same custom function handlers.
191    ///
192    /// This is the hot-reload path. The existing engine remains valid for any
193    /// in-flight `process_message` calls. The returned engine shares the same
194    /// function registry (zero-copy Arc bump) but has freshly compiled logic
195    /// for the new workflow set.
196    ///
197    /// # Arguments
198    /// * `workflows` - The new set of workflows to compile and use
199    pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self {
200        // Extract the shared function registry from the existing executor
201        let task_functions = self.workflow_executor.task_functions();
202
203        // Compile new workflows with a fresh DataLogic instance
204        let mut compiler = LogicCompiler::new();
205        let sorted_workflows = compiler.compile_workflows(workflows);
206        let (datalogic, logic_cache) = compiler.into_parts();
207
208        // Rebuild the executor stack, reusing the existing function registry
209        let internal_executor = Arc::new(InternalExecutor::new(
210            Arc::clone(&datalogic),
211            logic_cache.clone(),
212        ));
213
214        let task_executor = Arc::new(TaskExecutor::new(
215            task_functions,
216            Arc::clone(&internal_executor),
217            Arc::clone(&datalogic),
218        ));
219
220        let workflow_executor = Arc::new(WorkflowExecutor::new(task_executor, internal_executor));
221
222        // Build channel index for O(1) channel-based routing
223        let channel_index = build_channel_index(&sorted_workflows);
224
225        Self {
226            workflows: Arc::new(sorted_workflows),
227            channel_index: Arc::new(channel_index),
228            workflow_executor,
229            datalogic,
230            logic_cache,
231        }
232    }
233
234    /// Processes a message through workflows that match their conditions.
235    ///
236    /// This async method:
237    /// 1. Iterates through workflows sequentially in priority order (pre-sorted at construction)
238    /// 2. Delegates workflow execution to the WorkflowExecutor
239    /// 3. Updates message metadata
240    ///
241    /// # Arguments
242    /// * `message` - The message to process through workflows
243    ///
244    /// # Returns
245    /// * `Result<()>` - Ok(()) if processing succeeded, Err if a fatal error occurred
246    pub async fn process_message(&self, message: &mut Message) -> Result<()> {
247        // Set processing metadata
248        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
249        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
250        message.invalidate_context_cache();
251
252        // Process each workflow in priority order (pre-sorted at construction)
253        for workflow in self.workflows.iter() {
254            self.workflow_executor.execute(workflow, message).await?;
255        }
256
257        Ok(())
258    }
259
260    /// Processes a message through workflows with step-by-step tracing.
261    ///
262    /// This method is similar to `process_message` but captures an execution trace
263    /// that can be used for debugging and step-by-step visualization.
264    ///
265    /// # Arguments
266    /// * `message` - The message to process through workflows
267    ///
268    /// # Returns
269    /// * `Result<ExecutionTrace>` - The execution trace with message snapshots
270    pub async fn process_message_with_trace(
271        &self,
272        message: &mut Message,
273    ) -> Result<ExecutionTrace> {
274        use trace::ExecutionTrace;
275
276        // Set processing metadata
277        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
278        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
279        message.invalidate_context_cache();
280
281        let mut trace = ExecutionTrace::new();
282
283        // Process each workflow in priority order (pre-sorted at construction)
284        for workflow in self.workflows.iter() {
285            self.workflow_executor
286                .execute_with_trace(workflow, message, &mut trace)
287                .await?;
288        }
289
290        Ok(trace)
291    }
292
293    /// Processes a message through only the Active workflows registered for a given channel.
294    ///
295    /// Workflows are processed in priority order (lowest first), same as process_message().
296    /// If the channel does not exist or has no Active workflows, this is a no-op.
297    ///
298    /// # Arguments
299    /// * `channel` - The channel name to route the message through
300    /// * `message` - The message to process
301    pub async fn process_message_for_channel(
302        &self,
303        channel: &str,
304        message: &mut Message,
305    ) -> Result<()> {
306        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
307        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
308        message.context["metadata"]["channel"] = json!(channel);
309        message.invalidate_context_cache();
310
311        if let Some(indices) = self.channel_index.get(channel) {
312            for &idx in indices {
313                self.workflow_executor
314                    .execute(&self.workflows[idx], message)
315                    .await?;
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Processes a message through a channel with step-by-step tracing.
323    ///
324    /// # Arguments
325    /// * `channel` - The channel name to route the message through
326    /// * `message` - The message to process
327    pub async fn process_message_for_channel_with_trace(
328        &self,
329        channel: &str,
330        message: &mut Message,
331    ) -> Result<ExecutionTrace> {
332        use trace::ExecutionTrace;
333
334        message.context["metadata"]["processed_at"] = json!(Utc::now().to_rfc3339());
335        message.context["metadata"]["engine_version"] = json!(env!("CARGO_PKG_VERSION"));
336        message.context["metadata"]["channel"] = json!(channel);
337        message.invalidate_context_cache();
338
339        let mut trace = ExecutionTrace::new();
340
341        if let Some(indices) = self.channel_index.get(channel) {
342            for &idx in indices {
343                self.workflow_executor
344                    .execute_with_trace(&self.workflows[idx], message, &mut trace)
345                    .await?;
346            }
347        }
348
349        Ok(trace)
350    }
351
352    /// Get a reference to the workflows (pre-sorted by priority)
353    pub fn workflows(&self) -> &Arc<Vec<Workflow>> {
354        &self.workflows
355    }
356
357    /// Look up a workflow by its ID
358    pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow> {
359        self.workflows.iter().find(|w| w.id == id)
360    }
361
362    /// Get a reference to the DataLogic instance
363    pub fn datalogic(&self) -> &Arc<DataLogic> {
364        &self.datalogic
365    }
366
367    /// Get a reference to the compiled logic cache
368    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
369        &self.logic_cache
370    }
371}