dataflow_rs/engine/functions/
validation.rs1use crate::engine::error::{DataflowError, ErrorInfo, Result};
35use crate::engine::message::{Change, Message};
36use datalogic_rs::{CompiledLogic, DataLogic};
37use log::{debug, error};
38use serde::Deserialize;
39use serde_json::Value;
40use std::sync::Arc;
41
42#[derive(Debug, Clone, Deserialize)]
48pub struct ValidationConfig {
49 pub rules: Vec<ValidationRule>,
51}
52
53#[derive(Debug, Clone, Deserialize)]
58pub struct ValidationRule {
59 pub logic: Value,
62
63 pub message: String,
66
67 #[serde(skip)]
69 pub logic_index: Option<usize>,
70}
71
72impl ValidationConfig {
73 pub fn from_json(input: &Value) -> Result<Self> {
84 let rules = input.get("rules").ok_or_else(|| {
85 DataflowError::Validation("Missing 'rules' array in input".to_string())
86 })?;
87
88 let rules_arr = rules
89 .as_array()
90 .ok_or_else(|| DataflowError::Validation("'rules' must be an array".to_string()))?;
91
92 let mut parsed_rules = Vec::new();
93
94 for rule in rules_arr {
95 let logic = rule
96 .get("logic")
97 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in rule".to_string()))?
98 .clone();
99
100 let message = rule
101 .get("message")
102 .and_then(Value::as_str)
103 .unwrap_or("Validation failed")
104 .to_string();
105
106 parsed_rules.push(ValidationRule {
107 logic,
108 message,
109 logic_index: None,
110 });
111 }
112
113 Ok(ValidationConfig {
114 rules: parsed_rules,
115 })
116 }
117
118 pub fn execute(
138 &self,
139 message: &mut Message,
140 datalogic: &Arc<DataLogic>,
141 logic_cache: &[Arc<CompiledLogic>],
142 ) -> Result<(usize, Vec<Change>)> {
143 let changes = Vec::new();
144 let mut validation_errors = Vec::new();
145
146 let context_arc = message.get_context_arc();
148
149 for (idx, rule) in self.rules.iter().enumerate() {
151 debug!("Processing validation rule {}: {}", idx, rule.message);
152
153 let compiled_logic = match rule.logic_index {
155 Some(index) => {
156 if index >= logic_cache.len() {
158 error!(
159 "Validation: Logic index {} out of bounds (cache size: {}) for rule at index {}",
160 index,
161 logic_cache.len(),
162 idx
163 );
164 validation_errors.push(ErrorInfo::simple_ref(
165 "COMPILATION_ERROR",
166 &format!(
167 "Logic index {} out of bounds for rule at index {}",
168 index, idx
169 ),
170 None,
171 ));
172 continue;
173 }
174 &logic_cache[index]
175 }
176 None => {
177 error!(
178 "Validation: Logic not compiled (no index) for rule at index {}",
179 idx
180 );
181 validation_errors.push(ErrorInfo::simple_ref(
182 "COMPILATION_ERROR",
183 &format!("Logic not compiled for rule at index: {}", idx),
184 None,
185 ));
186 continue;
187 }
188 };
189
190 let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
193
194 match result {
195 Ok(value) => {
196 if value != Value::Bool(true) {
198 debug!("Validation failed for rule {}: {}", idx, rule.message);
199 validation_errors.push(ErrorInfo::simple_ref(
200 "VALIDATION_ERROR",
201 &rule.message,
202 None,
203 ));
204 } else {
205 debug!("Validation passed for rule {}", idx);
206 }
207 }
208 Err(e) => {
209 error!("Validation: Error evaluating rule {}: {:?}", idx, e);
210 validation_errors.push(ErrorInfo::simple_ref(
211 "EVALUATION_ERROR",
212 &format!("Failed to evaluate rule {}: {}", idx, e),
213 None,
214 ));
215 }
216 }
217 }
218
219 if !validation_errors.is_empty() {
221 message.errors.extend(validation_errors);
222 Ok((400, changes)) } else {
224 Ok((200, changes))
225 }
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use serde_json::json;
233
234 #[test]
235 fn test_validation_config_from_json() {
236 let input = json!({
237 "rules": [
238 {
239 "logic": {"!!": [{"var": "data.required_field"}]},
240 "path": "data",
241 "message": "Required field is missing"
242 },
243 {
244 "logic": {">": [{"var": "data.age"}, 18]},
245 "message": "Must be over 18"
246 }
247 ]
248 });
249
250 let config = ValidationConfig::from_json(&input).unwrap();
251 assert_eq!(config.rules.len(), 2);
252 assert_eq!(config.rules[0].message, "Required field is missing");
253 assert_eq!(config.rules[1].message, "Must be over 18");
254 }
255
256 #[test]
257 fn test_validation_config_missing_rules() {
258 let input = json!({});
259 let result = ValidationConfig::from_json(&input);
260 assert!(result.is_err());
261 }
262
263 #[test]
264 fn test_validation_config_invalid_rules() {
265 let input = json!({
266 "rules": "not_an_array"
267 });
268 let result = ValidationConfig::from_json(&input);
269 assert!(result.is_err());
270 }
271
272 #[test]
273 fn test_validation_config_missing_logic() {
274 let input = json!({
275 "rules": [
276 {
277 "path": "data",
278 "message": "Some error"
279 }
280 ]
281 });
282 let result = ValidationConfig::from_json(&input);
283 assert!(result.is_err());
284 }
285
286 #[test]
287 fn test_validation_config_defaults() {
288 let input = json!({
289 "rules": [
290 {
291 "logic": {"var": "data.field"}
292 }
293 ]
294 });
295
296 let config = ValidationConfig::from_json(&input).unwrap();
297 assert_eq!(config.rules[0].message, "Validation failed");
298 }
299
300 #[test]
301 fn test_validation_execute_passes() {
302 use crate::engine::message::Message;
303
304 let datalogic = Arc::new(DataLogic::with_preserve_structure());
305
306 let mut message = Message::new(Arc::new(json!({})));
308 message.context["data"] = json!({
309 "email": "test@example.com",
310 "age": 25
311 });
312
313 let mut config = ValidationConfig {
315 rules: vec![
316 ValidationRule {
317 logic: json!({"!!": [{"var": "data.email"}]}),
318 message: "Email is required".to_string(),
319 logic_index: None,
320 },
321 ValidationRule {
322 logic: json!({">": [{"var": "data.age"}, 18]}),
323 message: "Must be over 18".to_string(),
324 logic_index: None,
325 },
326 ],
327 };
328
329 let mut logic_cache = Vec::new();
331 for (i, rule) in config.rules.iter_mut().enumerate() {
332 logic_cache.push(datalogic.compile(&rule.logic).unwrap());
333 rule.logic_index = Some(i);
334 }
335
336 let result = config.execute(&mut message, &datalogic, &logic_cache);
338 assert!(result.is_ok());
339
340 let (status, changes) = result.unwrap();
341 assert_eq!(status, 200);
342 assert!(changes.is_empty()); assert!(message.errors.is_empty()); }
345
346 #[test]
347 fn test_validation_execute_fails() {
348 use crate::engine::message::Message;
349
350 let datalogic = Arc::new(DataLogic::with_preserve_structure());
351
352 let mut message = Message::new(Arc::new(json!({})));
354 message.context["data"] = json!({
355 "age": 15 });
357
358 let mut config = ValidationConfig {
360 rules: vec![
361 ValidationRule {
362 logic: json!({"!!": [{"var": "data.email"}]}),
363 message: "Email is required".to_string(),
364 logic_index: None,
365 },
366 ValidationRule {
367 logic: json!({">": [{"var": "data.age"}, 18]}),
368 message: "Must be over 18".to_string(),
369 logic_index: None,
370 },
371 ],
372 };
373
374 let mut logic_cache = Vec::new();
376 for (i, rule) in config.rules.iter_mut().enumerate() {
377 logic_cache.push(datalogic.compile(&rule.logic).unwrap());
378 rule.logic_index = Some(i);
379 }
380
381 let result = config.execute(&mut message, &datalogic, &logic_cache);
383 assert!(result.is_ok());
384
385 let (status, _changes) = result.unwrap();
386 assert_eq!(status, 400); assert_eq!(message.errors.len(), 2); let error_messages: Vec<&str> = message.errors.iter().map(|e| e.message.as_str()).collect();
391 assert!(error_messages.contains(&"Email is required"));
392 assert!(error_messages.contains(&"Must be over 18"));
393 }
394
395 #[test]
396 fn test_validation_uncompiled_logic() {
397 use crate::engine::message::Message;
398
399 let datalogic = Arc::new(DataLogic::with_preserve_structure());
400
401 let mut message = Message::new(Arc::new(json!({})));
402
403 let config = ValidationConfig {
405 rules: vec![ValidationRule {
406 logic: json!(true),
407 message: "Test".to_string(),
408 logic_index: None, }],
410 };
411
412 let logic_cache = Vec::new();
413 let result = config.execute(&mut message, &datalogic, &logic_cache);
414 assert!(result.is_ok());
415
416 let (status, _) = result.unwrap();
417 assert_eq!(status, 400); assert!(!message.errors.is_empty());
419 assert!(message.errors[0].code == "COMPILATION_ERROR");
420 }
421}