dataflow_rs/engine/executor.rs
//! # Internal Function Execution Module
//!
//! This module handles the efficient execution of built-in functions (map, validation,
//! log, and filter) using pre-compiled logic from DataLogic v4. It provides optimized
//! execution paths for:
//!
//! - Data transformations with JSONLogic mappings
//! - Rule-based validation with custom error messages
//! - Efficient condition evaluation for workflows and tasks
//! - Thread-safe execution using `Arc<CompiledLogic>`
10
11use crate::engine::error::Result;
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::error;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Executes internal functions using pre-compiled logic for optimal performance.
20///
21/// The `InternalExecutor` provides:
22/// - Efficient execution of map transformations using compiled logic
23/// - Fast validation rule evaluation with detailed error reporting
24/// - Condition evaluation for workflow and task control flow
25/// - Thread-safe operation via Arc-wrapped compiled logic
26pub struct InternalExecutor {
27 /// Shared DataLogic instance for evaluation
28 datalogic: Arc<DataLogic>,
29 /// Reference to the compiled logic cache
30 logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl InternalExecutor {
34 /// Create a new InternalExecutor with DataLogic v4
35 pub fn new(datalogic: Arc<DataLogic>, logic_cache: Vec<Arc<CompiledLogic>>) -> Self {
36 Self {
37 datalogic,
38 logic_cache,
39 }
40 }
41
42 /// Get a reference to the DataLogic instance
43 pub fn datalogic(&self) -> &Arc<DataLogic> {
44 &self.datalogic
45 }
46
47 /// Get a reference to the logic cache
48 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
49 &self.logic_cache
50 }
51
52 /// Execute the internal map function with optimized data handling
53 pub fn execute_map(
54 &self,
55 message: &mut Message,
56 config: &MapConfig,
57 ) -> Result<(usize, Vec<Change>)> {
58 config.execute(message, &self.datalogic, &self.logic_cache)
59 }
60
61 /// Execute the internal map function with trace support (captures per-mapping context snapshots)
62 pub fn execute_map_with_trace(
63 &self,
64 message: &mut Message,
65 config: &MapConfig,
66 ) -> Result<(usize, Vec<Change>, Vec<Value>)> {
67 config.execute_with_trace(message, &self.datalogic, &self.logic_cache)
68 }
69
70 /// Execute the internal validation function
71 pub fn execute_validation(
72 &self,
73 message: &mut Message,
74 config: &ValidationConfig,
75 ) -> Result<(usize, Vec<Change>)> {
76 config.execute(message, &self.datalogic, &self.logic_cache)
77 }
78
79 /// Execute the internal log function
80 pub fn execute_log(
81 &self,
82 message: &mut Message,
83 config: &LogConfig,
84 ) -> Result<(usize, Vec<Change>)> {
85 config.execute(message, &self.datalogic, &self.logic_cache)
86 }
87
88 /// Execute the internal filter function
89 pub fn execute_filter(
90 &self,
91 message: &mut Message,
92 config: &FilterConfig,
93 ) -> Result<(usize, Vec<Change>)> {
94 config.execute(message, &self.datalogic, &self.logic_cache)
95 }
96
97 /// Evaluate a condition using cached compiled logic
98 /// The context passed here contains data, metadata, temp_data, and payload
99 /// Conditions can now access any field: metadata.field, data.field, temp_data.field
100 pub fn evaluate_condition(
101 &self,
102 condition_index: Option<usize>,
103 context: Arc<Value>,
104 ) -> Result<bool> {
105 match condition_index {
106 Some(index) if index < self.logic_cache.len() => {
107 let compiled_logic = &self.logic_cache[index];
108 // Evaluate condition against the full context
109 let result = self.datalogic.evaluate(compiled_logic, context);
110
111 match result {
112 Ok(value) => {
113 // Check for explicit boolean true, same as validation function
114 Ok(value == Value::Bool(true))
115 }
116 Err(e) => {
117 error!("Failed to evaluate condition: {:?}", e);
118 Ok(false)
119 }
120 }
121 }
122 _ => Ok(true), // No condition means always true
123 }
124 }
125}