dataflow_rs/engine/functions/
validation.rs1use crate::engine::error::{DataflowError, ErrorInfo, Result};
35use crate::engine::executor::{ArenaContext, with_arena};
36use crate::engine::message::{Change, Message};
37use crate::engine::task_outcome::TaskOutcome;
38use datalogic_rs::{Engine, Logic};
39use datavalue::DataValue;
40use log::{debug, error};
41use serde::Deserialize;
42use serde_json::Value;
43use std::sync::Arc;
44
/// Configuration for a validation task: an ordered list of rules that are
/// each evaluated against the message context.
#[derive(Debug, Clone, Deserialize)]
pub struct ValidationConfig {
    /// Rules to evaluate, in order. Every rule is evaluated; a failing rule
    /// does not stop the ones after it from running.
    pub rules: Vec<ValidationRule>,
}
55
/// A single validation rule: a logic expression plus the error message that
/// is reported when the expression does not evaluate to boolean `true`.
#[derive(Debug, Clone, Deserialize)]
pub struct ValidationRule {
    /// Raw JSON form of the rule's logic expression.
    pub logic: Value,

    /// Text recorded as a `VALIDATION_ERROR` when the rule does not pass.
    pub message: String,

    /// Compiled form of `logic`. Never read from serialized input
    /// (`#[serde(skip)]`) and left as `None` by `ValidationConfig::from_json`;
    /// it has to be filled in before execution, otherwise the rule is
    /// reported as a `COMPILATION_ERROR` instead of being evaluated.
    #[serde(skip)]
    pub compiled_logic: Option<Arc<Logic>>,
}
75
76impl ValidationConfig {
77 pub fn from_json(input: &Value) -> Result<Self> {
88 let rules = input.get("rules").ok_or_else(|| {
89 DataflowError::Validation("Missing 'rules' array in input".to_string())
90 })?;
91
92 let rules_arr = rules
93 .as_array()
94 .ok_or_else(|| DataflowError::Validation("'rules' must be an array".to_string()))?;
95
96 let mut parsed_rules = Vec::new();
97
98 for rule in rules_arr {
99 let logic = rule
100 .get("logic")
101 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in rule".to_string()))?
102 .clone();
103
104 let message = rule
105 .get("message")
106 .and_then(Value::as_str)
107 .unwrap_or("Validation failed")
108 .to_string();
109
110 parsed_rules.push(ValidationRule {
111 logic,
112 message,
113 compiled_logic: None,
114 });
115 }
116
117 Ok(ValidationConfig {
118 rules: parsed_rules,
119 })
120 }
121
122 pub fn execute(
136 &self,
137 message: &mut Message,
138 engine: &Arc<Engine>,
139 ) -> Result<(TaskOutcome, Vec<Change>)> {
140 with_arena(|arena| {
145 let ctx_av: DataValue<'_> = message.context.to_arena(arena);
146 self.run_rules(message, ctx_av, arena, engine)
147 })
148 }
149
150 pub(crate) fn execute_in_arena(
155 &self,
156 message: &mut Message,
157 arena_ctx: &mut ArenaContext<'_>,
158 engine: &Arc<Engine>,
159 ) -> Result<(TaskOutcome, Vec<Change>)> {
160 let arena = arena_ctx.arena();
161 let ctx_av = arena_ctx.as_data_value();
162 self.run_rules(message, ctx_av, arena, engine)
163 }
164
165 fn run_rules(
168 &self,
169 message: &mut Message,
170 ctx_av: DataValue<'_>,
171 arena: &bumpalo::Bump,
172 engine: &Arc<Engine>,
173 ) -> Result<(TaskOutcome, Vec<Change>)> {
174 let changes = Vec::new();
175 let mut validation_errors = Vec::new();
176
177 for (idx, rule) in self.rules.iter().enumerate() {
178 debug!("Processing validation rule {}: {}", idx, rule.message);
179
180 let compiled_logic = match &rule.compiled_logic {
181 Some(logic) => logic,
182 None => {
183 error!("Validation: Logic not compiled for rule at index {}", idx);
184 validation_errors.push(ErrorInfo::simple_ref(
185 "COMPILATION_ERROR",
186 &format!("Logic not compiled for rule at index: {}", idx),
187 None,
188 ));
189 continue;
190 }
191 };
192
193 match engine.evaluate(compiled_logic, ctx_av, arena) {
198 Ok(value) => {
199 if !matches!(value, DataValue::Bool(true)) {
200 debug!("Validation failed for rule {}: {}", idx, rule.message);
201 validation_errors.push(ErrorInfo::simple_ref(
202 "VALIDATION_ERROR",
203 &rule.message,
204 None,
205 ));
206 } else {
207 debug!("Validation passed for rule {}", idx);
208 }
209 }
210 Err(e) => {
211 error!("Validation: Error evaluating rule {}: {:?}", idx, e);
212 validation_errors.push(ErrorInfo::simple_ref(
213 "EVALUATION_ERROR",
214 &format!("Failed to evaluate rule {}: {}", idx, e),
215 None,
216 ));
217 }
218 }
219 }
220
221 if !validation_errors.is_empty() {
222 message.errors.extend(validation_errors);
223 Ok((TaskOutcome::Status(400), changes))
224 } else {
225 Ok((TaskOutcome::Success, changes))
226 }
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use crate::engine::utils::set_nested_value;
    use datavalue::OwnedDataValue;
    use serde_json::json;

    /// Converts a JSON literal into an owned data value.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Creates a message whose context has `initial` stored under `data`.
    fn message_with_data(initial: serde_json::Value) -> Message {
        let mut msg = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(&mut msg.context, "data", dv(initial));
        msg
    }

    /// Engine instance shared by the execution tests.
    fn build_engine() -> Arc<Engine> {
        Arc::new(Engine::builder().with_templating(true).build())
    }

    /// Shorthand for an uncompiled rule.
    fn rule(logic: serde_json::Value, text: &str) -> ValidationRule {
        ValidationRule {
            logic,
            message: text.to_string(),
            compiled_logic: None,
        }
    }

    /// The "email present" and "age over 18" rule pair used by the
    /// pass/fail execution tests.
    fn email_and_age_rules() -> Vec<ValidationRule> {
        vec![
            rule(json!({"!!": [{"var": "data.email"}]}), "Email is required"),
            rule(json!({">": [{"var": "data.age"}, 18]}), "Must be over 18"),
        ]
    }

    /// Compiles every rule's logic so the config can be executed.
    fn compile_rules(engine: &Arc<Engine>, config: &mut ValidationConfig) {
        for entry in config.rules.iter_mut() {
            let compiled = engine.compile_arc(&entry.logic).unwrap();
            entry.compiled_logic = Some(compiled);
        }
    }

    #[test]
    fn test_validation_config_from_json() {
        // The extra "path" key is not read by `from_json`.
        let input = json!({
            "rules": [
                {
                    "logic": {"!!": [{"var": "data.required_field"}]},
                    "path": "data",
                    "message": "Required field is missing"
                },
                {
                    "logic": {">": [{"var": "data.age"}, 18]},
                    "message": "Must be over 18"
                }
            ]
        });

        let config = ValidationConfig::from_json(&input).unwrap();
        let messages: Vec<&str> = config.rules.iter().map(|r| r.message.as_str()).collect();
        assert_eq!(messages, ["Required field is missing", "Must be over 18"]);
    }

    #[test]
    fn test_validation_config_missing_rules() {
        let input = json!({});
        assert!(ValidationConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_validation_config_invalid_rules() {
        let input = json!({
            "rules": "not_an_array"
        });
        assert!(ValidationConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_validation_config_missing_logic() {
        let input = json!({
            "rules": [
                {
                    "path": "data",
                    "message": "Some error"
                }
            ]
        });
        assert!(ValidationConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_validation_config_defaults() {
        let input = json!({
            "rules": [
                {
                    "logic": {"var": "data.field"}
                }
            ]
        });

        let config = ValidationConfig::from_json(&input).unwrap();
        assert_eq!(config.rules[0].message, "Validation failed");
    }

    #[test]
    fn test_validation_execute_passes() {
        let engine = build_engine();
        let mut message = message_with_data(json!({
            "email": "test@example.com",
            "age": 25
        }));

        let mut config = ValidationConfig {
            rules: email_and_age_rules(),
        };
        compile_rules(&engine, &mut config);

        let (outcome, changes) = config
            .execute(&mut message, &engine)
            .expect("execute should not return an error");

        assert_eq!(outcome, TaskOutcome::Success);
        assert!(changes.is_empty());
        assert!(message.errors.is_empty());
    }

    #[test]
    fn test_validation_execute_fails() {
        let engine = build_engine();
        // No email and an age below the threshold: both rules must fail.
        let mut message = message_with_data(json!({ "age": 15 }));

        let mut config = ValidationConfig {
            rules: email_and_age_rules(),
        };
        compile_rules(&engine, &mut config);

        let (outcome, _changes) = config
            .execute(&mut message, &engine)
            .expect("execute should not return an error");

        assert_eq!(outcome, TaskOutcome::Status(400));
        assert_eq!(message.errors.len(), 2);

        let reported = |text: &str| message.errors.iter().any(|e| e.message.as_str() == text);
        assert!(reported("Email is required"));
        assert!(reported("Must be over 18"));
    }

    #[test]
    fn test_validation_uncompiled_logic() {
        let engine = build_engine();
        let mut message = Message::new(Arc::new(dv(json!({}))));

        // Deliberately skip `compile_rules` so the rule has no compiled logic.
        let config = ValidationConfig {
            rules: vec![rule(json!(true), "Test")],
        };

        let (outcome, _) = config
            .execute(&mut message, &engine)
            .expect("execute should not return an error");

        assert_eq!(outcome, TaskOutcome::Status(400));
        assert!(!message.errors.is_empty());
        let first = &message.errors[0];
        assert!(first.code == "COMPILATION_ERROR");
    }
}