mx_message/plugin/
validate.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, instrument};
12
13use super::common::extract_mx_content;
14
/// Dataflow function handler that validates an MX XML message.
///
/// The type carries no state; all inputs come from the function
/// configuration and the message passed to `execute`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Validate;
16
17#[async_trait]
18impl AsyncFunctionHandler for Validate {
19    #[instrument(skip(self, message, config, _datalogic))]
20    async fn execute(
21        &self,
22        message: &mut Message,
23        config: &FunctionConfig,
24        _datalogic: Arc<DataLogic>,
25    ) -> Result<(usize, Vec<Change>)> {
26        debug!("Starting MX message validation (XML)");
27
28        // Extract custom configuration
29        let input = match config {
30            FunctionConfig::Custom { input, .. } => input,
31            _ => {
32                return Err(DataflowError::Validation(
33                    "Invalid configuration type".to_string(),
34                ));
35            }
36        };
37
38        let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
39            DataflowError::Validation("'source' parameter is required".to_string())
40        })?;
41
42        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
43            DataflowError::Validation("'target' parameter is required".to_string())
44        })?;
45
46        // Get the MX XML message to validate
47        let xml_content = extract_mx_content(message.data(), source_field, &message.payload)?;
48
49        debug!(
50            source_field = %source_field,
51            target_field = %target_field,
52            "Validating MX XML message"
53        );
54
55        // Perform XML validation
56        let validation_result = self.validate_xml(&xml_content)?;
57
58        // Store validation result
59        message
60            .data_mut()
61            .as_object_mut()
62            .unwrap()
63            .insert(target_field.to_string(), validation_result.clone());
64
65        // Update metadata with validation summary
66        message.metadata_mut().as_object_mut().unwrap().insert(
67            "validation".to_string(),
68            json!({
69                "validated": true,
70                "timestamp": chrono::Utc::now().to_rfc3339(),
71            }),
72        );
73
74        message.invalidate_context_cache();
75
76        Ok((
77            200,
78            vec![Change {
79                path: Arc::from(format!("data.{}", target_field)),
80                old_value: Arc::new(Value::Null),
81                new_value: Arc::new(validation_result),
82            }],
83        ))
84    }
85}
86
87impl Validate {
88    /// Validate XML MX message by attempting to parse into typed structs
89    /// This validates both structure and content according to ISO20022 schemas
90    fn validate_xml(&self, xml_content: &str) -> Result<Value> {
91        use crate::mx_envelope::MxMessage;
92
93        debug!("Validating XML MX message");
94
95        let mut errors: Vec<String> = Vec::new();
96        let warnings: Vec<String> = Vec::new();
97
98        // Check if XML has full envelope or just Document
99        let has_envelope = xml_content.contains("<AppHdr") || xml_content.contains("<Envelope");
100
101        if has_envelope {
102            debug!("Validating XML with full envelope using MxMessage");
103
104            // Validate by attempting to deserialize with MxMessage
105            match MxMessage::from_xml(xml_content) {
106                Ok(_) => {
107                    debug!("XML message with envelope validated successfully");
108                }
109                Err(e) => {
110                    errors.push(format!("XML validation failed: {}", e));
111                }
112            }
113        } else {
114            debug!("Validating Document-only XML using from_mx_xml_envelope_str");
115
116            // For Document-only XML, we need to determine the message type first
117            use super::common::extract_message_type_from_xml;
118
119            match extract_message_type_from_xml(xml_content) {
120                Ok(message_type) => {
121                    // Validate using the envelope validator which handles both cases
122                    match crate::xml::from_mx_xml_envelope_str(xml_content, &message_type) {
123                        Ok(_) => {
124                            debug!("Document-only XML validated successfully");
125                        }
126                        Err(e) => {
127                            errors.push(format!("XML validation failed: {}", e));
128                        }
129                    }
130                }
131                Err(e) => {
132                    errors.push(format!("Could not determine message type: {}", e));
133                }
134            }
135        }
136
137        let is_valid = errors.is_empty();
138
139        Ok(json!({
140            "valid": is_valid,
141            "errors": errors,
142            "warnings": warnings,
143            "timestamp": chrono::Utc::now().to_rfc3339(),
144        }))
145    }
146}