mx_message/plugin/
parse.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13use super::common::{extract_message_type, extract_mx_content};
14
15pub struct Parse;
16
17#[async_trait]
18impl AsyncFunctionHandler for Parse {
19 #[instrument(skip(self, message, config, _datalogic))]
20 async fn execute(
21 &self,
22 message: &mut Message,
23 config: &FunctionConfig,
24 _datalogic: Arc<DataLogic>,
25 ) -> Result<(usize, Vec<Change>)> {
26 debug!("Starting MX message parsing (XML to JSON)");
27
28 let input = match config {
30 FunctionConfig::Custom { input, .. } => input,
31 _ => {
32 return Err(DataflowError::Validation(
33 "Invalid configuration type".to_string(),
34 ));
35 }
36 };
37
38 let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
39 DataflowError::Validation("'source' parameter is required".to_string())
40 })?;
41
42 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
43 DataflowError::Validation("'target' parameter is required".to_string())
44 })?;
45
46 let xml_payload = extract_mx_content(message.data(), source_field, &message.payload)?
47 .replace("\\n", "\n");
48
49 debug!(
50 source_field = %source_field,
51 target_field = %target_field,
52 payload_length = xml_payload.len(),
53 "Extracted XML payload for parsing"
54 );
55
56 self.parse_xml_to_json(message, &xml_payload, target_field)
57 }
58}
59
60impl Parse {
61 fn parse_xml_to_json(
64 &self,
65 message: &mut Message,
66 xml_str: &str,
67 target_field: &str,
68 ) -> Result<(usize, Vec<Change>)> {
69 use crate::mx_envelope::MxMessage;
70
71 debug!("Parsing XML to JSON using MxMessage API");
72
73 let has_envelope = xml_str.contains("<AppHdr") || xml_str.contains("<Envelope");
75
76 let parsed_data = if has_envelope {
77 debug!("XML has full envelope with AppHdr");
78
79 let mx_message = MxMessage::from_xml(xml_str).map_err(|e| {
81 error!(error = ?e, "Failed to parse XML with MxMessage");
82 DataflowError::Validation(format!("XML parsing error: {}", e))
83 })?;
84
85 let json_str = mx_message.to_json().map_err(|e| {
87 error!(error = ?e, "Failed to convert to JSON");
88 DataflowError::Validation(format!("JSON conversion error: {}", e))
89 })?;
90
91 serde_json::from_str(&json_str).map_err(|e| {
92 error!(error = ?e, "Failed to parse JSON");
93 DataflowError::Validation(format!("JSON parsing error: {}", e))
94 })?
95 } else {
96 debug!("XML has Document only, using typed parser");
97
98 use super::common::extract_message_type_from_xml;
100 let message_type = extract_message_type_from_xml(xml_str)?;
101
102 crate::xml::xml_to_json_via_document(xml_str, &message_type).map_err(|e| {
103 error!(error = ?e, "XML parsing failed");
104 DataflowError::Validation(format!("XML parsing error: {}", e))
105 })?
106 };
107
108 let message_type = extract_message_type(&parsed_data)?;
110
111 debug!(message_type = %message_type, "Successfully parsed XML to JSON");
112
113 message
115 .data_mut()
116 .as_object_mut()
117 .unwrap()
118 .insert(target_field.to_string(), parsed_data.clone());
119
120 message.metadata_mut().as_object_mut().unwrap().insert(
121 target_field.to_string(),
122 json!({
123 "message_type": message_type,
124 "format": "json",
125 }),
126 );
127
128 debug!(
129 message_type = %message_type,
130 target_field = %target_field,
131 "XML to JSON parsing completed successfully"
132 );
133
134 message.invalidate_context_cache();
136
137 Ok((
138 200,
139 vec![Change {
140 path: Arc::from(format!("data.{}", target_field)),
141 old_value: Arc::new(Value::Null),
142 new_value: Arc::new(parsed_data),
143 }],
144 ))
145 }
146}