1use crate::engine::error::{DataflowError, Result};
11use crate::engine::functions::integration::{EnrichConfig, HttpCallConfig, PublishKafkaConfig};
12use crate::engine::functions::{FilterConfig, LogConfig, MapConfig, ValidationConfig};
13use crate::engine::{FunctionConfig, Workflow};
14use datalogic_rs::{Engine, Logic};
15use log::debug;
16use serde_json::Value;
17use std::sync::Arc;
18
19pub struct LogicCompiler {
22 engine: Arc<Engine>,
24}
25
26impl Default for LogicCompiler {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl LogicCompiler {
33 pub fn new() -> Self {
36 Self {
37 engine: Arc::new(Engine::builder().with_templating(true).build()),
38 }
39 }
40
41 pub fn engine(&self) -> Arc<Engine> {
43 Arc::clone(&self.engine)
44 }
45
46 pub fn into_engine(self) -> Arc<Engine> {
48 self.engine
49 }
50
51 pub fn compile_workflows(&self, workflows: Vec<Workflow>) -> Result<Vec<Workflow>> {
56 let mut compiled_workflows = Vec::with_capacity(workflows.len());
57
58 for mut workflow in workflows {
59 workflow.validate()?;
60
61 workflow.id_arc = Arc::from(workflow.id.as_str());
64 for task in &mut workflow.tasks {
65 task.id_arc = Arc::from(task.id.as_str());
66 }
67
68 let label = format!("workflow {} condition", workflow.id);
70 workflow.compiled_condition = Some(self.compile(&workflow.condition, &label)?);
71 debug!("Workflow {} condition compiled", workflow.id);
72
73 self.compile_workflow_tasks(&mut workflow)?;
75
76 compiled_workflows.push(workflow);
77 }
78
79 compiled_workflows.sort_by_key(|w| w.priority);
81 Ok(compiled_workflows)
82 }
83
84 fn compile_workflow_tasks(&self, workflow: &mut Workflow) -> Result<()> {
86 for task in &mut workflow.tasks {
87 let label = format!("task {} condition (workflow {})", task.id, workflow.id);
88 task.compiled_condition = Some(self.compile(&task.condition, &label)?);
89
90 self.compile_function_logic(&mut task.function, &task.id, &workflow.id)?;
92 }
93 Ok(())
94 }
95
96 fn compile_function_logic(
98 &self,
99 function: &mut FunctionConfig,
100 task_id: &str,
101 workflow_id: &str,
102 ) -> Result<()> {
103 match function {
104 FunctionConfig::Map { input, .. } => {
105 self.compile_map_logic(input, task_id, workflow_id)
106 }
107 FunctionConfig::Validation { input, .. } => {
108 self.compile_validation_logic(input, task_id, workflow_id)
109 }
110 FunctionConfig::Filter { input, .. } => {
111 self.compile_filter_logic(input, task_id, workflow_id)
112 }
113 FunctionConfig::Log { input, .. } => {
114 self.compile_log_logic(input, task_id, workflow_id)
115 }
116 FunctionConfig::HttpCall { input, .. } => {
117 self.compile_http_call_logic(input, task_id, workflow_id)
118 }
119 FunctionConfig::Enrich { input, .. } => {
120 self.compile_enrich_logic(input, task_id, workflow_id)
121 }
122 FunctionConfig::PublishKafka { input, .. } => {
123 self.compile_publish_kafka_logic(input, task_id, workflow_id)
124 }
125 _ => Ok(()),
127 }
128 }
129
130 fn compile(&self, logic: &Value, ctx_label: &str) -> Result<Arc<Logic>> {
134 self.engine
135 .compile_arc(logic)
136 .map_err(|e| DataflowError::LogicEvaluation(format!("{}: {}", ctx_label, e)))
137 }
138
139 fn compile_map_logic(
141 &self,
142 config: &mut MapConfig,
143 task_id: &str,
144 workflow_id: &str,
145 ) -> Result<()> {
146 for mapping in &mut config.mappings {
147 let parts: Vec<Arc<str>> = mapping.path.split('.').map(Arc::from).collect();
153 mapping.path_parts = Arc::from(parts.into_boxed_slice());
154 mapping.path_arc = Arc::from(mapping.path.as_str());
155
156 let label = format!(
157 "map logic for task {} in workflow {} (path {})",
158 task_id, workflow_id, mapping.path
159 );
160 mapping.compiled_logic = Some(self.compile(&mapping.logic, &label)?);
161 }
162 Ok(())
163 }
164
165 fn compile_validation_logic(
167 &self,
168 config: &mut ValidationConfig,
169 task_id: &str,
170 workflow_id: &str,
171 ) -> Result<()> {
172 for (idx, rule) in config.rules.iter_mut().enumerate() {
173 let label = format!(
174 "validation rule {} for task {} in workflow {}",
175 idx, task_id, workflow_id
176 );
177 rule.compiled_logic = Some(self.compile(&rule.logic, &label)?);
178 }
179 Ok(())
180 }
181
182 fn compile_log_logic(
184 &self,
185 config: &mut LogConfig,
186 task_id: &str,
187 workflow_id: &str,
188 ) -> Result<()> {
189 let msg_label = format!(
190 "log message for task {} in workflow {}",
191 task_id, workflow_id
192 );
193 config.compiled_message = Some(self.compile(&config.message, &msg_label)?);
194
195 let mut compiled_fields = Vec::with_capacity(config.fields.len());
199 for (key, logic) in &config.fields {
200 let label = format!(
201 "log field '{}' for task {} in workflow {}",
202 key, task_id, workflow_id
203 );
204 compiled_fields.push((key.clone(), Some(self.compile(logic, &label)?)));
205 }
206 config.compiled_fields = compiled_fields;
207 Ok(())
208 }
209
210 fn compile_filter_logic(
212 &self,
213 config: &mut FilterConfig,
214 task_id: &str,
215 workflow_id: &str,
216 ) -> Result<()> {
217 let label = format!(
218 "filter condition for task {} in workflow {}",
219 task_id, workflow_id
220 );
221 config.compiled_condition = Some(self.compile(&config.condition, &label)?);
222 Ok(())
223 }
224
225 fn compile_http_call_logic(
227 &self,
228 config: &mut HttpCallConfig,
229 task_id: &str,
230 workflow_id: &str,
231 ) -> Result<()> {
232 if let Some(logic) = &config.path_logic {
233 let label = format!(
234 "http_call path_logic for task {} in workflow {}",
235 task_id, workflow_id
236 );
237 config.compiled_path_logic = Some(self.compile(logic, &label)?);
238 }
239 if let Some(logic) = &config.body_logic {
240 let label = format!(
241 "http_call body_logic for task {} in workflow {}",
242 task_id, workflow_id
243 );
244 config.compiled_body_logic = Some(self.compile(logic, &label)?);
245 }
246 Ok(())
247 }
248
249 fn compile_enrich_logic(
251 &self,
252 config: &mut EnrichConfig,
253 task_id: &str,
254 workflow_id: &str,
255 ) -> Result<()> {
256 if let Some(logic) = &config.path_logic {
257 let label = format!(
258 "enrich path_logic for task {} in workflow {}",
259 task_id, workflow_id
260 );
261 config.compiled_path_logic = Some(self.compile(logic, &label)?);
262 }
263 Ok(())
264 }
265
266 fn compile_publish_kafka_logic(
268 &self,
269 config: &mut PublishKafkaConfig,
270 task_id: &str,
271 workflow_id: &str,
272 ) -> Result<()> {
273 if let Some(logic) = &config.key_logic {
274 let label = format!(
275 "publish_kafka key_logic for task {} in workflow {}",
276 task_id, workflow_id
277 );
278 config.compiled_key_logic = Some(self.compile(logic, &label)?);
279 }
280 if let Some(logic) = &config.value_logic {
281 let label = format!(
282 "publish_kafka value_logic for task {} in workflow {}",
283 task_id, workflow_id
284 );
285 config.compiled_value_logic = Some(self.compile(logic, &label)?);
286 }
287 Ok(())
288 }
289}