dataflow_rs/engine/executor.rs
1//! # Internal Function Execution Module
2//!
3//! This module handles the efficient execution of built-in functions (map and validation)
4//! using pre-compiled logic from DataLogic v4. It provides optimized execution paths for:
5//!
6//! - Data transformations with JSONLogic mappings
7//! - Rule-based validation with custom error messages
8//! - Efficient condition evaluation for workflows and tasks
9//! - Thread-safe execution using Arc<CompiledLogic>
10
11use crate::engine::error::Result;
12use crate::engine::functions::{MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::error;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Executes internal functions using pre-compiled logic for optimal performance.
20///
21/// The `InternalExecutor` provides:
22/// - Efficient execution of map transformations using compiled logic
23/// - Fast validation rule evaluation with detailed error reporting
24/// - Condition evaluation for workflow and task control flow
25/// - Thread-safe operation via Arc-wrapped compiled logic
26pub struct InternalExecutor {
27 /// Shared DataLogic instance for evaluation
28 datalogic: Arc<DataLogic>,
29 /// Reference to the compiled logic cache
30 logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl InternalExecutor {
34 /// Create a new InternalExecutor with DataLogic v4
35 pub fn new(datalogic: Arc<DataLogic>, logic_cache: Vec<Arc<CompiledLogic>>) -> Self {
36 Self {
37 datalogic,
38 logic_cache,
39 }
40 }
41
42 /// Get a reference to the DataLogic instance
43 pub fn datalogic(&self) -> &Arc<DataLogic> {
44 &self.datalogic
45 }
46
47 /// Get a reference to the logic cache
48 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
49 &self.logic_cache
50 }
51
52 /// Execute the internal map function with optimized data handling
53 pub fn execute_map(
54 &self,
55 message: &mut Message,
56 config: &MapConfig,
57 ) -> Result<(usize, Vec<Change>)> {
58 config.execute(message, &self.datalogic, &self.logic_cache)
59 }
60
61 /// Execute the internal map function with trace support (captures per-mapping context snapshots)
62 pub fn execute_map_with_trace(
63 &self,
64 message: &mut Message,
65 config: &MapConfig,
66 ) -> Result<(usize, Vec<Change>, Vec<Value>)> {
67 config.execute_with_trace(message, &self.datalogic, &self.logic_cache)
68 }
69
70 /// Execute the internal validation function
71 pub fn execute_validation(
72 &self,
73 message: &mut Message,
74 config: &ValidationConfig,
75 ) -> Result<(usize, Vec<Change>)> {
76 config.execute(message, &self.datalogic, &self.logic_cache)
77 }
78
79 /// Evaluate a condition using cached compiled logic
80 /// The context passed here contains data, metadata, temp_data, and payload
81 /// Conditions can now access any field: metadata.field, data.field, temp_data.field
82 pub fn evaluate_condition(
83 &self,
84 condition_index: Option<usize>,
85 context: Arc<Value>,
86 ) -> Result<bool> {
87 match condition_index {
88 Some(index) if index < self.logic_cache.len() => {
89 let compiled_logic = &self.logic_cache[index];
90 // Evaluate condition against the full context
91 let result = self.datalogic.evaluate(compiled_logic, context);
92
93 match result {
94 Ok(value) => {
95 // Check for explicit boolean true, same as validation function
96 Ok(value == Value::Bool(true))
97 }
98 Err(e) => {
99 error!("Failed to evaluate condition: {:?}", e);
100 Ok(false)
101 }
102 }
103 }
104 _ => Ok(true), // No condition means always true
105 }
106 }
107}