dataflow_rs/engine/functions/
validation.rs1use crate::engine::error::{DataflowError, ErrorInfo, Result};
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, error};
5use serde::Deserialize;
6use serde_json::Value;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Deserialize)]
11pub struct ValidationConfig {
12 pub rules: Vec<ValidationRule>,
13}
14
15#[derive(Debug, Clone, Deserialize)]
16pub struct ValidationRule {
17 pub logic: Value,
18 pub path: String,
19 pub message: String,
20 #[serde(skip)]
21 pub logic_index: Option<usize>,
22}
23
24impl ValidationConfig {
25 pub fn from_json(input: &Value) -> Result<Self> {
26 let rules = input.get("rules").ok_or_else(|| {
27 DataflowError::Validation("Missing 'rules' array in input".to_string())
28 })?;
29
30 let rules_arr = rules
31 .as_array()
32 .ok_or_else(|| DataflowError::Validation("'rules' must be an array".to_string()))?;
33
34 let mut parsed_rules = Vec::new();
35
36 for rule in rules_arr {
37 let logic = rule
38 .get("logic")
39 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in rule".to_string()))?
40 .clone();
41
42 let path = rule
43 .get("path")
44 .and_then(Value::as_str)
45 .unwrap_or("data")
46 .to_string();
47
48 let message = rule
49 .get("message")
50 .and_then(Value::as_str)
51 .unwrap_or("Validation failed")
52 .to_string();
53
54 parsed_rules.push(ValidationRule {
55 logic,
56 path,
57 message,
58 logic_index: None,
59 });
60 }
61
62 Ok(ValidationConfig {
63 rules: parsed_rules,
64 })
65 }
66
67 pub fn execute(
69 &self,
70 message: &mut Message,
71 datalogic: &Arc<DataLogic>,
72 logic_cache: &[Arc<CompiledLogic>],
73 ) -> Result<(usize, Vec<Change>)> {
74 let changes = Vec::new();
75 let mut validation_errors = Vec::new();
76
77 let context_arc = message.get_context_arc();
79
80 for (idx, rule) in self.rules.iter().enumerate() {
82 debug!("Processing validation rule {}: {}", idx, rule.message);
83
84 let compiled_logic = match rule.logic_index {
86 Some(index) => {
87 if index >= logic_cache.len() {
89 error!(
90 "Validation: Logic index {} out of bounds (cache size: {}) for rule at index {}",
91 index,
92 logic_cache.len(),
93 idx
94 );
95 validation_errors.push(ErrorInfo::simple_ref(
96 "COMPILATION_ERROR",
97 &format!(
98 "Logic index {} out of bounds for rule at index {}",
99 index, idx
100 ),
101 None,
102 ));
103 continue;
104 }
105 &logic_cache[index]
106 }
107 None => {
108 error!(
109 "Validation: Logic not compiled (no index) for rule at index {}",
110 idx
111 );
112 validation_errors.push(ErrorInfo::simple_ref(
113 "COMPILATION_ERROR",
114 &format!("Logic not compiled for rule at index: {}", idx),
115 None,
116 ));
117 continue;
118 }
119 };
120
121 let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
124
125 match result {
126 Ok(value) => {
127 if value != Value::Bool(true) {
129 debug!("Validation failed for rule {}: {}", idx, rule.message);
130 validation_errors.push(ErrorInfo::simple_ref(
131 "VALIDATION_ERROR",
132 &rule.message,
133 Some(&rule.path),
134 ));
135 } else {
136 debug!("Validation passed for rule {}", idx);
137 }
138 }
139 Err(e) => {
140 error!("Validation: Error evaluating rule {}: {:?}", idx, e);
141 validation_errors.push(ErrorInfo::simple_ref(
142 "EVALUATION_ERROR",
143 &format!("Failed to evaluate rule {}: {}", idx, e),
144 None,
145 ));
146 }
147 }
148 }
149
150 if !validation_errors.is_empty() {
152 message.errors.extend(validation_errors);
153 Ok((400, changes)) } else {
155 Ok((200, changes))
156 }
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use serde_json::json;
164
165 #[test]
166 fn test_validation_config_from_json() {
167 let input = json!({
168 "rules": [
169 {
170 "logic": {"!!": [{"var": "data.required_field"}]},
171 "path": "data",
172 "message": "Required field is missing"
173 },
174 {
175 "logic": {">": [{"var": "data.age"}, 18]},
176 "message": "Must be over 18"
177 }
178 ]
179 });
180
181 let config = ValidationConfig::from_json(&input).unwrap();
182 assert_eq!(config.rules.len(), 2);
183 assert_eq!(config.rules[0].path, "data");
184 assert_eq!(config.rules[0].message, "Required field is missing");
185 assert_eq!(config.rules[1].path, "data"); assert_eq!(config.rules[1].message, "Must be over 18");
187 }
188
189 #[test]
190 fn test_validation_config_missing_rules() {
191 let input = json!({});
192 let result = ValidationConfig::from_json(&input);
193 assert!(result.is_err());
194 }
195
196 #[test]
197 fn test_validation_config_invalid_rules() {
198 let input = json!({
199 "rules": "not_an_array"
200 });
201 let result = ValidationConfig::from_json(&input);
202 assert!(result.is_err());
203 }
204
205 #[test]
206 fn test_validation_config_missing_logic() {
207 let input = json!({
208 "rules": [
209 {
210 "path": "data",
211 "message": "Some error"
212 }
213 ]
214 });
215 let result = ValidationConfig::from_json(&input);
216 assert!(result.is_err());
217 }
218
219 #[test]
220 fn test_validation_config_defaults() {
221 let input = json!({
222 "rules": [
223 {
224 "logic": {"var": "data.field"}
225 }
226 ]
227 });
228
229 let config = ValidationConfig::from_json(&input).unwrap();
230 assert_eq!(config.rules[0].path, "data");
231 assert_eq!(config.rules[0].message, "Validation failed");
232 }
233}