mx_message/plugin/
validate.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, instrument};
12
13use super::common::{detect_format, extract_message_type, extract_mx_content};
14
15pub struct Validate;
16
17#[async_trait]
18impl AsyncFunctionHandler for Validate {
19 #[instrument(skip(self, message, config, _datalogic))]
20 async fn execute(
21 &self,
22 message: &mut Message,
23 config: &FunctionConfig,
24 _datalogic: Arc<DataLogic>,
25 ) -> Result<(usize, Vec<Change>)> {
26 debug!("Starting MX message validation");
27
28 let input = match config {
30 FunctionConfig::Custom { input, .. } => input,
31 _ => {
32 return Err(DataflowError::Validation(
33 "Invalid configuration type".to_string(),
34 ));
35 }
36 };
37
38 let mx_message_field =
39 input
40 .get("mx_message")
41 .and_then(Value::as_str)
42 .ok_or_else(|| {
43 DataflowError::Validation("'mx_message' parameter is required".to_string())
44 })?;
45
46 let validation_result_field = input
47 .get("validation_result")
48 .and_then(Value::as_str)
49 .ok_or_else(|| {
50 DataflowError::Validation("'validation_result' parameter is required".to_string())
51 })?;
52
53 let format = input
54 .get("format")
55 .and_then(Value::as_str)
56 .unwrap_or("auto");
57
58 let mx_content = extract_mx_content(message.data(), mx_message_field, &message.payload)?;
60
61 debug!(
62 mx_message_field = %mx_message_field,
63 validation_result_field = %validation_result_field,
64 format = %format,
65 "Validating MX message"
66 );
67
68 let validation_result = self.validate_mx_message(&mx_content, format)?;
70
71 message.data_mut().as_object_mut().unwrap().insert(
73 validation_result_field.to_string(),
74 validation_result.clone(),
75 );
76
77 message.metadata_mut().as_object_mut().unwrap().insert(
79 "validation".to_string(),
80 json!({
81 "validated": true,
82 "timestamp": chrono::Utc::now().to_rfc3339(),
83 }),
84 );
85
86 message.invalidate_context_cache();
87
88 Ok((
89 200,
90 vec![Change {
91 path: Arc::from(format!("data.{}", validation_result_field)),
92 old_value: Arc::new(Value::Null),
93 new_value: Arc::new(validation_result),
94 }],
95 ))
96 }
97}
98
99impl Validate {
100 fn validate_mx_message(&self, mx_content: &str, format_hint: &str) -> Result<Value> {
101 let format = if format_hint == "auto" {
103 detect_format(mx_content)
104 } else {
105 format_hint.to_string()
106 };
107
108 debug!(format = %format, "Validating MX message");
109
110 let mut errors: Vec<String> = Vec::new();
111 let warnings: Vec<String> = Vec::new();
112
113 match format.as_str() {
114 "xml" => {
115 match self.parse_xml(mx_content) {
118 Ok(data) => {
119 match extract_message_type(&data) {
120 Ok(message_type) => {
121 match crate::xml::from_mx_xml_envelope_str(
124 mx_content,
125 &message_type,
126 ) {
127 Ok(_) => {
128 debug!(
130 "XML message validated successfully (with envelope support)"
131 );
132 }
133 Err(e) => {
134 errors.push(format!("XML deserialization failed: {}", e));
135 }
136 }
137 }
138 Err(e) => {
139 errors.push(format!("Could not determine message type: {}", e));
140 }
141 }
142 }
143 Err(e) => {
144 errors.push(format!("XML parsing failed: {}", e));
145 }
146 }
147 }
148 "json" => {
149 match self.parse_json(mx_content) {
151 Ok(data) => {
152 match extract_message_type(&data) {
153 Ok(message_type) => {
154 match crate::xml::json_to_typed_xml(&data, &message_type) {
156 Ok(_xml_str) => {
157 debug!("JSON message validated successfully");
158 }
159 Err(e) => {
160 errors.push(format!("JSON validation failed: {}", e));
161 }
162 }
163 }
164 Err(e) => {
165 errors.push(format!("Could not determine message type: {}", e));
166 }
167 }
168 }
169 Err(e) => {
170 errors.push(format!("JSON parsing failed: {}", e));
171 }
172 }
173 }
174 _ => {
175 errors.push(format!("Unsupported format: {}", format));
176 }
177 }
178
179 let is_valid = errors.is_empty();
180
181 Ok(json!({
182 "valid": is_valid,
183 "errors": errors,
184 "warnings": warnings,
185 "timestamp": chrono::Utc::now().to_rfc3339(),
186 }))
187 }
188
189 fn parse_xml(&self, xml_str: &str) -> std::result::Result<Value, String> {
191 crate::xml::from_mx_xml_to_json(xml_str).map_err(|e| format!("XML parsing error: {}", e))
192 }
193
194 fn parse_json(&self, json_str: &str) -> std::result::Result<Value, String> {
196 serde_json::from_str(json_str).map_err(|e| format!("JSON parsing error: {}", e))
197 }
198}