dataflow_rs/engine/
executor.rs

1//! # Internal Function Execution Module
2//!
3//! This module handles the efficient execution of built-in functions (map and validation)
4//! using pre-compiled logic from DataLogic v4. It provides optimized execution paths for:
5//!
6//! - Data transformations with JSONLogic mappings
7//! - Rule-based validation with custom error messages
8//! - Efficient condition evaluation for workflows and tasks
9//! - Thread-safe execution using Arc<CompiledLogic>
10
11use crate::engine::error::Result;
12use crate::engine::functions::{MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::error;
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Executes internal functions using pre-compiled logic for optimal performance.
20///
21/// The `InternalExecutor` provides:
22/// - Efficient execution of map transformations using compiled logic
23/// - Fast validation rule evaluation with detailed error reporting
24/// - Condition evaluation for workflow and task control flow
25/// - Thread-safe operation via Arc-wrapped compiled logic
26pub struct InternalExecutor {
27    /// Shared DataLogic instance for evaluation
28    datalogic: Arc<DataLogic>,
29    /// Reference to the compiled logic cache
30    logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl InternalExecutor {
34    /// Create a new InternalExecutor with DataLogic v4
35    pub fn new(datalogic: Arc<DataLogic>, logic_cache: Vec<Arc<CompiledLogic>>) -> Self {
36        Self {
37            datalogic,
38            logic_cache,
39        }
40    }
41
42    /// Get a reference to the DataLogic instance
43    pub fn datalogic(&self) -> &Arc<DataLogic> {
44        &self.datalogic
45    }
46
47    /// Get a reference to the logic cache
48    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
49        &self.logic_cache
50    }
51
52    /// Execute the internal map function with optimized data handling
53    pub fn execute_map(
54        &self,
55        message: &mut Message,
56        config: &MapConfig,
57    ) -> Result<(usize, Vec<Change>)> {
58        config.execute(message, &self.datalogic, &self.logic_cache)
59    }
60
61    /// Execute the internal validation function
62    pub fn execute_validation(
63        &self,
64        message: &mut Message,
65        config: &ValidationConfig,
66    ) -> Result<(usize, Vec<Change>)> {
67        config.execute(message, &self.datalogic, &self.logic_cache)
68    }
69
70    /// Evaluate a condition using cached compiled logic
71    /// The context passed here contains data, metadata, temp_data, and payload
72    /// Conditions can now access any field: metadata.field, data.field, temp_data.field
73    pub fn evaluate_condition(
74        &self,
75        condition_index: Option<usize>,
76        context: Arc<Value>,
77    ) -> Result<bool> {
78        match condition_index {
79            Some(index) if index < self.logic_cache.len() => {
80                let compiled_logic = &self.logic_cache[index];
81                // Evaluate condition against the full context
82                let result = self.datalogic.evaluate(compiled_logic, context);
83
84                match result {
85                    Ok(value) => {
86                        // Check for explicit boolean true, same as validation function
87                        Ok(value == Value::Bool(true))
88                    }
89                    Err(e) => {
90                        error!("Failed to evaluate condition: {:?}", e);
91                        Ok(false)
92                    }
93                }
94            }
95            _ => Ok(true), // No condition means always true
96        }
97    }
98}