mx_message/plugin/
parse.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13use super::common::{extract_message_type, extract_mx_content};
14
15pub struct Parse;
16
17#[async_trait]
18impl AsyncFunctionHandler for Parse {
19    #[instrument(skip(self, message, config, _datalogic))]
20    async fn execute(
21        &self,
22        message: &mut Message,
23        config: &FunctionConfig,
24        _datalogic: Arc<DataLogic>,
25    ) -> Result<(usize, Vec<Change>)> {
26        debug!("Starting MX message parsing (XML to JSON)");
27
28        // Extract custom configuration
29        let input = match config {
30            FunctionConfig::Custom { input, .. } => input,
31            _ => {
32                return Err(DataflowError::Validation(
33                    "Invalid configuration type".to_string(),
34                ));
35            }
36        };
37
38        let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
39            DataflowError::Validation("'source' parameter is required".to_string())
40        })?;
41
42        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
43            DataflowError::Validation("'target' parameter is required".to_string())
44        })?;
45
46        let xml_payload = extract_mx_content(message.data(), source_field, &message.payload)?
47            .replace("\\n", "\n");
48
49        debug!(
50            source_field = %source_field,
51            target_field = %target_field,
52            payload_length = xml_payload.len(),
53            "Extracted XML payload for parsing"
54        );
55
56        self.parse_xml_to_json(message, &xml_payload, target_field)
57    }
58}
59
60impl Parse {
61    /// Parse XML to JSON using MxMessage API
62    /// This is the primary parsing method - always XML input to JSON output
63    fn parse_xml_to_json(
64        &self,
65        message: &mut Message,
66        xml_str: &str,
67        target_field: &str,
68    ) -> Result<(usize, Vec<Change>)> {
69        use crate::mx_envelope::MxMessage;
70
71        debug!("Parsing XML to JSON using MxMessage API");
72
73        // Check if XML has full envelope or just Document
74        let has_envelope = xml_str.contains("<AppHdr") || xml_str.contains("<Envelope");
75
76        let parsed_data = if has_envelope {
77            debug!("XML has full envelope with AppHdr");
78
79            // Use MxMessage to deserialize XML with envelope
80            let mx_message = MxMessage::from_xml(xml_str).map_err(|e| {
81                error!(error = ?e, "Failed to parse XML with MxMessage");
82                DataflowError::Validation(format!("XML parsing error: {}", e))
83            })?;
84
85            // Convert to JSON string then parse to Value
86            let json_str = mx_message.to_json().map_err(|e| {
87                error!(error = ?e, "Failed to convert to JSON");
88                DataflowError::Validation(format!("JSON conversion error: {}", e))
89            })?;
90
91            serde_json::from_str(&json_str).map_err(|e| {
92                error!(error = ?e, "Failed to parse JSON");
93                DataflowError::Validation(format!("JSON parsing error: {}", e))
94            })?
95        } else {
96            debug!("XML has Document only, using typed parser");
97
98            // Fall back to Document-only parser for XML without envelope
99            use super::common::extract_message_type_from_xml;
100            let message_type = extract_message_type_from_xml(xml_str)?;
101
102            crate::xml::xml_to_json_via_document(xml_str, &message_type).map_err(|e| {
103                error!(error = ?e, "XML parsing failed");
104                DataflowError::Validation(format!("XML parsing error: {}", e))
105            })?
106        };
107
108        // Extract message type from parsed data
109        let message_type = extract_message_type(&parsed_data)?;
110
111        debug!(message_type = %message_type, "Successfully parsed XML to JSON");
112
113        // Store the parsed result in message data
114        message
115            .data_mut()
116            .as_object_mut()
117            .unwrap()
118            .insert(target_field.to_string(), parsed_data.clone());
119
120        message.metadata_mut().as_object_mut().unwrap().insert(
121            target_field.to_string(),
122            json!({
123                "message_type": message_type,
124                "format": "json",
125            }),
126        );
127
128        debug!(
129            message_type = %message_type,
130            target_field = %target_field,
131            "XML to JSON parsing completed successfully"
132        );
133
134        // Important: invalidate cache after modifications
135        message.invalidate_context_cache();
136
137        Ok((
138            200,
139            vec![Change {
140                path: Arc::from(format!("data.{}", target_field)),
141                old_value: Arc::new(Value::Null),
142                new_value: Arc::new(parsed_data),
143            }],
144        ))
145    }
146}