Skip to main content

dataflow_rs/engine/
compiler.rs

1//! # Workflow Compilation Module
2//!
3//! This module handles the pre-compilation of JSONLogic expressions used throughout
4//! the engine. By compiling all logic at initialization time with DataLogic v4, we achieve:
5//!
6//! - Zero runtime compilation overhead
7//! - Thread-safe compiled logic via Arc
8//! - Early validation of logic expressions
9//! - Efficient memory sharing across async tasks
10
11use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::{debug, error};
16use serde_json::Value;
17use std::sync::Arc;
18
19/// Compiles and caches JSONLogic expressions for optimal runtime performance.
20///
21/// The `LogicCompiler` is responsible for:
22/// - Pre-compiling all workflow conditions using DataLogic v4
23/// - Pre-compiling task-specific logic (map transformations, validation rules)
24/// - Maintaining Arc-wrapped compiled logic for thread-safe sharing
25/// - Providing early validation of logic expressions
26pub struct LogicCompiler {
27    /// Shared DataLogic instance for compilation
28    datalogic: Arc<DataLogic>,
29    /// Cache of compiled logic expressions indexed by their position
30    logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl Default for LogicCompiler {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl LogicCompiler {
40    /// Create a new LogicCompiler with DataLogic v4
41    pub fn new() -> Self {
42        Self {
43            datalogic: Arc::new(DataLogic::with_preserve_structure()),
44            logic_cache: Vec::new(),
45        }
46    }
47
48    /// Get the DataLogic instance
49    pub fn datalogic(&self) -> Arc<DataLogic> {
50        Arc::clone(&self.datalogic)
51    }
52
53    /// Get the logic cache
54    pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
55        &self.logic_cache
56    }
57
58    /// Consume the compiler and return its components
59    pub fn into_parts(self) -> (Arc<DataLogic>, Vec<Arc<CompiledLogic>>) {
60        (self.datalogic, self.logic_cache)
61    }
62
63    /// Compile all workflows and their tasks, returning them sorted by priority
64    pub fn compile_workflows(&mut self, workflows: Vec<Workflow>) -> Vec<Workflow> {
65        let mut compiled_workflows = Vec::new();
66
67        for mut workflow in workflows {
68            if let Err(e) = workflow.validate() {
69                error!("Invalid workflow {}: {:?}", workflow.id, e);
70                continue;
71            }
72
73            // Parse and cache workflow condition
74            debug!(
75                "Compiling condition for workflow {}: {:?}",
76                workflow.id, workflow.condition
77            );
78            match self.compile_logic(&workflow.condition) {
79                Ok(index) => {
80                    workflow.condition_index = index;
81                    debug!(
82                        "Workflow {} condition compiled at index {:?}",
83                        workflow.id, index
84                    );
85
86                    // Compile task conditions and function logic
87                    self.compile_workflow_tasks(&mut workflow);
88
89                    compiled_workflows.push(workflow);
90                }
91                Err(e) => {
92                    error!(
93                        "Failed to parse condition for workflow {}: {:?}",
94                        workflow.id, e
95                    );
96                }
97            }
98        }
99
100        // Sort by priority once at construction time
101        compiled_workflows.sort_by_key(|w| w.priority);
102        compiled_workflows
103    }
104
105    /// Compile task conditions and function logic for a workflow
106    fn compile_workflow_tasks(&mut self, workflow: &mut Workflow) {
107        for task in &mut workflow.tasks {
108            // Compile task condition
109            debug!(
110                "Compiling condition for task {} in workflow {}: {:?}",
111                task.id, workflow.id, task.condition
112            );
113            match self.compile_logic(&task.condition) {
114                Ok(index) => {
115                    task.condition_index = index;
116                    debug!("Task {} condition compiled at index {:?}", task.id, index);
117                }
118                Err(e) => {
119                    error!(
120                        "Failed to parse condition for task {} in workflow {}: {:?}",
121                        task.id, workflow.id, e
122                    );
123                }
124            }
125
126            // Compile function-specific logic (map transformations, validation rules)
127            self.compile_function_logic(&mut task.function, &task.id, &workflow.id);
128        }
129    }
130
131    /// Compile function-specific logic based on function type
132    fn compile_function_logic(
133        &mut self,
134        function: &mut FunctionConfig,
135        task_id: &str,
136        workflow_id: &str,
137    ) {
138        match function {
139            FunctionConfig::Map { input, .. } => {
140                self.compile_map_logic(input, task_id, workflow_id);
141            }
142            FunctionConfig::Validation { input, .. } => {
143                self.compile_validation_logic(input, task_id, workflow_id);
144            }
145            FunctionConfig::Filter { input, .. } => {
146                self.compile_filter_logic(input, task_id, workflow_id);
147            }
148            FunctionConfig::Log { input, .. } => {
149                self.compile_log_logic(input, task_id, workflow_id);
150            }
151            FunctionConfig::HttpCall { input, .. } => {
152                self.compile_http_call_logic(input, task_id, workflow_id);
153            }
154            FunctionConfig::Enrich { input, .. } => {
155                self.compile_enrich_logic(input, task_id, workflow_id);
156            }
157            FunctionConfig::PublishKafka { input, .. } => {
158                self.compile_publish_kafka_logic(input, task_id, workflow_id);
159            }
160            _ => {
161                // Custom and other functions don't need pre-compilation
162            }
163        }
164    }
165
166    /// Compile map transformation logic
167    fn compile_map_logic(&mut self, config: &mut MapConfig, task_id: &str, workflow_id: &str) {
168        for mapping in &mut config.mappings {
169            debug!(
170                "Compiling map logic for task {} in workflow {}: {:?}",
171                task_id, workflow_id, mapping.logic
172            );
173            match self.compile_logic(&mapping.logic) {
174                Ok(index) => {
175                    mapping.logic_index = index;
176                    debug!(
177                        "Map logic for task {} compiled at index {:?}",
178                        task_id, index
179                    );
180                }
181                Err(e) => {
182                    error!(
183                        "Failed to parse map logic for task {} in workflow {}: {:?}",
184                        task_id, workflow_id, e
185                    );
186                }
187            }
188        }
189    }
190
191    /// Compile validation rule logic
192    fn compile_validation_logic(
193        &mut self,
194        config: &mut ValidationConfig,
195        task_id: &str,
196        workflow_id: &str,
197    ) {
198        for rule in &mut config.rules {
199            debug!(
200                "Compiling validation logic for task {} in workflow {}: {:?}",
201                task_id, workflow_id, rule.logic
202            );
203            match self.compile_logic(&rule.logic) {
204                Ok(index) => {
205                    rule.logic_index = index;
206                    debug!(
207                        "Validation logic for task {} compiled at index {:?}",
208                        task_id, index
209                    );
210                }
211                Err(e) => {
212                    error!(
213                        "Failed to parse validation logic for task {} in workflow {}: {:?}",
214                        task_id, workflow_id, e
215                    );
216                }
217            }
218        }
219    }
220
221    /// Compile log message and field expressions
222    fn compile_log_logic(&mut self, config: &mut LogConfig, task_id: &str, workflow_id: &str) {
223        // Compile the message expression
224        debug!(
225            "Compiling log message for task {} in workflow {}: {:?}",
226            task_id, workflow_id, config.message
227        );
228        match self.compile_logic(&config.message) {
229            Ok(index) => {
230                config.message_index = index;
231                debug!(
232                    "Log message for task {} compiled at index {:?}",
233                    task_id, index
234                );
235            }
236            Err(e) => {
237                error!(
238                    "Failed to compile log message for task {} in workflow {}: {:?}",
239                    task_id, workflow_id, e
240                );
241            }
242        }
243
244        // Compile each field expression
245        config.field_indices = config
246            .fields
247            .iter()
248            .map(|(key, logic)| {
249                let idx = match self.compile_logic(logic) {
250                    Ok(index) => {
251                        debug!(
252                            "Log field '{}' for task {} compiled at index {:?}",
253                            key, task_id, index
254                        );
255                        index
256                    }
257                    Err(e) => {
258                        error!(
259                            "Failed to compile log field '{}' for task {} in workflow {}: {:?}",
260                            key, task_id, workflow_id, e
261                        );
262                        None
263                    }
264                };
265                (key.clone(), idx)
266            })
267            .collect();
268    }
269
270    /// Compile filter condition logic
271    fn compile_filter_logic(
272        &mut self,
273        config: &mut FilterConfig,
274        task_id: &str,
275        workflow_id: &str,
276    ) {
277        debug!(
278            "Compiling filter condition for task {} in workflow {}: {:?}",
279            task_id, workflow_id, config.condition
280        );
281        match self.compile_logic(&config.condition) {
282            Ok(index) => {
283                config.condition_index = index;
284                debug!(
285                    "Filter condition for task {} compiled at index {:?}",
286                    task_id, index
287                );
288            }
289            Err(e) => {
290                error!(
291                    "Failed to compile filter condition for task {} in workflow {}: {:?}",
292                    task_id, workflow_id, e
293                );
294            }
295        }
296    }
297
298    /// Compile http_call JSONLogic expressions (path_logic, body_logic)
299    fn compile_http_call_logic(
300        &mut self,
301        config: &mut HttpCallConfig,
302        task_id: &str,
303        workflow_id: &str,
304    ) {
305        if let Some(ref logic) = config.path_logic.clone() {
306            match self.compile_logic(logic) {
307                Ok(index) => config.path_logic_index = index,
308                Err(e) => error!(
309                    "Failed to compile http_call path_logic for task {} in workflow {}: {:?}",
310                    task_id, workflow_id, e
311                ),
312            }
313        }
314        if let Some(ref logic) = config.body_logic.clone() {
315            match self.compile_logic(logic) {
316                Ok(index) => config.body_logic_index = index,
317                Err(e) => error!(
318                    "Failed to compile http_call body_logic for task {} in workflow {}: {:?}",
319                    task_id, workflow_id, e
320                ),
321            }
322        }
323    }
324
325    /// Compile enrich JSONLogic expressions (path_logic)
326    fn compile_enrich_logic(
327        &mut self,
328        config: &mut EnrichConfig,
329        task_id: &str,
330        workflow_id: &str,
331    ) {
332        if let Some(ref logic) = config.path_logic.clone() {
333            match self.compile_logic(logic) {
334                Ok(index) => config.path_logic_index = index,
335                Err(e) => error!(
336                    "Failed to compile enrich path_logic for task {} in workflow {}: {:?}",
337                    task_id, workflow_id, e
338                ),
339            }
340        }
341    }
342
343    /// Compile publish_kafka JSONLogic expressions (key_logic, value_logic)
344    fn compile_publish_kafka_logic(
345        &mut self,
346        config: &mut PublishKafkaConfig,
347        task_id: &str,
348        workflow_id: &str,
349    ) {
350        if let Some(ref logic) = config.key_logic.clone() {
351            match self.compile_logic(logic) {
352                Ok(index) => config.key_logic_index = index,
353                Err(e) => error!(
354                    "Failed to compile publish_kafka key_logic for task {} in workflow {}: {:?}",
355                    task_id, workflow_id, e
356                ),
357            }
358        }
359        if let Some(ref logic) = config.value_logic.clone() {
360            match self.compile_logic(logic) {
361                Ok(index) => config.value_logic_index = index,
362                Err(e) => error!(
363                    "Failed to compile publish_kafka value_logic for task {} in workflow {}: {:?}",
364                    task_id, workflow_id, e
365                ),
366            }
367        }
368    }
369
370    /// Compile a logic expression and cache it
371    fn compile_logic(&mut self, logic: &Value) -> Result<Option<usize>, String> {
372        // DataLogic v4: compile returns Arc<CompiledLogic>
373        match self.datalogic.compile(logic) {
374            Ok(compiled) => {
375                let index = self.logic_cache.len();
376                self.logic_cache.push(compiled);
377                Ok(Some(index))
378            }
379            Err(e) => Err(format!("Failed to compile logic: {}", e)),
380        }
381    }
382}