dataflow_rs/engine/
executor.rs

//! # Internal Function Execution Module
//!
//! This module handles the efficient execution of built-in functions (map and validation)
//! using pre-compiled logic. It provides optimized execution paths for:
//!
//! - Data transformations with JSONLogic mappings
//! - Rule-based validation with custom error messages
//! - Efficient condition evaluation for workflows and tasks
//! - Minimal allocation through lazy data structure initialization
10
11use crate::engine::error::{DataflowError, ErrorInfo, Result};
12use crate::engine::functions::{MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::value::ToJson;
15use datalogic_rs::{DataLogic, Logic};
16use log::{debug, error};
17use serde_json::{Value, json};
18
19/// Executes internal functions using pre-compiled logic for optimal performance.
20///
21/// The `InternalExecutor` provides:
22/// - Efficient execution of map transformations using compiled logic
23/// - Fast validation rule evaluation with detailed error reporting
24/// - Condition evaluation for workflow and task control flow
25/// - Lazy initialization to avoid unnecessary allocations
26///
27/// By using pre-compiled logic from the cache, the executor avoids all
28/// runtime compilation overhead, ensuring predictable low-latency execution.
29pub struct InternalExecutor<'a> {
30    /// Reference to the DataLogic instance for evaluation
31    datalogic: &'a DataLogic<'static>,
32    /// Reference to the compiled logic cache
33    logic_cache: &'a Vec<Logic<'static>>,
34}
35
36impl<'a> InternalExecutor<'a> {
37    /// Create a new InternalExecutor
38    pub fn new(datalogic: &'a DataLogic<'static>, logic_cache: &'a Vec<Logic<'static>>) -> Self {
39        Self {
40            datalogic,
41            logic_cache,
42        }
43    }
44
45    /// Execute the internal map function with optimized data handling
46    pub fn execute_map(
47        &self,
48        message: &mut Message,
49        config: &MapConfig,
50    ) -> Result<(usize, Vec<Change>)> {
51        let mut changes = Vec::with_capacity(config.mappings.len());
52
53        // Check if we need parsed data for compiled logic
54        let needs_parsed_data = config.mappings.iter().any(|m| m.logic_index.is_some());
55
56        // Determine if we need data for evaluation
57        let needs_data = needs_parsed_data
58            || config
59                .mappings
60                .iter()
61                .any(|m| m.logic.is_object() || m.logic.is_array());
62
63        // Initialize mutable data_for_eval if needed
64        let mut data_for_eval = if needs_data {
65            Some(json!({
66                "data": &message.data,
67                "metadata": &message.metadata,
68                "temp_data": &message.temp_data,
69            }))
70        } else {
71            None
72        };
73
74        for mapping in &config.mappings {
75            let target_path = &mapping.path;
76            let logic = &mapping.logic;
77
78            // Parse data for compiled logic evaluation if needed
79            let parsed_data = if let Some(logic_index) = mapping.logic_index {
80                if logic_index < self.logic_cache.len() && data_for_eval.is_some() {
81                    Some(
82                        self.datalogic
83                            .parse_data_json(data_for_eval.as_ref().unwrap())
84                            .map_err(|e| {
85                                error!("Failed to parse data for evaluation: {e:?}");
86                                DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
87                            })?,
88                    )
89                } else {
90                    None
91                }
92            } else {
93                None
94            };
95
96            // Evaluate using compiled logic if available
97            let result = if let Some(logic_index) = mapping.logic_index {
98                if logic_index < self.logic_cache.len() && parsed_data.is_some() {
99                    let compiled_logic = &self.logic_cache[logic_index];
100                    let eval_result = self
101                        .datalogic
102                        .evaluate(compiled_logic, parsed_data.as_ref().unwrap())
103                        .map_err(|e| {
104                            error!("Failed to evaluate compiled logic: {e:?}");
105                            DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
106                        })?;
107                    eval_result.to_json()
108                } else if let Some(ref data) = data_for_eval {
109                    self.datalogic.evaluate_json(logic, data).map_err(|e| {
110                        error!("Failed to evaluate logic: {e:?}");
111                        DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
112                    })?
113                } else {
114                    // Fallback for edge cases
115                    logic.clone()
116                }
117            } else {
118                // For simple values (strings, numbers, etc.), just clone them directly
119                if !logic.is_object() && !logic.is_array() {
120                    logic.clone()
121                } else if let Some(ref data) = data_for_eval {
122                    self.datalogic.evaluate_json(logic, data).map_err(|e| {
123                        error!("Failed to evaluate logic: {e:?}");
124                        DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
125                    })?
126                } else {
127                    logic.clone()
128                }
129            };
130
131            if result.is_null() {
132                continue;
133            }
134
135            // Determine target object based on path prefix
136            let (target_object, adjusted_path) = self.resolve_target_path(message, target_path);
137
138            // Set the result at the target path
139            let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
140
141            changes.push(Change {
142                path: target_path.clone(),
143                old_value,
144                new_value: result.clone(),
145            });
146
147            // Update data_for_eval with the new value so subsequent mappings see the changes
148            if let Some(ref mut data) = data_for_eval {
149                // Update the appropriate field in data_for_eval based on the path
150                if let Some(adjusted_path) = target_path.strip_prefix("data.") {
151                    if let Some(data_obj) = data.get_mut("data") {
152                        let _ = self.set_value_at_path(data_obj, adjusted_path, &result);
153                    }
154                } else if let Some(adjusted_path) = target_path.strip_prefix("temp_data.") {
155                    if let Some(temp_data_obj) = data.get_mut("temp_data") {
156                        let _ = self.set_value_at_path(temp_data_obj, adjusted_path, &result);
157                    }
158                } else if let Some(adjusted_path) = target_path.strip_prefix("metadata.")
159                    && let Some(metadata_obj) = data.get_mut("metadata")
160                {
161                    let _ = self.set_value_at_path(metadata_obj, adjusted_path, &result);
162                }
163            }
164        }
165
166        Ok((200, changes))
167    }
168
169    /// Execute the internal validation function
170    pub fn execute_validate(
171        &self,
172        message: &mut Message,
173        config: &ValidationConfig,
174    ) -> Result<(usize, Vec<Change>)> {
175        // Pre-parse data for different validation paths
176        let data_json = json!({"data": &message.data});
177        let metadata_json = json!({"metadata": &message.metadata});
178        let temp_data_json = json!({"temp_data": &message.temp_data});
179
180        // For now, we'll skip the caching optimization since DataValue has lifetime issues
181        // This will be addressed in a future optimization
182
183        for rule in &config.rules {
184            let rule_logic = &rule.logic;
185            let rule_path = &rule.path;
186            let error_message = &rule.message;
187
188            // Evaluate using compiled logic if available
189            let validation_result = if let Some(logic_index) = rule.logic_index {
190                if logic_index < self.logic_cache.len() {
191                    let compiled_logic = &self.logic_cache[logic_index];
192                    let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.")
193                    {
194                        &data_json
195                    } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
196                        &metadata_json
197                    } else {
198                        &temp_data_json
199                    };
200
201                    if let Ok(data_val) = self.datalogic.parse_data_json(data_to_validate) {
202                        self.datalogic
203                            .evaluate(compiled_logic, data_val)
204                            .map(|v| v.as_bool().unwrap_or(false))
205                            .unwrap_or(false)
206                    } else {
207                        false
208                    }
209                } else {
210                    // Fallback to JSON evaluation
211                    let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.")
212                    {
213                        &data_json
214                    } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
215                        &metadata_json
216                    } else {
217                        &temp_data_json
218                    };
219
220                    self.datalogic
221                        .evaluate_json(rule_logic, data_to_validate)
222                        .map(|v| v.as_bool().unwrap_or(false))
223                        .unwrap_or(false)
224                }
225            } else {
226                // Direct evaluation for non-compiled logic
227                let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.") {
228                    &data_json
229                } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
230                    &metadata_json
231                } else {
232                    &temp_data_json
233                };
234
235                self.datalogic
236                    .evaluate_json(rule_logic, data_to_validate)
237                    .map(|v| v.as_bool().unwrap_or(false))
238                    .unwrap_or(false)
239            };
240
241            if !validation_result {
242                debug!("Validation failed: {}", error_message);
243
244                // Store the validation error
245                message.errors.push(ErrorInfo::new(
246                    None,
247                    None,
248                    DataflowError::Validation(error_message.clone()),
249                ));
250
251                // Store validation failure in temp_data
252                self.record_validation_error(message, error_message);
253            }
254        }
255
256        // Check if any validation errors occurred
257        let has_validation_errors = message
258            .temp_data
259            .get("validation_errors")
260            .and_then(|v| v.as_array())
261            .map(|arr| !arr.is_empty())
262            .unwrap_or(false);
263
264        let status = if has_validation_errors { 400 } else { 200 };
265        Ok((status, vec![]))
266    }
267
268    /// Evaluate logic using compiled index or direct evaluation
269    pub fn evaluate_logic(
270        &self,
271        logic_index: Option<usize>,
272        logic: &Value,
273        data: &Value,
274    ) -> Result<Value> {
275        if let Some(index) = logic_index {
276            debug!("Using compiled logic at index {}", index);
277            if index < self.logic_cache.len() {
278                let compiled_logic = &self.logic_cache[index];
279                let data_value = self.datalogic.parse_data_json(data).map_err(|e| {
280                    DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
281                })?;
282
283                let result = self
284                    .datalogic
285                    .evaluate(compiled_logic, data_value)
286                    .map_err(|e| {
287                        DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
288                    })?;
289
290                Ok(result.to_json())
291            } else {
292                Err(DataflowError::LogicEvaluation(format!(
293                    "Logic index {} out of bounds",
294                    index
295                )))
296            }
297        } else {
298            debug!("Evaluating logic directly (not compiled): {:?}", logic);
299            self.datalogic.evaluate_json(logic, data).map_err(|e| {
300                DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
301            })
302        }
303    }
304
305    /// Evaluate a condition
306    pub fn evaluate_condition(
307        &self,
308        condition_index: Option<usize>,
309        condition: &Value,
310        data: &Value,
311    ) -> Result<bool> {
312        // Short-circuit for simple boolean conditions
313        if let Value::Bool(b) = condition {
314            debug!("Evaluating simple boolean condition: {}", b);
315            return Ok(*b);
316        }
317
318        if let Some(index) = condition_index {
319            debug!("Using compiled logic at index {}", index);
320            if index < self.logic_cache.len() {
321                let logic = &self.logic_cache[index];
322                let data_value = self.datalogic.parse_data_json(data).map_err(|e| {
323                    DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
324                })?;
325
326                let result = self
327                    .datalogic
328                    .evaluate(logic, data_value)
329                    .map(|result| result.as_bool().unwrap_or(false))
330                    .map_err(|e| {
331                        DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
332                    });
333                debug!("Compiled logic evaluation result: {:?}", result);
334                result
335            } else {
336                Err(DataflowError::LogicEvaluation(format!(
337                    "Condition index {} out of bounds",
338                    index
339                )))
340            }
341        } else {
342            debug!(
343                "Evaluating condition directly (not compiled): {:?}",
344                condition
345            );
346            let result = self
347                .datalogic
348                .evaluate_json(condition, data)
349                .map(|result| result.as_bool().unwrap_or(false))
350                .map_err(|e| {
351                    DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
352                });
353            debug!("Direct evaluation result: {:?}", result);
354            result
355        }
356    }
357
358    /// Resolve target path for message field
359    fn resolve_target_path<'b>(
360        &self,
361        message: &'b mut Message,
362        target_path: &'b str,
363    ) -> (&'b mut Value, &'b str) {
364        if let Some(path) = target_path.strip_prefix("data.") {
365            (&mut message.data, path)
366        } else if let Some(path) = target_path.strip_prefix("metadata.") {
367            (&mut message.metadata, path)
368        } else if let Some(path) = target_path.strip_prefix("temp_data.") {
369            (&mut message.temp_data, path)
370        } else if target_path == "data" {
371            (&mut message.data, "")
372        } else if target_path == "metadata" {
373            (&mut message.metadata, "")
374        } else if target_path == "temp_data" {
375            (&mut message.temp_data, "")
376        } else {
377            // Default to data
378            (&mut message.data, target_path)
379        }
380    }
381
382    /// Record validation error in temp_data
383    fn record_validation_error(&self, message: &mut Message, error_message: &str) {
384        if !message.temp_data.is_object() {
385            message.temp_data = json!({});
386        }
387        if let Some(obj) = message.temp_data.as_object_mut() {
388            if !obj.contains_key("validation_errors") {
389                obj.insert("validation_errors".to_string(), json!([]));
390            }
391            if let Some(errors_array) = obj
392                .get_mut("validation_errors")
393                .and_then(|v| v.as_array_mut())
394            {
395                errors_array.push(json!(error_message));
396            }
397        }
398    }
399
400    /// Set a value at the specified path within the target object
401    pub fn set_value_at_path(
402        &self,
403        target: &mut Value,
404        path: &str,
405        value: &Value,
406    ) -> Result<Value> {
407        let mut current = target;
408        let mut old_value = Value::Null;
409        let path_parts: Vec<&str> = path.split('.').collect();
410
411        // Helper function to check if a string is a valid array index
412        fn is_numeric_index(s: &str) -> bool {
413            s.parse::<usize>().is_ok()
414        }
415
416        // Empty path means replace or merge the entire value
417        if path.is_empty() {
418            old_value = current.clone();
419            // If both current and new value are objects, merge them
420            if let (Value::Object(current_obj), Value::Object(new_obj)) =
421                (current.clone(), value.clone())
422            {
423                let mut merged = current_obj;
424                for (key, val) in new_obj {
425                    merged.insert(key, val);
426                }
427                *current = Value::Object(merged);
428            } else {
429                // Otherwise, replace entirely
430                *current = value.clone();
431            }
432            return Ok(old_value);
433        }
434
435        // Navigate to the parent of the target location
436        for (i, part) in path_parts.iter().enumerate() {
437            // Strip '#' prefix if present (used to indicate numeric keys that should be treated as strings)
438            let (part_clean, force_string) = if let Some(stripped) = part.strip_prefix('#') {
439                (stripped, true)
440            } else {
441                (*part, false)
442            };
443
444            // Check if it's numeric only if not forced to be a string
445            let is_numeric = !force_string && is_numeric_index(part_clean);
446
447            if i == path_parts.len() - 1 {
448                // We're at the last part, so set the value
449                if is_numeric {
450                    // Handle numeric index - ensure current is an array
451                    if !current.is_array() {
452                        *current = Value::Array(vec![]);
453                    }
454
455                    if let Ok(index) = part_clean.parse::<usize>()
456                        && let Value::Array(arr) = current
457                    {
458                        // Extend array if needed
459                        while arr.len() <= index {
460                            arr.push(Value::Null);
461                        }
462                        old_value = arr[index].clone();
463                        arr[index] = value.clone();
464                    }
465                } else {
466                    // Handle object key
467                    if !current.is_object() {
468                        *current = json!({});
469                    }
470
471                    if let Value::Object(map) = current {
472                        old_value = map.get(part_clean).cloned().unwrap_or(Value::Null);
473
474                        // If both the existing value and new value are objects, merge them
475                        if let (Some(Value::Object(existing_obj)), Value::Object(new_obj)) =
476                            (map.get(part_clean), value)
477                        {
478                            // Create a merged object by cloning the existing and adding new fields
479                            let mut merged = existing_obj.clone();
480                            for (key, val) in new_obj {
481                                merged.insert(key.clone(), val.clone());
482                            }
483                            map.insert(part_clean.to_string(), Value::Object(merged));
484                        } else {
485                            // Otherwise, just replace the value as before
486                            map.insert(part_clean.to_string(), value.clone());
487                        }
488                    }
489                }
490            } else {
491                // Navigate deeper
492                if is_numeric {
493                    // Handle array navigation
494                    if !current.is_array() {
495                        *current = Value::Array(vec![]);
496                    }
497
498                    if let Ok(index) = part_clean.parse::<usize>()
499                        && let Value::Array(arr) = current
500                    {
501                        while arr.len() <= index {
502                            arr.push(Value::Null);
503                        }
504                        current = &mut arr[index];
505                    }
506                } else {
507                    // Handle object navigation
508                    if !current.is_object() {
509                        *current = json!({});
510                    }
511
512                    if let Value::Object(map) = current {
513                        if !map.contains_key(part_clean) {
514                            // Look ahead to see if next part is numeric to decide what to create
515                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
516                            // Strip '#' prefix from next part if present when checking
517                            let next_clean = if let Some(stripped) = next_part.strip_prefix('#') {
518                                stripped
519                            } else {
520                                next_part
521                            };
522                            if is_numeric_index(next_clean) && !next_part.starts_with('#') {
523                                map.insert(part_clean.to_string(), Value::Array(vec![]));
524                            } else {
525                                map.insert(part_clean.to_string(), json!({}));
526                            }
527                        }
528                        current = map.get_mut(part_clean).unwrap();
529                    }
530                }
531            }
532        }
533
534        Ok(old_value)
535    }
536}