dataflow_rs/engine/
compiler.rs

1//! # Workflow Compilation Module
2//!
3//! This module handles the pre-compilation of JSONLogic expressions used throughout
4//! the engine. By compiling all logic at initialization time with DataLogic v4, we achieve:
5//!
6//! - Zero runtime compilation overhead
7//! - Thread-safe compiled logic via Arc
8//! - Early validation of logic expressions
9//! - Efficient memory sharing across async tasks
10
11use crate::engine::functions::{MapConfig, ValidationConfig};
12use crate::engine::{FunctionConfig, Workflow};
13use datalogic_rs::{CompiledLogic, DataLogic};
14use log::{debug, error};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19/// Compiles and caches JSONLogic expressions for optimal runtime performance.
20///
21/// The `LogicCompiler` is responsible for:
22/// - Pre-compiling all workflow conditions using DataLogic v4
23/// - Pre-compiling task-specific logic (map transformations, validation rules)
24/// - Maintaining Arc-wrapped compiled logic for thread-safe sharing
25/// - Providing early validation of logic expressions
26pub struct LogicCompiler {
27    /// Shared DataLogic instance for compilation
28    datalogic: Arc<DataLogic>,
29    /// Cache of compiled logic expressions indexed by their position
30    logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl Default for LogicCompiler {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl LogicCompiler {
40    /// Create a new LogicCompiler with DataLogic v4
41    pub fn new() -> Self {
42        Self {
43            datalogic: Arc::new(DataLogic::with_preserve_structure()),
44            logic_cache: Vec::new(),
45        }
46    }
47
48    /// Get the DataLogic instance
49    pub fn datalogic(&self) -> Arc<DataLogic> {
50        Arc::clone(&self.datalogic)
51    }
52
53    /// Get the logic cache
54    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
55        &self.logic_cache
56    }
57
58    /// Consume the compiler and return its components
59    pub fn into_parts(self) -> (Arc<DataLogic>, Vec<Arc<CompiledLogic>>) {
60        (self.datalogic, self.logic_cache)
61    }
62
63    /// Compile all workflows and their tasks
64    pub fn compile_workflows(&mut self, workflows: Vec<Workflow>) -> HashMap<String, Workflow> {
65        let mut workflow_map = HashMap::new();
66
67        for mut workflow in workflows {
68            if let Err(e) = workflow.validate() {
69                error!("Invalid workflow {}: {:?}", workflow.id, e);
70                continue;
71            }
72
73            // Parse and cache workflow condition
74            debug!(
75                "Compiling condition for workflow {}: {:?}",
76                workflow.id, workflow.condition
77            );
78            match self.compile_logic(&workflow.condition) {
79                Ok(index) => {
80                    workflow.condition_index = index;
81                    debug!(
82                        "Workflow {} condition compiled at index {:?}",
83                        workflow.id, index
84                    );
85
86                    // Compile task conditions and function logic
87                    self.compile_workflow_tasks(&mut workflow);
88
89                    workflow_map.insert(workflow.id.clone(), workflow);
90                }
91                Err(e) => {
92                    error!(
93                        "Failed to parse condition for workflow {}: {:?}",
94                        workflow.id, e
95                    );
96                }
97            }
98        }
99
100        workflow_map
101    }
102
103    /// Compile task conditions and function logic for a workflow
104    fn compile_workflow_tasks(&mut self, workflow: &mut Workflow) {
105        for task in &mut workflow.tasks {
106            // Compile task condition
107            debug!(
108                "Compiling condition for task {} in workflow {}: {:?}",
109                task.id, workflow.id, task.condition
110            );
111            match self.compile_logic(&task.condition) {
112                Ok(index) => {
113                    task.condition_index = index;
114                    debug!("Task {} condition compiled at index {:?}", task.id, index);
115                }
116                Err(e) => {
117                    error!(
118                        "Failed to parse condition for task {} in workflow {}: {:?}",
119                        task.id, workflow.id, e
120                    );
121                }
122            }
123
124            // Compile function-specific logic (map transformations, validation rules)
125            self.compile_function_logic(&mut task.function, &task.id, &workflow.id);
126        }
127    }
128
129    /// Compile function-specific logic based on function type
130    fn compile_function_logic(
131        &mut self,
132        function: &mut FunctionConfig,
133        task_id: &str,
134        workflow_id: &str,
135    ) {
136        match function {
137            FunctionConfig::Map { input, .. } => {
138                self.compile_map_logic(input, task_id, workflow_id);
139            }
140            FunctionConfig::Validation { input, .. } => {
141                self.compile_validation_logic(input, task_id, workflow_id);
142            }
143            _ => {
144                // Custom functions don't need pre-compilation
145            }
146        }
147    }
148
149    /// Compile map transformation logic
150    fn compile_map_logic(&mut self, config: &mut MapConfig, task_id: &str, workflow_id: &str) {
151        for mapping in &mut config.mappings {
152            debug!(
153                "Compiling map logic for task {} in workflow {}: {:?}",
154                task_id, workflow_id, mapping.logic
155            );
156            match self.compile_logic(&mapping.logic) {
157                Ok(index) => {
158                    mapping.logic_index = index;
159                    debug!(
160                        "Map logic for task {} compiled at index {:?}",
161                        task_id, index
162                    );
163                }
164                Err(e) => {
165                    error!(
166                        "Failed to parse map logic for task {} in workflow {}: {:?}",
167                        task_id, workflow_id, e
168                    );
169                }
170            }
171        }
172    }
173
174    /// Compile validation rule logic
175    fn compile_validation_logic(
176        &mut self,
177        config: &mut ValidationConfig,
178        task_id: &str,
179        workflow_id: &str,
180    ) {
181        for rule in &mut config.rules {
182            debug!(
183                "Compiling validation logic for task {} in workflow {}: {:?}",
184                task_id, workflow_id, rule.logic
185            );
186            match self.compile_logic(&rule.logic) {
187                Ok(index) => {
188                    rule.logic_index = index;
189                    debug!(
190                        "Validation logic for task {} compiled at index {:?}",
191                        task_id, index
192                    );
193                }
194                Err(e) => {
195                    error!(
196                        "Failed to parse validation logic for task {} in workflow {}: {:?}",
197                        task_id, workflow_id, e
198                    );
199                }
200            }
201        }
202    }
203
204    /// Compile a logic expression and cache it
205    fn compile_logic(&mut self, logic: &Value) -> Result<Option<usize>, String> {
206        // DataLogic v4: compile returns Arc<CompiledLogic>
207        match self.datalogic.compile(logic) {
208            Ok(compiled) => {
209                let index = self.logic_cache.len();
210                self.logic_cache.push(compiled);
211                Ok(Some(index))
212            }
213            Err(e) => Err(format!("Failed to compile logic: {}", e)),
214        }
215    }
216}