mx_message/plugin/
validate.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, instrument};
12
13use super::common::extract_mx_content;
14
15pub struct Validate;
16
17#[async_trait]
18impl AsyncFunctionHandler for Validate {
19 #[instrument(skip(self, message, config, _datalogic))]
20 async fn execute(
21 &self,
22 message: &mut Message,
23 config: &FunctionConfig,
24 _datalogic: Arc<DataLogic>,
25 ) -> Result<(usize, Vec<Change>)> {
26 debug!("Starting MX message validation (XML)");
27
28 let input = match config {
30 FunctionConfig::Custom { input, .. } => input,
31 _ => {
32 return Err(DataflowError::Validation(
33 "Invalid configuration type".to_string(),
34 ));
35 }
36 };
37
38 let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
39 DataflowError::Validation("'source' parameter is required".to_string())
40 })?;
41
42 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
43 DataflowError::Validation("'target' parameter is required".to_string())
44 })?;
45
46 let xml_content = extract_mx_content(message.data(), source_field, &message.payload)?;
48
49 debug!(
50 source_field = %source_field,
51 target_field = %target_field,
52 "Validating MX XML message"
53 );
54
55 let validation_result = self.validate_xml(&xml_content)?;
57
58 message
60 .data_mut()
61 .as_object_mut()
62 .unwrap()
63 .insert(target_field.to_string(), validation_result.clone());
64
65 message.metadata_mut().as_object_mut().unwrap().insert(
67 "validation".to_string(),
68 json!({
69 "validated": true,
70 "timestamp": chrono::Utc::now().to_rfc3339(),
71 }),
72 );
73
74 message.invalidate_context_cache();
75
76 Ok((
77 200,
78 vec![Change {
79 path: Arc::from(format!("data.{}", target_field)),
80 old_value: Arc::new(Value::Null),
81 new_value: Arc::new(validation_result),
82 }],
83 ))
84 }
85}
86
87impl Validate {
88 fn validate_xml(&self, xml_content: &str) -> Result<Value> {
91 use crate::mx_envelope::MxMessage;
92
93 debug!("Validating XML MX message");
94
95 let mut errors: Vec<String> = Vec::new();
96 let warnings: Vec<String> = Vec::new();
97
98 let has_envelope = xml_content.contains("<AppHdr") || xml_content.contains("<Envelope");
100
101 if has_envelope {
102 debug!("Validating XML with full envelope using MxMessage");
103
104 match MxMessage::from_xml(xml_content) {
106 Ok(_) => {
107 debug!("XML message with envelope validated successfully");
108 }
109 Err(e) => {
110 errors.push(format!("XML validation failed: {}", e));
111 }
112 }
113 } else {
114 debug!("Validating Document-only XML using from_mx_xml_envelope_str");
115
116 use super::common::extract_message_type_from_xml;
118
119 match extract_message_type_from_xml(xml_content) {
120 Ok(message_type) => {
121 match crate::xml::from_mx_xml_envelope_str(xml_content, &message_type) {
123 Ok(_) => {
124 debug!("Document-only XML validated successfully");
125 }
126 Err(e) => {
127 errors.push(format!("XML validation failed: {}", e));
128 }
129 }
130 }
131 Err(e) => {
132 errors.push(format!("Could not determine message type: {}", e));
133 }
134 }
135 }
136
137 let is_valid = errors.is_empty();
138
139 Ok(json!({
140 "valid": is_valid,
141 "errors": errors,
142 "warnings": warnings,
143 "timestamp": chrono::Utc::now().to_rfc3339(),
144 }))
145 }
146}