mx_message/plugin/
parse.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13use super::common::{
14 detect_format, extract_message_type, extract_message_type_from_xml, extract_mx_content,
15};
16
/// Stateless handler for the MX message parsing plugin.
///
/// The type carries no data; all behaviour lives in its
/// `AsyncFunctionHandler` implementation and its private parsing helpers.
#[derive(Debug, Clone, Copy, Default)]
pub struct Parse;
18
19#[async_trait]
20impl AsyncFunctionHandler for Parse {
21 #[instrument(skip(self, message, config, _datalogic))]
22 async fn execute(
23 &self,
24 message: &mut Message,
25 config: &FunctionConfig,
26 _datalogic: Arc<DataLogic>,
27 ) -> Result<(usize, Vec<Change>)> {
28 debug!("Starting MX message parsing");
29
30 let input = match config {
32 FunctionConfig::Custom { input, .. } => input,
33 _ => {
34 return Err(DataflowError::Validation(
35 "Invalid configuration type".to_string(),
36 ));
37 }
38 };
39
40 let mx_message_field =
41 input
42 .get("mx_message")
43 .and_then(Value::as_str)
44 .ok_or_else(|| {
45 DataflowError::Validation("'mx_message' parameter is required".to_string())
46 })?;
47
48 let parsed_field = input.get("parsed").and_then(Value::as_str).ok_or_else(|| {
49 DataflowError::Validation("'parsed' parameter is required".to_string())
50 })?;
51
52 let format = input
53 .get("format")
54 .and_then(Value::as_str)
55 .unwrap_or("auto");
56
57 let payload = extract_mx_content(message.data(), mx_message_field, &message.payload)?
58 .replace("\\n", "\n");
59
60 debug!(
61 mx_message_field = %mx_message_field,
62 parsed_field = %parsed_field,
63 payload_length = payload.len(),
64 format = %format,
65 "Extracted MX payload for parsing"
66 );
67
68 self.parse_mx_message(message, &payload, parsed_field, format)
69 }
70}
71
72impl Parse {
73 fn parse_mx_message(
74 &self,
75 message: &mut Message,
76 payload: &str,
77 parsed_field: &str,
78 format_hint: &str,
79 ) -> Result<(usize, Vec<Change>)> {
80 debug!("Parsing MX message");
81
82 let format = if format_hint == "auto" {
84 detect_format(payload)
85 } else {
86 format_hint.to_string()
87 };
88
89 debug!(format = %format, "Detected/using format");
90
91 let parsed_data = match format.as_str() {
92 "xml" => self.parse_xml(payload)?,
93 "json" => self.parse_json(payload)?,
94 _ => {
95 return Err(DataflowError::Validation(format!(
96 "Unsupported format: {}",
97 format
98 )));
99 }
100 };
101
102 let message_type = extract_message_type(&parsed_data)?;
104
105 debug!(message_type = %message_type, "Successfully parsed MX message");
106
107 message
109 .data_mut()
110 .as_object_mut()
111 .unwrap()
112 .insert(parsed_field.to_string(), parsed_data.clone());
113
114 message.metadata_mut().as_object_mut().unwrap().insert(
115 parsed_field.to_string(),
116 json!({
117 "message_type": message_type,
118 "format": format,
119 }),
120 );
121
122 debug!(
123 message_type = %message_type,
124 format = %format,
125 parsed_field = %parsed_field,
126 "MX message parsing completed successfully"
127 );
128
129 message.invalidate_context_cache();
131
132 Ok((
133 200,
134 vec![Change {
135 path: Arc::from(format!("data.{}", parsed_field)),
136 old_value: Arc::new(Value::Null),
137 new_value: Arc::new(parsed_data),
138 }],
139 ))
140 }
141
142 fn parse_xml(&self, xml_str: &str) -> Result<Value> {
144 let message_type = extract_message_type_from_xml(xml_str)?;
146
147 debug!(message_type = %message_type, "Extracted message type from XML");
148
149 crate::xml::xml_to_json_via_document(xml_str, &message_type).map_err(|e| {
151 error!(error = ?e, "XML parsing failed");
152 DataflowError::Validation(format!("XML parsing error: {}", e))
153 })
154 }
155
156 fn parse_json(&self, json_str: &str) -> Result<Value> {
158 serde_json::from_str(json_str).map_err(|e| {
159 error!(error = ?e, "JSON parsing failed");
160 DataflowError::Validation(format!("JSON parsing error: {}", e))
161 })
162 }
163}