Skip to main content

dataflow_rs/engine/functions/
validation.rs

//! # Validation Function Module
//!
//! This module provides rule-based validation capabilities using JSONLogic expressions.
//! The validation function evaluates a set of rules against message data and collects
//! any validation errors that occur.
//!
//! ## Features
//!
//! - Define validation rules using JSONLogic expressions
//! - Custom error messages for each rule
//! - Non-destructive: validation is read-only and doesn't modify message data
//! - Errors are collected in the message's error list
//! - A rule passes only when its logic evaluates to exactly `true`; if any rule
//!   fails, the task reports status 400
//!
//! ## Example Usage
//!
//! ```json
//! {
//!     "name": "validation",
//!     "input": {
//!         "rules": [
//!             {
//!                 "logic": {"!!": [{"var": "data.email"}]},
//!                 "message": "Email is required"
//!             },
//!             {
//!                 "logic": {">": [{"var": "data.age"}, 0]},
//!                 "message": "Age must be positive"
//!             }
//!         ]
//!     }
//! }
//! ```
33
34use crate::engine::error::{DataflowError, ErrorInfo, Result};
35use crate::engine::executor::{ArenaContext, with_arena};
36use crate::engine::message::{Change, Message};
37use crate::engine::task_outcome::TaskOutcome;
38use datalogic_rs::{Engine, Logic};
39use datavalue::DataValue;
40use log::{debug, error};
41use serde::Deserialize;
42use serde_json::Value;
43use std::sync::Arc;
44
45/// Configuration for the validation function containing a list of rules.
46///
47/// Each rule specifies a JSONLogic condition that must evaluate to `true`
48/// for the validation to pass. If a rule evaluates to anything other than
49/// `true`, its error message is added to the message's error list.
50#[derive(Debug, Clone, Deserialize)]
51pub struct ValidationConfig {
52    /// List of validation rules to evaluate.
53    pub rules: Vec<ValidationRule>,
54}
55
56/// A single validation rule with a condition and error message.
57///
58/// The rule's logic is evaluated against the message context. If it does not
59/// return exactly `true`, the validation fails and the error message is recorded.
60#[derive(Debug, Clone, Deserialize)]
61pub struct ValidationRule {
62    /// JSONLogic expression that must evaluate to `true` for validation to pass.
63    /// Any other result (false, null, etc.) is considered a validation failure.
64    pub logic: Value,
65
66    /// Error message to display if validation fails.
67    /// Defaults to "Validation failed" if not specified.
68    pub message: String,
69
70    /// Pre-compiled JSONLogic, populated by `LogicCompiler`. `None` is
71    /// recorded as a `COMPILATION_ERROR` at execute time.
72    #[serde(skip)]
73    pub compiled_logic: Option<Arc<Logic>>,
74}
75
76impl ValidationConfig {
77    /// Parses a `ValidationConfig` from a JSON value.
78    ///
79    /// # Arguments
80    /// * `input` - JSON object containing a "rules" array
81    ///
82    /// # Errors
83    /// Returns `DataflowError::Validation` if:
84    /// - The "rules" field is missing
85    /// - The "rules" field is not an array
86    /// - Any rule is missing the "logic" field
87    pub fn from_json(input: &Value) -> Result<Self> {
88        let rules = input.get("rules").ok_or_else(|| {
89            DataflowError::Validation("Missing 'rules' array in input".to_string())
90        })?;
91
92        let rules_arr = rules
93            .as_array()
94            .ok_or_else(|| DataflowError::Validation("'rules' must be an array".to_string()))?;
95
96        let mut parsed_rules = Vec::new();
97
98        for rule in rules_arr {
99            let logic = rule
100                .get("logic")
101                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in rule".to_string()))?
102                .clone();
103
104            let message = rule
105                .get("message")
106                .and_then(Value::as_str)
107                .unwrap_or("Validation failed")
108                .to_string();
109
110            parsed_rules.push(ValidationRule {
111                logic,
112                message,
113                compiled_logic: None,
114            });
115        }
116
117        Ok(ValidationConfig {
118            rules: parsed_rules,
119        })
120    }
121
122    /// Executes all validation rules using pre-compiled logic.
123    ///
124    /// Evaluates each rule sequentially against the message context.
125    /// This is a read-only operation that does not modify message data.
126    ///
127    /// # Arguments
128    /// * `message` - The message to validate (errors are added to its error list)
129    /// * `engine` - Datalogic v5 engine for evaluation
130    ///
131    /// # Returns
132    /// * `Ok((TaskOutcome::Success, []))` — all rules passed
133    /// * `Ok((TaskOutcome::Status(400), []))` — one or more rules failed,
134    ///   `ErrorInfo` entries pushed onto `message.errors`
135    pub fn execute(
136        &self,
137        message: &mut Message,
138        engine: &Arc<Engine>,
139    ) -> Result<(TaskOutcome, Vec<Change>)> {
140        // Default path: open the arena and convert context once for this
141        // task call. When called from the workflow-level sync-stretch
142        // executor (`execute_in_arena`), the conversion is reused across
143        // multiple tasks in the same stretch.
144        with_arena(|arena| {
145            let ctx_av: DataValue<'_> = message.context.to_arena(arena);
146            self.run_rules(message, ctx_av, arena, engine)
147        })
148    }
149
150    /// Run validation rules against an externally-provided `ArenaContext`.
151    /// Reuses the cached arena form built by an earlier task in the same
152    /// workflow sync stretch — the heavy `data.input` subtree stays cached
153    /// across the parse_json → map → validation pipeline.
154    pub(crate) fn execute_in_arena(
155        &self,
156        message: &mut Message,
157        arena_ctx: &mut ArenaContext<'_>,
158        engine: &Arc<Engine>,
159    ) -> Result<(TaskOutcome, Vec<Change>)> {
160        let arena = arena_ctx.arena();
161        let ctx_av = arena_ctx.as_data_value();
162        self.run_rules(message, ctx_av, arena, engine)
163    }
164
165    /// Shared inner loop: evaluate each rule against `ctx_av` and record
166    /// `ErrorInfo` entries for any failures.
167    fn run_rules(
168        &self,
169        message: &mut Message,
170        ctx_av: DataValue<'_>,
171        arena: &bumpalo::Bump,
172        engine: &Arc<Engine>,
173    ) -> Result<(TaskOutcome, Vec<Change>)> {
174        let changes = Vec::new();
175        let mut validation_errors = Vec::new();
176
177        for (idx, rule) in self.rules.iter().enumerate() {
178            debug!("Processing validation rule {}: {}", idx, rule.message);
179
180            let compiled_logic = match &rule.compiled_logic {
181                Some(logic) => logic,
182                None => {
183                    error!("Validation: Logic not compiled for rule at index {}", idx);
184                    validation_errors.push(ErrorInfo::simple_ref(
185                        "COMPILATION_ERROR",
186                        &format!("Logic not compiled for rule at index: {}", idx),
187                        None,
188                    ));
189                    continue;
190                }
191            };
192
193            // Reuse the pre-converted `ctx_av` (DataValue is Copy). The
194            // result is `&DataValue<'_>` borrowed from the arena — we
195            // only need to peek at the discriminant so we skip the
196            // `to_owned()` deep-clone too.
197            match engine.evaluate(compiled_logic, ctx_av, arena) {
198                Ok(value) => {
199                    if !matches!(value, DataValue::Bool(true)) {
200                        debug!("Validation failed for rule {}: {}", idx, rule.message);
201                        validation_errors.push(ErrorInfo::simple_ref(
202                            "VALIDATION_ERROR",
203                            &rule.message,
204                            None,
205                        ));
206                    } else {
207                        debug!("Validation passed for rule {}", idx);
208                    }
209                }
210                Err(e) => {
211                    error!("Validation: Error evaluating rule {}: {:?}", idx, e);
212                    validation_errors.push(ErrorInfo::simple_ref(
213                        "EVALUATION_ERROR",
214                        &format!("Failed to evaluate rule {}: {}", idx, e),
215                        None,
216                    ));
217                }
218            }
219        }
220
221        if !validation_errors.is_empty() {
222            message.errors.extend(validation_errors);
223            Ok((TaskOutcome::Status(400), changes))
224        } else {
225            Ok((TaskOutcome::Success, changes))
226        }
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use crate::engine::utils::set_nested_value;
    use datavalue::OwnedDataValue;
    use serde_json::json;

    #[test]
    fn test_validation_config_from_json() {
        let input = json!({
            "rules": [
                {
                    "logic": {"!!": [{"var": "data.required_field"}]},
                    "path": "data",
                    "message": "Required field is missing"
                },
                {
                    "logic": {">": [{"var": "data.age"}, 18]},
                    "message": "Must be over 18"
                }
            ]
        });

        let config = ValidationConfig::from_json(&input).unwrap();
        assert_eq!(config.rules.len(), 2);
        assert_eq!(config.rules[0].message, "Required field is missing");
        assert_eq!(config.rules[1].message, "Must be over 18");
    }

    #[test]
    fn test_validation_config_missing_rules() {
        assert!(ValidationConfig::from_json(&json!({})).is_err());
    }

    #[test]
    fn test_validation_config_invalid_rules() {
        let input = json!({ "rules": "not_an_array" });
        assert!(ValidationConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_validation_config_missing_logic() {
        let input = json!({
            "rules": [
                {
                    "path": "data",
                    "message": "Some error"
                }
            ]
        });
        assert!(ValidationConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_validation_config_defaults() {
        let input = json!({
            "rules": [
                {
                    "logic": {"var": "data.field"}
                }
            ]
        });

        let config = ValidationConfig::from_json(&input).unwrap();
        assert_eq!(config.rules[0].message, "Validation failed");
    }

    /// Converts a JSON literal into the engine's owned value type.
    fn dv(v: serde_json::Value) -> OwnedDataValue {
        OwnedDataValue::from(&v)
    }

    /// Builds a message whose context holds `initial` under `data`.
    fn message_with_data(initial: serde_json::Value) -> Message {
        let mut m = Message::new(Arc::new(dv(json!({}))));
        set_nested_value(&mut m.context, "data", dv(initial));
        m
    }

    /// Engine configured identically for every execution test.
    fn test_engine() -> Arc<Engine> {
        Arc::new(Engine::builder().with_templating(true).build())
    }

    /// Shorthand for an uncompiled rule.
    fn rule(logic: serde_json::Value, message: &str) -> ValidationRule {
        ValidationRule {
            logic,
            message: message.to_string(),
            compiled_logic: None,
        }
    }

    /// Compiles each rule's `logic` and stores the resulting `Arc<Logic>` in
    /// the `compiled_logic` slot, mirroring `LogicCompiler`.
    fn compile_rules(engine: &Arc<Engine>, config: &mut ValidationConfig) {
        for rule in &mut config.rules {
            rule.compiled_logic = Some(engine.compile_arc(&rule.logic).unwrap());
        }
    }

    /// Builds a config from `rules` and compiles all of them.
    fn compiled_config(engine: &Arc<Engine>, rules: Vec<ValidationRule>) -> ValidationConfig {
        let mut config = ValidationConfig { rules };
        compile_rules(engine, &mut config);
        config
    }

    /// The two rules shared by the pass/fail execution tests.
    fn email_and_age_rules() -> Vec<ValidationRule> {
        vec![
            rule(json!({"!!": [{"var": "data.email"}]}), "Email is required"),
            rule(json!({">": [{"var": "data.age"}, 18]}), "Must be over 18"),
        ]
    }

    #[test]
    fn test_validation_execute_passes() {
        let engine = test_engine();
        let mut message = message_with_data(json!({
            "email": "test@example.com",
            "age": 25
        }));
        let config = compiled_config(&engine, email_and_age_rules());

        let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Success);
        assert!(changes.is_empty());
        assert!(message.errors.is_empty());
    }

    #[test]
    fn test_validation_execute_fails() {
        let engine = test_engine();
        let mut message = message_with_data(json!({ "age": 15 }));
        let config = compiled_config(&engine, email_and_age_rules());

        let (outcome, _changes) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Status(400));
        assert_eq!(message.errors.len(), 2);

        let error_messages: Vec<&str> = message.errors.iter().map(|e| e.message.as_str()).collect();
        assert!(error_messages.contains(&"Email is required"));
        assert!(error_messages.contains(&"Must be over 18"));
    }

    #[test]
    fn test_validation_execute_partial_failure_records_only_failing_rule() {
        let engine = test_engine();
        let mut message = message_with_data(json!({
            "email": "test@example.com",
            "age": 15
        }));
        let config = compiled_config(&engine, email_and_age_rules());

        let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Status(400));
        assert!(changes.is_empty());
        assert_eq!(message.errors.len(), 1);
        assert_eq!(message.errors[0].message, "Must be over 18");
        assert_eq!(message.errors[0].code, "VALIDATION_ERROR");
    }

    #[test]
    fn test_validation_truthy_non_bool_result_fails() {
        let engine = test_engine();
        let mut message = message_with_data(json!({ "email": "test@example.com" }));
        // `var` yields the string itself: truthy, but not exactly `true`.
        let config = compiled_config(
            &engine,
            vec![rule(json!({"var": "data.email"}), "Must evaluate to true")],
        );

        let (outcome, _) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Status(400));
        assert_eq!(message.errors.len(), 1);
        assert_eq!(message.errors[0].message, "Must evaluate to true");
    }

    #[test]
    fn test_validation_uncompiled_logic() {
        let engine = test_engine();
        let mut message = Message::new(Arc::new(dv(json!({}))));

        // Deliberately left uncompiled.
        let config = ValidationConfig {
            rules: vec![rule(json!(true), "Test")],
        };

        let (outcome, _) = config.execute(&mut message, &engine).unwrap();
        assert_eq!(outcome, TaskOutcome::Status(400));
        assert!(!message.errors.is_empty());
        assert_eq!(message.errors[0].code, "COMPILATION_ERROR");
    }
}