dataflow_rs/engine/
compiler.rs1use crate::engine::functions::{MapConfig, ValidationConfig};
12use crate::engine::{FunctionConfig, Workflow};
13use datalogic_rs::{CompiledLogic, DataLogic};
14use log::{debug, error};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19pub struct LogicCompiler {
27 datalogic: Arc<DataLogic>,
29 logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl Default for LogicCompiler {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl LogicCompiler {
40 pub fn new() -> Self {
42 Self {
43 datalogic: Arc::new(DataLogic::with_preserve_structure()),
44 logic_cache: Vec::new(),
45 }
46 }
47
48 pub fn datalogic(&self) -> Arc<DataLogic> {
50 Arc::clone(&self.datalogic)
51 }
52
53 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
55 &self.logic_cache
56 }
57
58 pub fn into_parts(self) -> (Arc<DataLogic>, Vec<Arc<CompiledLogic>>) {
60 (self.datalogic, self.logic_cache)
61 }
62
63 pub fn compile_workflows(&mut self, workflows: Vec<Workflow>) -> HashMap<String, Workflow> {
65 let mut workflow_map = HashMap::new();
66
67 for mut workflow in workflows {
68 if let Err(e) = workflow.validate() {
69 error!("Invalid workflow {}: {:?}", workflow.id, e);
70 continue;
71 }
72
73 debug!(
75 "Compiling condition for workflow {}: {:?}",
76 workflow.id, workflow.condition
77 );
78 match self.compile_logic(&workflow.condition) {
79 Ok(index) => {
80 workflow.condition_index = index;
81 debug!(
82 "Workflow {} condition compiled at index {:?}",
83 workflow.id, index
84 );
85
86 self.compile_workflow_tasks(&mut workflow);
88
89 workflow_map.insert(workflow.id.clone(), workflow);
90 }
91 Err(e) => {
92 error!(
93 "Failed to parse condition for workflow {}: {:?}",
94 workflow.id, e
95 );
96 }
97 }
98 }
99
100 workflow_map
101 }
102
103 fn compile_workflow_tasks(&mut self, workflow: &mut Workflow) {
105 for task in &mut workflow.tasks {
106 debug!(
108 "Compiling condition for task {} in workflow {}: {:?}",
109 task.id, workflow.id, task.condition
110 );
111 match self.compile_logic(&task.condition) {
112 Ok(index) => {
113 task.condition_index = index;
114 debug!("Task {} condition compiled at index {:?}", task.id, index);
115 }
116 Err(e) => {
117 error!(
118 "Failed to parse condition for task {} in workflow {}: {:?}",
119 task.id, workflow.id, e
120 );
121 }
122 }
123
124 self.compile_function_logic(&mut task.function, &task.id, &workflow.id);
126 }
127 }
128
129 fn compile_function_logic(
131 &mut self,
132 function: &mut FunctionConfig,
133 task_id: &str,
134 workflow_id: &str,
135 ) {
136 match function {
137 FunctionConfig::Map { input, .. } => {
138 self.compile_map_logic(input, task_id, workflow_id);
139 }
140 FunctionConfig::Validation { input, .. } => {
141 self.compile_validation_logic(input, task_id, workflow_id);
142 }
143 _ => {
144 }
146 }
147 }
148
149 fn compile_map_logic(&mut self, config: &mut MapConfig, task_id: &str, workflow_id: &str) {
151 for mapping in &mut config.mappings {
152 debug!(
153 "Compiling map logic for task {} in workflow {}: {:?}",
154 task_id, workflow_id, mapping.logic
155 );
156 match self.compile_logic(&mapping.logic) {
157 Ok(index) => {
158 mapping.logic_index = index;
159 debug!(
160 "Map logic for task {} compiled at index {:?}",
161 task_id, index
162 );
163 }
164 Err(e) => {
165 error!(
166 "Failed to parse map logic for task {} in workflow {}: {:?}",
167 task_id, workflow_id, e
168 );
169 }
170 }
171 }
172 }
173
174 fn compile_validation_logic(
176 &mut self,
177 config: &mut ValidationConfig,
178 task_id: &str,
179 workflow_id: &str,
180 ) {
181 for rule in &mut config.rules {
182 debug!(
183 "Compiling validation logic for task {} in workflow {}: {:?}",
184 task_id, workflow_id, rule.logic
185 );
186 match self.compile_logic(&rule.logic) {
187 Ok(index) => {
188 rule.logic_index = index;
189 debug!(
190 "Validation logic for task {} compiled at index {:?}",
191 task_id, index
192 );
193 }
194 Err(e) => {
195 error!(
196 "Failed to parse validation logic for task {} in workflow {}: {:?}",
197 task_id, workflow_id, e
198 );
199 }
200 }
201 }
202 }
203
204 fn compile_logic(&mut self, logic: &Value) -> Result<Option<usize>, String> {
206 match self.datalogic.compile(logic) {
208 Ok(compiled) => {
209 let index = self.logic_cache.len();
210 self.logic_cache.push(compiled);
211 Ok(Some(index))
212 }
213 Err(e) => Err(format!("Failed to compile logic: {}", e)),
214 }
215 }
216}