1use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{CompiledLogic, DataLogic};
15use log::{debug, error};
16use serde_json::Value;
17use std::sync::Arc;
18
19pub struct LogicCompiler {
27 datalogic: Arc<DataLogic>,
29 logic_cache: Vec<Arc<CompiledLogic>>,
31}
32
33impl Default for LogicCompiler {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl LogicCompiler {
40 pub fn new() -> Self {
42 Self {
43 datalogic: Arc::new(DataLogic::with_preserve_structure()),
44 logic_cache: Vec::new(),
45 }
46 }
47
48 pub fn datalogic(&self) -> Arc<DataLogic> {
50 Arc::clone(&self.datalogic)
51 }
52
53 pub fn logic_cache(&self) -> &Vec<Arc<CompiledLogic>> {
55 &self.logic_cache
56 }
57
58 pub fn into_parts(self) -> (Arc<DataLogic>, Vec<Arc<CompiledLogic>>) {
60 (self.datalogic, self.logic_cache)
61 }
62
63 pub fn compile_workflows(&mut self, workflows: Vec<Workflow>) -> Vec<Workflow> {
65 let mut compiled_workflows = Vec::new();
66
67 for mut workflow in workflows {
68 if let Err(e) = workflow.validate() {
69 error!("Invalid workflow {}: {:?}", workflow.id, e);
70 continue;
71 }
72
73 debug!(
75 "Compiling condition for workflow {}: {:?}",
76 workflow.id, workflow.condition
77 );
78 match self.compile_logic(&workflow.condition) {
79 Ok(index) => {
80 workflow.condition_index = index;
81 debug!(
82 "Workflow {} condition compiled at index {:?}",
83 workflow.id, index
84 );
85
86 self.compile_workflow_tasks(&mut workflow);
88
89 compiled_workflows.push(workflow);
90 }
91 Err(e) => {
92 error!(
93 "Failed to parse condition for workflow {}: {:?}",
94 workflow.id, e
95 );
96 }
97 }
98 }
99
100 compiled_workflows.sort_by_key(|w| w.priority);
102 compiled_workflows
103 }
104
105 fn compile_workflow_tasks(&mut self, workflow: &mut Workflow) {
107 for task in &mut workflow.tasks {
108 debug!(
110 "Compiling condition for task {} in workflow {}: {:?}",
111 task.id, workflow.id, task.condition
112 );
113 match self.compile_logic(&task.condition) {
114 Ok(index) => {
115 task.condition_index = index;
116 debug!("Task {} condition compiled at index {:?}", task.id, index);
117 }
118 Err(e) => {
119 error!(
120 "Failed to parse condition for task {} in workflow {}: {:?}",
121 task.id, workflow.id, e
122 );
123 }
124 }
125
126 self.compile_function_logic(&mut task.function, &task.id, &workflow.id);
128 }
129 }
130
131 fn compile_function_logic(
133 &mut self,
134 function: &mut FunctionConfig,
135 task_id: &str,
136 workflow_id: &str,
137 ) {
138 match function {
139 FunctionConfig::Map { input, .. } => {
140 self.compile_map_logic(input, task_id, workflow_id);
141 }
142 FunctionConfig::Validation { input, .. } => {
143 self.compile_validation_logic(input, task_id, workflow_id);
144 }
145 FunctionConfig::Filter { input, .. } => {
146 self.compile_filter_logic(input, task_id, workflow_id);
147 }
148 FunctionConfig::Log { input, .. } => {
149 self.compile_log_logic(input, task_id, workflow_id);
150 }
151 FunctionConfig::HttpCall { input, .. } => {
152 self.compile_http_call_logic(input, task_id, workflow_id);
153 }
154 FunctionConfig::Enrich { input, .. } => {
155 self.compile_enrich_logic(input, task_id, workflow_id);
156 }
157 FunctionConfig::PublishKafka { input, .. } => {
158 self.compile_publish_kafka_logic(input, task_id, workflow_id);
159 }
160 _ => {
161 }
163 }
164 }
165
166 fn compile_map_logic(&mut self, config: &mut MapConfig, task_id: &str, workflow_id: &str) {
168 for mapping in &mut config.mappings {
169 debug!(
170 "Compiling map logic for task {} in workflow {}: {:?}",
171 task_id, workflow_id, mapping.logic
172 );
173 match self.compile_logic(&mapping.logic) {
174 Ok(index) => {
175 mapping.logic_index = index;
176 debug!(
177 "Map logic for task {} compiled at index {:?}",
178 task_id, index
179 );
180 }
181 Err(e) => {
182 error!(
183 "Failed to parse map logic for task {} in workflow {}: {:?}",
184 task_id, workflow_id, e
185 );
186 }
187 }
188 }
189 }
190
191 fn compile_validation_logic(
193 &mut self,
194 config: &mut ValidationConfig,
195 task_id: &str,
196 workflow_id: &str,
197 ) {
198 for rule in &mut config.rules {
199 debug!(
200 "Compiling validation logic for task {} in workflow {}: {:?}",
201 task_id, workflow_id, rule.logic
202 );
203 match self.compile_logic(&rule.logic) {
204 Ok(index) => {
205 rule.logic_index = index;
206 debug!(
207 "Validation logic for task {} compiled at index {:?}",
208 task_id, index
209 );
210 }
211 Err(e) => {
212 error!(
213 "Failed to parse validation logic for task {} in workflow {}: {:?}",
214 task_id, workflow_id, e
215 );
216 }
217 }
218 }
219 }
220
221 fn compile_log_logic(&mut self, config: &mut LogConfig, task_id: &str, workflow_id: &str) {
223 debug!(
225 "Compiling log message for task {} in workflow {}: {:?}",
226 task_id, workflow_id, config.message
227 );
228 match self.compile_logic(&config.message) {
229 Ok(index) => {
230 config.message_index = index;
231 debug!(
232 "Log message for task {} compiled at index {:?}",
233 task_id, index
234 );
235 }
236 Err(e) => {
237 error!(
238 "Failed to compile log message for task {} in workflow {}: {:?}",
239 task_id, workflow_id, e
240 );
241 }
242 }
243
244 config.field_indices = config
246 .fields
247 .iter()
248 .map(|(key, logic)| {
249 let idx = match self.compile_logic(logic) {
250 Ok(index) => {
251 debug!(
252 "Log field '{}' for task {} compiled at index {:?}",
253 key, task_id, index
254 );
255 index
256 }
257 Err(e) => {
258 error!(
259 "Failed to compile log field '{}' for task {} in workflow {}: {:?}",
260 key, task_id, workflow_id, e
261 );
262 None
263 }
264 };
265 (key.clone(), idx)
266 })
267 .collect();
268 }
269
270 fn compile_filter_logic(
272 &mut self,
273 config: &mut FilterConfig,
274 task_id: &str,
275 workflow_id: &str,
276 ) {
277 debug!(
278 "Compiling filter condition for task {} in workflow {}: {:?}",
279 task_id, workflow_id, config.condition
280 );
281 match self.compile_logic(&config.condition) {
282 Ok(index) => {
283 config.condition_index = index;
284 debug!(
285 "Filter condition for task {} compiled at index {:?}",
286 task_id, index
287 );
288 }
289 Err(e) => {
290 error!(
291 "Failed to compile filter condition for task {} in workflow {}: {:?}",
292 task_id, workflow_id, e
293 );
294 }
295 }
296 }
297
298 fn compile_http_call_logic(
300 &mut self,
301 config: &mut HttpCallConfig,
302 task_id: &str,
303 workflow_id: &str,
304 ) {
305 if let Some(ref logic) = config.path_logic.clone() {
306 match self.compile_logic(logic) {
307 Ok(index) => config.path_logic_index = index,
308 Err(e) => error!(
309 "Failed to compile http_call path_logic for task {} in workflow {}: {:?}",
310 task_id, workflow_id, e
311 ),
312 }
313 }
314 if let Some(ref logic) = config.body_logic.clone() {
315 match self.compile_logic(logic) {
316 Ok(index) => config.body_logic_index = index,
317 Err(e) => error!(
318 "Failed to compile http_call body_logic for task {} in workflow {}: {:?}",
319 task_id, workflow_id, e
320 ),
321 }
322 }
323 }
324
325 fn compile_enrich_logic(
327 &mut self,
328 config: &mut EnrichConfig,
329 task_id: &str,
330 workflow_id: &str,
331 ) {
332 if let Some(ref logic) = config.path_logic.clone() {
333 match self.compile_logic(logic) {
334 Ok(index) => config.path_logic_index = index,
335 Err(e) => error!(
336 "Failed to compile enrich path_logic for task {} in workflow {}: {:?}",
337 task_id, workflow_id, e
338 ),
339 }
340 }
341 }
342
343 fn compile_publish_kafka_logic(
345 &mut self,
346 config: &mut PublishKafkaConfig,
347 task_id: &str,
348 workflow_id: &str,
349 ) {
350 if let Some(ref logic) = config.key_logic.clone() {
351 match self.compile_logic(logic) {
352 Ok(index) => config.key_logic_index = index,
353 Err(e) => error!(
354 "Failed to compile publish_kafka key_logic for task {} in workflow {}: {:?}",
355 task_id, workflow_id, e
356 ),
357 }
358 }
359 if let Some(ref logic) = config.value_logic.clone() {
360 match self.compile_logic(logic) {
361 Ok(index) => config.value_logic_index = index,
362 Err(e) => error!(
363 "Failed to compile publish_kafka value_logic for task {} in workflow {}: {:?}",
364 task_id, workflow_id, e
365 ),
366 }
367 }
368 }
369
370 fn compile_logic(&mut self, logic: &Value) -> Result<Option<usize>, String> {
372 match self.datalogic.compile(logic) {
374 Ok(compiled) => {
375 let index = self.logic_cache.len();
376 self.logic_cache.push(compiled);
377 Ok(Some(index))
378 }
379 Err(e) => Err(format!("Failed to compile logic: {}", e)),
380 }
381 }
382}