dataflow_rs/engine/executor.rs
1//! # Internal Function Execution Module
2//!
3//! This module handles the efficient execution of built-in functions (map and validation)
4//! using pre-compiled logic from DataLogic v4. It provides optimized execution paths for:
5//!
6//! - Data transformations with JSONLogic mappings
7//! - Rule-based validation with custom error messages
8//! - Efficient condition evaluation for workflows and tasks
9//! - Thread-safe execution using Arc<CompiledLogic>
10
11use crate::engine::error::Result;
12use crate::engine::functions::{MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::error;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Executes internal functions using pre-compiled logic for optimal performance.
20///
21/// The `InternalExecutor` provides:
22/// - Efficient execution of map transformations using compiled logic
23/// - Fast validation rule evaluation with detailed error reporting
24/// - Condition evaluation for workflow and task control flow
25/// - Thread-safe operation via Arc-wrapped compiled logic
26pub struct InternalExecutor {
27 /// Shared DataLogic instance for evaluation
28 datalogic: Arc<DataLogic>,
29 /// Reference to the compiled logic cache
30 logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl InternalExecutor {
34 /// Create a new InternalExecutor with DataLogic v4
35 pub fn new(datalogic: Arc<DataLogic>, logic_cache: Vec<Arc<CompiledLogic>>) -> Self {
36 Self {
37 datalogic,
38 logic_cache,
39 }
40 }
41
42 /// Get a reference to the DataLogic instance
43 pub fn datalogic(&self) -> &Arc<DataLogic> {
44 &self.datalogic
45 }
46
47 /// Get a reference to the logic cache
48 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
49 &self.logic_cache
50 }
51
52 /// Execute the internal map function with optimized data handling
53 pub fn execute_map(
54 &self,
55 message: &mut Message,
56 config: &MapConfig,
57 ) -> Result<(usize, Vec<Change>)> {
58 config.execute(message, &self.datalogic, &self.logic_cache)
59 }
60
61 /// Execute the internal validation function
62 pub fn execute_validation(
63 &self,
64 message: &mut Message,
65 config: &ValidationConfig,
66 ) -> Result<(usize, Vec<Change>)> {
67 config.execute(message, &self.datalogic, &self.logic_cache)
68 }
69
70 /// Evaluate a condition using cached compiled logic
71 /// The context passed here contains data, metadata, temp_data, and payload
72 /// Conditions can now access any field: metadata.field, data.field, temp_data.field
73 pub fn evaluate_condition(
74 &self,
75 condition_index: Option<usize>,
76 context: Arc<Value>,
77 ) -> Result<bool> {
78 match condition_index {
79 Some(index) if index < self.logic_cache.len() => {
80 let compiled_logic = &self.logic_cache[index];
81 // Evaluate condition against the full context
82 let result = self.datalogic.evaluate(compiled_logic, context);
83
84 match result {
85 Ok(value) => {
86 // Check for explicit boolean true, same as validation function
87 Ok(value == Value::Bool(true))
88 }
89 Err(e) => {
90 error!("Failed to evaluate condition: {:?}", e);
91 Ok(false)
92 }
93 }
94 }
95 _ => Ok(true), // No condition means always true
96 }
97 }
98}