1use crate::engine::error::{DataflowError, Result};
11use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{Engine, Logic};
15use log::debug;
16use serde_json::Value;
17use std::sync::Arc;
18
19pub struct LogicCompiler {
22 engine: Arc<Engine>,
24}
25
26impl Default for LogicCompiler {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl LogicCompiler {
33 pub fn new() -> Self {
36 Self {
37 engine: Arc::new(Engine::builder().with_templating(true).build()),
38 }
39 }
40
41 pub fn engine(&self) -> Arc<Engine> {
43 Arc::clone(&self.engine)
44 }
45
46 pub fn into_engine(self) -> Arc<Engine> {
48 self.engine
49 }
50
51 pub fn compile_workflows(&self, workflows: Vec<Workflow>) -> Result<Vec<Workflow>> {
56 let mut compiled_workflows = Vec::with_capacity(workflows.len());
57
58 for mut workflow in workflows {
59 workflow.validate()?;
60
61 workflow.id_arc = Arc::from(workflow.id.as_str());
64 for task in &mut workflow.tasks {
65 task.id_arc = Arc::from(task.id.as_str());
66 }
67
68 let label = format!("workflow {} condition", workflow.id);
71 workflow.compiled_condition = self.compile_condition(&workflow.condition, &label)?;
72 debug!("Workflow {} condition compiled", workflow.id);
73
74 self.compile_workflow_tasks(&mut workflow)?;
76
77 workflow.fully_sync = workflow.tasks.iter().all(|t| t.function.is_sync_builtin());
84
85 compiled_workflows.push(workflow);
86 }
87
88 compiled_workflows.sort_by_key(|w| w.priority);
90 Ok(compiled_workflows)
91 }
92
93 fn compile_workflow_tasks(&self, workflow: &mut Workflow) -> Result<()> {
95 for task in &mut workflow.tasks {
96 let label = format!("task {} condition (workflow {})", task.id, workflow.id);
97 task.compiled_condition = self.compile_condition(&task.condition, &label)?;
98
99 self.compile_function_logic(&mut task.function, &task.id, &workflow.id)?;
101 }
102 Ok(())
103 }
104
105 fn compile_function_logic(
107 &self,
108 function: &mut FunctionConfig,
109 task_id: &str,
110 workflow_id: &str,
111 ) -> Result<()> {
112 match function {
113 FunctionConfig::Map { input, .. } => {
114 self.compile_map_logic(input, task_id, workflow_id)
115 }
116 FunctionConfig::Validation { input, .. } => {
117 self.compile_validation_logic(input, task_id, workflow_id)
118 }
119 FunctionConfig::Filter { input, .. } => {
120 self.compile_filter_logic(input, task_id, workflow_id)
121 }
122 FunctionConfig::Log { input, .. } => {
123 self.compile_log_logic(input, task_id, workflow_id)
124 }
125 FunctionConfig::HttpCall { input, .. } => {
126 self.compile_http_call_logic(input, task_id, workflow_id)
127 }
128 FunctionConfig::Enrich { input, .. } => {
129 self.compile_enrich_logic(input, task_id, workflow_id)
130 }
131 FunctionConfig::PublishKafka { input, .. } => {
132 self.compile_publish_kafka_logic(input, task_id, workflow_id)
133 }
134 _ => Ok(()),
136 }
137 }
138
139 fn compile(&self, logic: &Value, ctx_label: &str) -> Result<Arc<Logic>> {
143 self.engine
144 .compile_arc(logic)
145 .map_err(|e| DataflowError::LogicEvaluation(format!("{}: {}", ctx_label, e)))
146 }
147
148 fn compile_condition(&self, condition: &Value, ctx_label: &str) -> Result<Option<Arc<Logic>>> {
158 if matches!(condition, Value::Bool(true)) {
159 return Ok(None);
160 }
161 Ok(Some(self.compile(condition, ctx_label)?))
162 }
163
164 fn compile_map_logic(
166 &self,
167 config: &mut MapConfig,
168 task_id: &str,
169 workflow_id: &str,
170 ) -> Result<()> {
171 for mapping in &mut config.mappings {
172 let parts: Vec<Arc<str>> = mapping.path.split('.').map(Arc::from).collect();
178 mapping.path_parts = Arc::from(parts.into_boxed_slice());
179 mapping.path_arc = Arc::from(mapping.path.as_str());
180
181 let label = format!(
182 "map logic for task {} in workflow {} (path {})",
183 task_id, workflow_id, mapping.path
184 );
185 mapping.compiled_logic = Some(self.compile(&mapping.logic, &label)?);
186 }
187 Ok(())
188 }
189
190 fn compile_validation_logic(
192 &self,
193 config: &mut ValidationConfig,
194 task_id: &str,
195 workflow_id: &str,
196 ) -> Result<()> {
197 for (idx, rule) in config.rules.iter_mut().enumerate() {
198 let label = format!(
199 "validation rule {} for task {} in workflow {}",
200 idx, task_id, workflow_id
201 );
202 rule.compiled_logic = Some(self.compile(&rule.logic, &label)?);
203 }
204 Ok(())
205 }
206
207 fn compile_log_logic(
209 &self,
210 config: &mut LogConfig,
211 task_id: &str,
212 workflow_id: &str,
213 ) -> Result<()> {
214 let msg_label = format!(
215 "log message for task {} in workflow {}",
216 task_id, workflow_id
217 );
218 config.compiled_message = Some(self.compile(&config.message, &msg_label)?);
219
220 let mut compiled_fields = Vec::with_capacity(config.fields.len());
224 for (key, logic) in &config.fields {
225 let label = format!(
226 "log field '{}' for task {} in workflow {}",
227 key, task_id, workflow_id
228 );
229 compiled_fields.push((key.clone(), Some(self.compile(logic, &label)?)));
230 }
231 config.compiled_fields = compiled_fields;
232 Ok(())
233 }
234
235 fn compile_filter_logic(
237 &self,
238 config: &mut FilterConfig,
239 task_id: &str,
240 workflow_id: &str,
241 ) -> Result<()> {
242 let label = format!(
243 "filter condition for task {} in workflow {}",
244 task_id, workflow_id
245 );
246 config.compiled_condition = Some(self.compile(&config.condition, &label)?);
247 Ok(())
248 }
249
250 fn compile_http_call_logic(
252 &self,
253 config: &mut HttpCallConfig,
254 task_id: &str,
255 workflow_id: &str,
256 ) -> Result<()> {
257 if let Some(logic) = &config.path_logic {
258 let label = format!(
259 "http_call path_logic for task {} in workflow {}",
260 task_id, workflow_id
261 );
262 config.compiled_path_logic = Some(self.compile(logic, &label)?);
263 }
264 if let Some(logic) = &config.body_logic {
265 let label = format!(
266 "http_call body_logic for task {} in workflow {}",
267 task_id, workflow_id
268 );
269 config.compiled_body_logic = Some(self.compile(logic, &label)?);
270 }
271 Ok(())
272 }
273
274 fn compile_enrich_logic(
276 &self,
277 config: &mut EnrichConfig,
278 task_id: &str,
279 workflow_id: &str,
280 ) -> Result<()> {
281 if let Some(logic) = &config.path_logic {
282 let label = format!(
283 "enrich path_logic for task {} in workflow {}",
284 task_id, workflow_id
285 );
286 config.compiled_path_logic = Some(self.compile(logic, &label)?);
287 }
288 Ok(())
289 }
290
291 fn compile_publish_kafka_logic(
293 &self,
294 config: &mut PublishKafkaConfig,
295 task_id: &str,
296 workflow_id: &str,
297 ) -> Result<()> {
298 if let Some(logic) = &config.key_logic {
299 let label = format!(
300 "publish_kafka key_logic for task {} in workflow {}",
301 task_id, workflow_id
302 );
303 config.compiled_key_logic = Some(self.compile(logic, &label)?);
304 }
305 if let Some(logic) = &config.value_logic {
306 let label = format!(
307 "publish_kafka value_logic for task {} in workflow {}",
308 task_id, workflow_id
309 );
310 config.compiled_value_logic = Some(self.compile(logic, &label)?);
311 }
312 Ok(())
313 }
314}