dataflow_rs/engine/functions/
validation.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7use std::vec;
/// A validation task function that uses JsonLogic expressions to validate message data.
///
/// The function is stateless: rules are evaluated through the thread-local
/// `FUNCTION_DATA_LOGIC` instance instead of a `DataLogic` stored on the struct.
/// Validation stops at the first rule that does not evaluate to `true`, which is
/// reported through a 400 status code.
pub struct ValidationFunction {
    // Intentionally empty. Kept as a braced struct (rather than a unit struct) so the
    // existing `Self { }` style construction stays valid.
}

// `ValidationFunction` has no fields, so the compiler derives `Send` and `Sync` for it
// automatically; hand-written `unsafe impl`s are unnecessary. This assertion keeps that
// guarantee explicit: if a non-thread-safe field is ever added, the build fails here
// instead of unsoundness being silently forced through an `unsafe impl`.
const _: () = {
    const fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<ValidationFunction>();
};
22
23impl Default for ValidationFunction {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl ValidationFunction {
30    pub fn new() -> Self {
31        Self { /* no fields */ }
32    }
33}
34
35#[async_trait]
36impl AsyncFunctionHandler for ValidationFunction {
37    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
38        // Extract rules from input
39        let rules = input
40            .get("rules")
41            .ok_or_else(|| DataflowError::Validation("Missing rules array".to_string()))?;
42
43        // Use thread-local DataLogic
44        let validation_result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
45            let mut data_logic = data_logic_cell.borrow_mut();
46            data_logic.reset_arena();
47
48            if let Some(rules_arr) = rules.as_array() {
49                for rule in rules_arr {
50                    // Extract rule components
51                    let rule_logic = rule.get("logic").ok_or_else(|| {
52                        DataflowError::Validation("Missing logic in rule".to_string())
53                    })?;
54
55                    let rule_path = rule.get("path").and_then(Value::as_str).unwrap_or("data");
56
57                    let data_to_validate = if rule_path == "data" {
58                        &json!({rule_path: message.data})
59                    } else if rule_path == "metadata" {
60                        &json!({rule_path: message.metadata})
61                    } else if rule_path == "temp_data" {
62                        &json!({rule_path: message.temp_data})
63                    } else {
64                        &json!({rule_path: message.data})
65                    };
66
67                    // Evaluate the rule
68                    match data_logic.evaluate_json(rule_logic, data_to_validate, None) {
69                        Ok(v) => {
70                            if !v.as_bool().unwrap_or(false) {
71                                let message_key = rule
72                                    .get("message")
73                                    .and_then(Value::as_str)
74                                    .unwrap_or("Validation failed")
75                                    .to_string();
76
77                                println!("Validation failed: {}", message_key);
78                                return Ok((400, vec![]));
79                            }
80                        }
81                        Err(e) => {
82                            println!("Error evaluating rule: {}", e);
83                            return Err(DataflowError::LogicEvaluation(format!(
84                                "Error evaluating rule: {}",
85                                e
86                            )));
87                        }
88                    }
89                }
90            }
91
92            // Only create changes if there are actual validation results to report
93            // Don't create a change from null to null as this causes duplicate audit entries
94            let changes = if message.temp_data.get("validation").is_some()
95                && !message.temp_data["validation"].is_null()
96            {
97                vec![Change {
98                    path: "temp_data.validation".to_string(),
99                    old_value: Value::Null,
100                    new_value: message.temp_data["validation"].clone(),
101                }]
102            } else {
103                // No validation results to record - return empty changes
104                vec![]
105            };
106
107            Ok((200, changes))
108        });
109
110        validation_result
111    }
112}