dataflow_rs/engine/functions/
validation.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7use std::vec;
8pub struct ValidationFunction {
13 }
15
16unsafe impl Send for ValidationFunction {}
21unsafe impl Sync for ValidationFunction {}
22
23impl Default for ValidationFunction {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl ValidationFunction {
30 pub fn new() -> Self {
31 Self { }
32 }
33}
34
35#[async_trait]
36impl AsyncFunctionHandler for ValidationFunction {
37 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
38 let rules = input
40 .get("rules")
41 .ok_or_else(|| DataflowError::Validation("Missing rules array".to_string()))?;
42
43 let validation_result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
45 let mut data_logic = data_logic_cell.borrow_mut();
46 data_logic.reset_arena();
47
48 if let Some(rules_arr) = rules.as_array() {
49 for rule in rules_arr {
50 let rule_logic = rule.get("logic").ok_or_else(|| {
52 DataflowError::Validation("Missing logic in rule".to_string())
53 })?;
54
55 let rule_path = rule.get("path").and_then(Value::as_str).unwrap_or("data");
56
57 let data_to_validate = if rule_path == "data" {
58 &json!({rule_path: message.data})
59 } else if rule_path == "metadata" {
60 &json!({rule_path: message.metadata})
61 } else if rule_path == "temp_data" {
62 &json!({rule_path: message.temp_data})
63 } else {
64 &json!({rule_path: message.data})
65 };
66
67 match data_logic.evaluate_json(rule_logic, data_to_validate, None) {
69 Ok(v) => {
70 if !v.as_bool().unwrap_or(false) {
71 let message_key = rule
72 .get("message")
73 .and_then(Value::as_str)
74 .unwrap_or("Validation failed")
75 .to_string();
76
77 println!("Validation failed: {}", message_key);
78 return Ok((400, vec![]));
79 }
80 }
81 Err(e) => {
82 println!("Error evaluating rule: {}", e);
83 return Err(DataflowError::LogicEvaluation(format!(
84 "Error evaluating rule: {}",
85 e
86 )));
87 }
88 }
89 }
90 }
91
92 let changes = if message.temp_data.get("validation").is_some()
95 && !message.temp_data["validation"].is_null()
96 {
97 vec![Change {
98 path: "temp_data.validation".to_string(),
99 old_value: Value::Null,
100 new_value: message.temp_data["validation"].clone(),
101 }]
102 } else {
103 vec![]
105 };
106
107 Ok((200, changes))
108 });
109
110 validation_result
111 }
112}