mx_message/plugin/
validate.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, instrument};
12
13use super::common::{detect_format, extract_message_type, extract_mx_content};
14
/// Stateless dataflow function handler that validates MX messages.
///
/// The handler carries no data, so it is trivially copyable and can be
/// constructed with `Validate` or `Validate::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Validate;
16
17#[async_trait]
18impl AsyncFunctionHandler for Validate {
19    #[instrument(skip(self, message, config, _datalogic))]
20    async fn execute(
21        &self,
22        message: &mut Message,
23        config: &FunctionConfig,
24        _datalogic: Arc<DataLogic>,
25    ) -> Result<(usize, Vec<Change>)> {
26        debug!("Starting MX message validation");
27
28        // Extract custom configuration
29        let input = match config {
30            FunctionConfig::Custom { input, .. } => input,
31            _ => {
32                return Err(DataflowError::Validation(
33                    "Invalid configuration type".to_string(),
34                ));
35            }
36        };
37
38        let mx_message_field =
39            input
40                .get("mx_message")
41                .and_then(Value::as_str)
42                .ok_or_else(|| {
43                    DataflowError::Validation("'mx_message' parameter is required".to_string())
44                })?;
45
46        let validation_result_field = input
47            .get("validation_result")
48            .and_then(Value::as_str)
49            .ok_or_else(|| {
50                DataflowError::Validation("'validation_result' parameter is required".to_string())
51            })?;
52
53        let format = input
54            .get("format")
55            .and_then(Value::as_str)
56            .unwrap_or("auto");
57
58        // Get the MX message to validate
59        let mx_content = extract_mx_content(message.data(), mx_message_field, &message.payload)?;
60
61        debug!(
62            mx_message_field = %mx_message_field,
63            validation_result_field = %validation_result_field,
64            format = %format,
65            "Validating MX message"
66        );
67
68        // Perform validation
69        let validation_result = self.validate_mx_message(&mx_content, format)?;
70
71        // Store validation result
72        message.data_mut().as_object_mut().unwrap().insert(
73            validation_result_field.to_string(),
74            validation_result.clone(),
75        );
76
77        // Update metadata with validation summary
78        message.metadata_mut().as_object_mut().unwrap().insert(
79            "validation".to_string(),
80            json!({
81                "validated": true,
82                "timestamp": chrono::Utc::now().to_rfc3339(),
83            }),
84        );
85
86        message.invalidate_context_cache();
87
88        Ok((
89            200,
90            vec![Change {
91                path: Arc::from(format!("data.{}", validation_result_field)),
92                old_value: Arc::new(Value::Null),
93                new_value: Arc::new(validation_result),
94            }],
95        ))
96    }
97}
98
99impl Validate {
100    fn validate_mx_message(&self, mx_content: &str, format_hint: &str) -> Result<Value> {
101        // Auto-detect format if needed
102        let format = if format_hint == "auto" {
103            detect_format(mx_content)
104        } else {
105            format_hint.to_string()
106        };
107
108        debug!(format = %format, "Validating MX message");
109
110        let mut errors: Vec<String> = Vec::new();
111        let warnings: Vec<String> = Vec::new();
112
113        match format.as_str() {
114            "xml" => {
115                // For XML validation, try to deserialize directly to typed structs
116                // First parse to get message type
117                match self.parse_xml(mx_content) {
118                    Ok(data) => {
119                        match extract_message_type(&data) {
120                            Ok(message_type) => {
121                                // Try to deserialize XML directly to typed struct using envelope-aware parsing
122                                // This validates both AppHdr (if present) and Document structure
123                                match crate::xml::from_mx_xml_envelope_str(
124                                    mx_content,
125                                    &message_type,
126                                ) {
127                                    Ok(_) => {
128                                        // Successfully deserialized - message is valid
129                                        debug!(
130                                            "XML message validated successfully (with envelope support)"
131                                        );
132                                    }
133                                    Err(e) => {
134                                        errors.push(format!("XML deserialization failed: {}", e));
135                                    }
136                                }
137                            }
138                            Err(e) => {
139                                errors.push(format!("Could not determine message type: {}", e));
140                            }
141                        }
142                    }
143                    Err(e) => {
144                        errors.push(format!("XML parsing failed: {}", e));
145                    }
146                }
147            }
148            "json" => {
149                // For JSON validation, parse and deserialize to typed struct
150                match self.parse_json(mx_content) {
151                    Ok(data) => {
152                        match extract_message_type(&data) {
153                            Ok(message_type) => {
154                                // Try to serialize to XML (validates structure)
155                                match crate::xml::json_to_typed_xml(&data, &message_type) {
156                                    Ok(_xml_str) => {
157                                        debug!("JSON message validated successfully");
158                                    }
159                                    Err(e) => {
160                                        errors.push(format!("JSON validation failed: {}", e));
161                                    }
162                                }
163                            }
164                            Err(e) => {
165                                errors.push(format!("Could not determine message type: {}", e));
166                            }
167                        }
168                    }
169                    Err(e) => {
170                        errors.push(format!("JSON parsing failed: {}", e));
171                    }
172                }
173            }
174            _ => {
175                errors.push(format!("Unsupported format: {}", format));
176            }
177        }
178
179        let is_valid = errors.is_empty();
180
181        Ok(json!({
182            "valid": is_valid,
183            "errors": errors,
184            "warnings": warnings,
185            "timestamp": chrono::Utc::now().to_rfc3339(),
186        }))
187    }
188
189    /// Parse XML to JSON
190    fn parse_xml(&self, xml_str: &str) -> std::result::Result<Value, String> {
191        crate::xml::from_mx_xml_to_json(xml_str).map_err(|e| format!("XML parsing error: {}", e))
192    }
193
194    /// Parse JSON string to Value
195    fn parse_json(&self, json_str: &str) -> std::result::Result<Value, String> {
196        serde_json::from_str(json_str).map_err(|e| format!("JSON parsing error: {}", e))
197    }
198}