dataflow_rs/engine/functions/
validation.rs

1use crate::engine::error::{DataflowError, ErrorInfo, Result};
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, error};
5use serde::Deserialize;
6use serde_json::Value;
7use std::sync::Arc;
8
/// Pre-parsed configuration for the validation function.
///
/// Holds the list of rules that `execute` walks through, in order, when
/// validating a message. An instance can be built from raw JSON with
/// `from_json`, or through the derived `Deserialize` implementation.
#[derive(Debug, Clone, Deserialize)]
pub struct ValidationConfig {
    /// Validation rules, evaluated in the order they appear here.
    pub rules: Vec<ValidationRule>,
}
14
/// A single validation rule: a logic expression plus the message reported
/// when that expression does not evaluate to `true`.
#[derive(Debug, Clone, Deserialize)]
pub struct ValidationRule {
    /// The rule's logic expression, kept as raw JSON.
    pub logic: Value,
    /// Text recorded as a `VALIDATION_ERROR` when the rule fails.
    pub message: String,
    /// Position of this rule's compiled logic in the cache slice handed to
    /// `ValidationConfig::execute`. Never read from serialized input
    /// (`#[serde(skip)]`); `from_json` initialises it to `None`.
    /// NOTE(review): presumably filled in by the engine once the logic has
    /// been compiled — the code that sets it is not in this file.
    #[serde(skip)]
    pub logic_index: Option<usize>,
}
22
23impl ValidationConfig {
24    pub fn from_json(input: &Value) -> Result<Self> {
25        let rules = input.get("rules").ok_or_else(|| {
26            DataflowError::Validation("Missing 'rules' array in input".to_string())
27        })?;
28
29        let rules_arr = rules
30            .as_array()
31            .ok_or_else(|| DataflowError::Validation("'rules' must be an array".to_string()))?;
32
33        let mut parsed_rules = Vec::new();
34
35        for rule in rules_arr {
36            let logic = rule
37                .get("logic")
38                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in rule".to_string()))?
39                .clone();
40
41            let message = rule
42                .get("message")
43                .and_then(Value::as_str)
44                .unwrap_or("Validation failed")
45                .to_string();
46
47            parsed_rules.push(ValidationRule {
48                logic,
49                message,
50                logic_index: None,
51            });
52        }
53
54        Ok(ValidationConfig {
55            rules: parsed_rules,
56        })
57    }
58
59    /// Execute the validation rules using pre-compiled logic
60    pub fn execute(
61        &self,
62        message: &mut Message,
63        datalogic: &Arc<DataLogic>,
64        logic_cache: &[Arc<CompiledLogic>],
65    ) -> Result<(usize, Vec<Change>)> {
66        let changes = Vec::new();
67        let mut validation_errors = Vec::new();
68
69        // Use the cached context Arc from the message (validation is read-only)
70        let context_arc = message.get_context_arc();
71
72        // Process each validation rule
73        for (idx, rule) in self.rules.iter().enumerate() {
74            debug!("Processing validation rule {}: {}", idx, rule.message);
75
76            // Get the compiled logic from cache with proper bounds checking
77            let compiled_logic = match rule.logic_index {
78                Some(index) => {
79                    // Ensure index is valid before accessing
80                    if index >= logic_cache.len() {
81                        error!(
82                            "Validation: Logic index {} out of bounds (cache size: {}) for rule at index {}",
83                            index,
84                            logic_cache.len(),
85                            idx
86                        );
87                        validation_errors.push(ErrorInfo::simple_ref(
88                            "COMPILATION_ERROR",
89                            &format!(
90                                "Logic index {} out of bounds for rule at index {}",
91                                index, idx
92                            ),
93                            None,
94                        ));
95                        continue;
96                    }
97                    &logic_cache[index]
98                }
99                None => {
100                    error!(
101                        "Validation: Logic not compiled (no index) for rule at index {}",
102                        idx
103                    );
104                    validation_errors.push(ErrorInfo::simple_ref(
105                        "COMPILATION_ERROR",
106                        &format!("Logic not compiled for rule at index: {}", idx),
107                        None,
108                    ));
109                    continue;
110                }
111            };
112
113            // Evaluate the validation rule using DataLogic v4
114            // Reuse the same Arc for all rules - validation is read-only
115            let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
116
117            match result {
118                Ok(value) => {
119                    // Check if validation passed (must be explicitly true)
120                    if value != Value::Bool(true) {
121                        debug!("Validation failed for rule {}: {}", idx, rule.message);
122                        validation_errors.push(ErrorInfo::simple_ref(
123                            "VALIDATION_ERROR",
124                            &rule.message,
125                            None,
126                        ));
127                    } else {
128                        debug!("Validation passed for rule {}", idx);
129                    }
130                }
131                Err(e) => {
132                    error!("Validation: Error evaluating rule {}: {:?}", idx, e);
133                    validation_errors.push(ErrorInfo::simple_ref(
134                        "EVALUATION_ERROR",
135                        &format!("Failed to evaluate rule {}: {}", idx, e),
136                        None,
137                    ));
138                }
139            }
140        }
141
142        // Add validation errors to message if any
143        if !validation_errors.is_empty() {
144            message.errors.extend(validation_errors);
145            Ok((400, changes)) // Return 400 for validation failures
146        } else {
147            Ok((200, changes))
148        }
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Two well-formed rules are parsed in order and keep their messages.
    #[test]
    fn test_validation_config_from_json() {
        let raw = json!({
            "rules": [
                {
                    "logic": {"!!": [{"var": "data.required_field"}]},
                    "path": "data",
                    "message": "Required field is missing"
                },
                {
                    "logic": {">": [{"var": "data.age"}, 18]},
                    "message": "Must be over 18"
                }
            ]
        });

        let parsed = ValidationConfig::from_json(&raw).unwrap();
        let messages: Vec<&str> = parsed
            .rules
            .iter()
            .map(|rule| rule.message.as_str())
            .collect();

        assert_eq!(
            messages,
            vec!["Required field is missing", "Must be over 18"]
        );
    }

    /// An input without a "rules" key is rejected.
    #[test]
    fn test_validation_config_missing_rules() {
        let raw = json!({});
        assert!(ValidationConfig::from_json(&raw).is_err());
    }

    /// A "rules" value that is not an array is rejected.
    #[test]
    fn test_validation_config_invalid_rules() {
        let raw = json!({
            "rules": "not_an_array"
        });
        assert!(ValidationConfig::from_json(&raw).is_err());
    }

    /// A rule without a "logic" entry is rejected.
    #[test]
    fn test_validation_config_missing_logic() {
        let raw = json!({
            "rules": [
                {
                    "path": "data",
                    "message": "Some error"
                }
            ]
        });
        assert!(ValidationConfig::from_json(&raw).is_err());
    }

    /// A rule without a "message" gets the default message text.
    #[test]
    fn test_validation_config_defaults() {
        let raw = json!({
            "rules": [
                {
                    "logic": {"var": "data.field"}
                }
            ]
        });

        let parsed = ValidationConfig::from_json(&raw).unwrap();
        assert_eq!(parsed.rules[0].message, "Validation failed");
    }
}