1use crate::engine::error::{DataflowError, ErrorInfo, Result};
12use crate::engine::functions::{MapConfig, ValidationConfig};
13use crate::engine::message::{Change, Message};
14use datalogic_rs::value::ToJson;
15use datalogic_rs::{DataLogic, Logic};
16use log::{debug, error};
17use serde_json::{Value, json};
18
19pub struct InternalExecutor<'a> {
30 datalogic: &'a DataLogic<'static>,
32 logic_cache: &'a Vec<Logic<'static>>,
34}
35
36impl<'a> InternalExecutor<'a> {
37 pub fn new(datalogic: &'a DataLogic<'static>, logic_cache: &'a Vec<Logic<'static>>) -> Self {
39 Self {
40 datalogic,
41 logic_cache,
42 }
43 }
44
45 pub fn execute_map(
47 &self,
48 message: &mut Message,
49 config: &MapConfig,
50 ) -> Result<(usize, Vec<Change>)> {
51 let mut changes = Vec::with_capacity(config.mappings.len());
52
53 let needs_parsed_data = config.mappings.iter().any(|m| m.logic_index.is_some());
55
56 let needs_data = needs_parsed_data
58 || config
59 .mappings
60 .iter()
61 .any(|m| m.logic.is_object() || m.logic.is_array());
62
63 let mut data_for_eval = if needs_data {
65 Some(json!({
66 "data": &message.data,
67 "metadata": &message.metadata,
68 "temp_data": &message.temp_data,
69 }))
70 } else {
71 None
72 };
73
74 for mapping in &config.mappings {
75 let target_path = &mapping.path;
76 let logic = &mapping.logic;
77
78 let parsed_data = if let Some(logic_index) = mapping.logic_index {
80 if logic_index < self.logic_cache.len() && data_for_eval.is_some() {
81 Some(
82 self.datalogic
83 .parse_data_json(data_for_eval.as_ref().unwrap())
84 .map_err(|e| {
85 error!("Failed to parse data for evaluation: {e:?}");
86 DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
87 })?,
88 )
89 } else {
90 None
91 }
92 } else {
93 None
94 };
95
96 let result = if let Some(logic_index) = mapping.logic_index {
98 if logic_index < self.logic_cache.len() && parsed_data.is_some() {
99 let compiled_logic = &self.logic_cache[logic_index];
100 let eval_result = self
101 .datalogic
102 .evaluate(compiled_logic, parsed_data.as_ref().unwrap())
103 .map_err(|e| {
104 error!("Failed to evaluate compiled logic: {e:?}");
105 DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
106 })?;
107 eval_result.to_json()
108 } else if let Some(ref data) = data_for_eval {
109 self.datalogic.evaluate_json(logic, data).map_err(|e| {
110 error!("Failed to evaluate logic: {e:?}");
111 DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
112 })?
113 } else {
114 logic.clone()
116 }
117 } else {
118 if !logic.is_object() && !logic.is_array() {
120 logic.clone()
121 } else if let Some(ref data) = data_for_eval {
122 self.datalogic.evaluate_json(logic, data).map_err(|e| {
123 error!("Failed to evaluate logic: {e:?}");
124 DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
125 })?
126 } else {
127 logic.clone()
128 }
129 };
130
131 if result.is_null() {
132 continue;
133 }
134
135 let (target_object, adjusted_path) = self.resolve_target_path(message, target_path);
137
138 let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
140
141 changes.push(Change {
142 path: target_path.clone(),
143 old_value,
144 new_value: result.clone(),
145 });
146
147 if let Some(ref mut data) = data_for_eval {
149 if let Some(adjusted_path) = target_path.strip_prefix("data.") {
151 if let Some(data_obj) = data.get_mut("data") {
152 let _ = self.set_value_at_path(data_obj, adjusted_path, &result);
153 }
154 } else if let Some(adjusted_path) = target_path.strip_prefix("temp_data.") {
155 if let Some(temp_data_obj) = data.get_mut("temp_data") {
156 let _ = self.set_value_at_path(temp_data_obj, adjusted_path, &result);
157 }
158 } else if let Some(adjusted_path) = target_path.strip_prefix("metadata.")
159 && let Some(metadata_obj) = data.get_mut("metadata")
160 {
161 let _ = self.set_value_at_path(metadata_obj, adjusted_path, &result);
162 }
163 }
164 }
165
166 Ok((200, changes))
167 }
168
169 pub fn execute_validate(
171 &self,
172 message: &mut Message,
173 config: &ValidationConfig,
174 ) -> Result<(usize, Vec<Change>)> {
175 let data_json = json!({"data": &message.data});
177 let metadata_json = json!({"metadata": &message.metadata});
178 let temp_data_json = json!({"temp_data": &message.temp_data});
179
180 for rule in &config.rules {
184 let rule_logic = &rule.logic;
185 let rule_path = &rule.path;
186 let error_message = &rule.message;
187
188 let validation_result = if let Some(logic_index) = rule.logic_index {
190 if logic_index < self.logic_cache.len() {
191 let compiled_logic = &self.logic_cache[logic_index];
192 let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.")
193 {
194 &data_json
195 } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
196 &metadata_json
197 } else {
198 &temp_data_json
199 };
200
201 if let Ok(data_val) = self.datalogic.parse_data_json(data_to_validate) {
202 self.datalogic
203 .evaluate(compiled_logic, data_val)
204 .map(|v| v.as_bool().unwrap_or(false))
205 .unwrap_or(false)
206 } else {
207 false
208 }
209 } else {
210 let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.")
212 {
213 &data_json
214 } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
215 &metadata_json
216 } else {
217 &temp_data_json
218 };
219
220 self.datalogic
221 .evaluate_json(rule_logic, data_to_validate)
222 .map(|v| v.as_bool().unwrap_or(false))
223 .unwrap_or(false)
224 }
225 } else {
226 let data_to_validate = if rule_path == "data" || rule_path.starts_with("data.") {
228 &data_json
229 } else if rule_path == "metadata" || rule_path.starts_with("metadata.") {
230 &metadata_json
231 } else {
232 &temp_data_json
233 };
234
235 self.datalogic
236 .evaluate_json(rule_logic, data_to_validate)
237 .map(|v| v.as_bool().unwrap_or(false))
238 .unwrap_or(false)
239 };
240
241 if !validation_result {
242 debug!("Validation failed: {}", error_message);
243
244 message.errors.push(ErrorInfo::new(
246 None,
247 None,
248 DataflowError::Validation(error_message.clone()),
249 ));
250
251 self.record_validation_error(message, error_message);
253 }
254 }
255
256 let has_validation_errors = message
258 .temp_data
259 .get("validation_errors")
260 .and_then(|v| v.as_array())
261 .map(|arr| !arr.is_empty())
262 .unwrap_or(false);
263
264 let status = if has_validation_errors { 400 } else { 200 };
265 Ok((status, vec![]))
266 }
267
268 pub fn evaluate_logic(
270 &self,
271 logic_index: Option<usize>,
272 logic: &Value,
273 data: &Value,
274 ) -> Result<Value> {
275 if let Some(index) = logic_index {
276 debug!("Using compiled logic at index {}", index);
277 if index < self.logic_cache.len() {
278 let compiled_logic = &self.logic_cache[index];
279 let data_value = self.datalogic.parse_data_json(data).map_err(|e| {
280 DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
281 })?;
282
283 let result = self
284 .datalogic
285 .evaluate(compiled_logic, data_value)
286 .map_err(|e| {
287 DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
288 })?;
289
290 Ok(result.to_json())
291 } else {
292 Err(DataflowError::LogicEvaluation(format!(
293 "Logic index {} out of bounds",
294 index
295 )))
296 }
297 } else {
298 debug!("Evaluating logic directly (not compiled): {:?}", logic);
299 self.datalogic.evaluate_json(logic, data).map_err(|e| {
300 DataflowError::LogicEvaluation(format!("Error evaluating logic: {}", e))
301 })
302 }
303 }
304
305 pub fn evaluate_condition(
307 &self,
308 condition_index: Option<usize>,
309 condition: &Value,
310 data: &Value,
311 ) -> Result<bool> {
312 if let Value::Bool(b) = condition {
314 debug!("Evaluating simple boolean condition: {}", b);
315 return Ok(*b);
316 }
317
318 if let Some(index) = condition_index {
319 debug!("Using compiled logic at index {}", index);
320 if index < self.logic_cache.len() {
321 let logic = &self.logic_cache[index];
322 let data_value = self.datalogic.parse_data_json(data).map_err(|e| {
323 DataflowError::LogicEvaluation(format!("Error parsing data: {}", e))
324 })?;
325
326 let result = self
327 .datalogic
328 .evaluate(logic, data_value)
329 .map(|result| result.as_bool().unwrap_or(false))
330 .map_err(|e| {
331 DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
332 });
333 debug!("Compiled logic evaluation result: {:?}", result);
334 result
335 } else {
336 Err(DataflowError::LogicEvaluation(format!(
337 "Condition index {} out of bounds",
338 index
339 )))
340 }
341 } else {
342 debug!(
343 "Evaluating condition directly (not compiled): {:?}",
344 condition
345 );
346 let result = self
347 .datalogic
348 .evaluate_json(condition, data)
349 .map(|result| result.as_bool().unwrap_or(false))
350 .map_err(|e| {
351 DataflowError::LogicEvaluation(format!("Error evaluating condition: {}", e))
352 });
353 debug!("Direct evaluation result: {:?}", result);
354 result
355 }
356 }
357
358 fn resolve_target_path<'b>(
360 &self,
361 message: &'b mut Message,
362 target_path: &'b str,
363 ) -> (&'b mut Value, &'b str) {
364 if let Some(path) = target_path.strip_prefix("data.") {
365 (&mut message.data, path)
366 } else if let Some(path) = target_path.strip_prefix("metadata.") {
367 (&mut message.metadata, path)
368 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
369 (&mut message.temp_data, path)
370 } else if target_path == "data" {
371 (&mut message.data, "")
372 } else if target_path == "metadata" {
373 (&mut message.metadata, "")
374 } else if target_path == "temp_data" {
375 (&mut message.temp_data, "")
376 } else {
377 (&mut message.data, target_path)
379 }
380 }
381
382 fn record_validation_error(&self, message: &mut Message, error_message: &str) {
384 if !message.temp_data.is_object() {
385 message.temp_data = json!({});
386 }
387 if let Some(obj) = message.temp_data.as_object_mut() {
388 if !obj.contains_key("validation_errors") {
389 obj.insert("validation_errors".to_string(), json!([]));
390 }
391 if let Some(errors_array) = obj
392 .get_mut("validation_errors")
393 .and_then(|v| v.as_array_mut())
394 {
395 errors_array.push(json!(error_message));
396 }
397 }
398 }
399
400 pub fn set_value_at_path(
402 &self,
403 target: &mut Value,
404 path: &str,
405 value: &Value,
406 ) -> Result<Value> {
407 let mut current = target;
408 let mut old_value = Value::Null;
409 let path_parts: Vec<&str> = path.split('.').collect();
410
411 fn is_numeric_index(s: &str) -> bool {
413 s.parse::<usize>().is_ok()
414 }
415
416 if path.is_empty() {
418 old_value = current.clone();
419 if let (Value::Object(current_obj), Value::Object(new_obj)) =
421 (current.clone(), value.clone())
422 {
423 let mut merged = current_obj;
424 for (key, val) in new_obj {
425 merged.insert(key, val);
426 }
427 *current = Value::Object(merged);
428 } else {
429 *current = value.clone();
431 }
432 return Ok(old_value);
433 }
434
435 for (i, part) in path_parts.iter().enumerate() {
437 let (part_clean, force_string) = if let Some(stripped) = part.strip_prefix('#') {
439 (stripped, true)
440 } else {
441 (*part, false)
442 };
443
444 let is_numeric = !force_string && is_numeric_index(part_clean);
446
447 if i == path_parts.len() - 1 {
448 if is_numeric {
450 if !current.is_array() {
452 *current = Value::Array(vec![]);
453 }
454
455 if let Ok(index) = part_clean.parse::<usize>()
456 && let Value::Array(arr) = current
457 {
458 while arr.len() <= index {
460 arr.push(Value::Null);
461 }
462 old_value = arr[index].clone();
463 arr[index] = value.clone();
464 }
465 } else {
466 if !current.is_object() {
468 *current = json!({});
469 }
470
471 if let Value::Object(map) = current {
472 old_value = map.get(part_clean).cloned().unwrap_or(Value::Null);
473
474 if let (Some(Value::Object(existing_obj)), Value::Object(new_obj)) =
476 (map.get(part_clean), value)
477 {
478 let mut merged = existing_obj.clone();
480 for (key, val) in new_obj {
481 merged.insert(key.clone(), val.clone());
482 }
483 map.insert(part_clean.to_string(), Value::Object(merged));
484 } else {
485 map.insert(part_clean.to_string(), value.clone());
487 }
488 }
489 }
490 } else {
491 if is_numeric {
493 if !current.is_array() {
495 *current = Value::Array(vec![]);
496 }
497
498 if let Ok(index) = part_clean.parse::<usize>()
499 && let Value::Array(arr) = current
500 {
501 while arr.len() <= index {
502 arr.push(Value::Null);
503 }
504 current = &mut arr[index];
505 }
506 } else {
507 if !current.is_object() {
509 *current = json!({});
510 }
511
512 if let Value::Object(map) = current {
513 if !map.contains_key(part_clean) {
514 let next_part = path_parts.get(i + 1).unwrap_or(&"");
516 let next_clean = if let Some(stripped) = next_part.strip_prefix('#') {
518 stripped
519 } else {
520 next_part
521 };
522 if is_numeric_index(next_clean) && !next_part.starts_with('#') {
523 map.insert(part_clean.to_string(), Value::Array(vec![]));
524 } else {
525 map.insert(part_clean.to_string(), json!({}));
526 }
527 }
528 current = map.get_mut(part_clean).unwrap();
529 }
530 }
531 }
532 }
533
534 Ok(old_value)
535 }
536}