use async_trait::async_trait;
use dataflow_rs::engine::error::DataflowError;
use dataflow_rs::engine::{
AsyncFunctionHandler, FunctionConfig,
error::Result,
message::{Change, Message},
};
use datalogic_rs::DataLogic;
use serde_json::{Value, json};
use std::sync::Arc;
use tracing::{debug, error, instrument};
use super::common::{extract_message_type, extract_mx_content};
/// Stateless function handler that parses an MX message from XML into JSON.
///
/// The handler carries no fields; all inputs come from the message and the
/// function configuration passed to `execute`.
pub struct Parse;
#[async_trait]
impl AsyncFunctionHandler for Parse {
    /// Parses the XML found at the configured `source` and stores the JSON
    /// result under the configured `target`.
    ///
    /// # Errors
    ///
    /// Returns `DataflowError::Validation` when `config` is not a
    /// `FunctionConfig::Custom`, or when its input lacks a string `source`
    /// or `target` entry. Errors from content extraction and from the XML
    /// conversion itself are propagated unchanged.
    #[instrument(skip(self, message, config, _datalogic))]
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        debug!("Starting MX message parsing (XML to JSON)");

        // Only the custom configuration variant carries the parameters we need.
        let FunctionConfig::Custom { input: params, .. } = config else {
            return Err(DataflowError::Validation(
                "Invalid configuration type".to_string(),
            ));
        };

        // `source` is validated before `target`, so a config missing both
        // reports the `source` problem.
        let Some(source_key) = params.get("source").and_then(Value::as_str) else {
            return Err(DataflowError::Validation(
                "'source' parameter is required".to_string(),
            ));
        };
        let Some(target_key) = params.get("target").and_then(Value::as_str) else {
            return Err(DataflowError::Validation(
                "'target' parameter is required".to_string(),
            ));
        };

        let raw_content = extract_mx_content(message.data(), source_key, &message.payload)?;
        // Turn literal two-character `\n` escape sequences into real newlines
        // before handing the text to the XML parser.
        let xml_payload = raw_content.replace("\\n", "\n");

        debug!(
            source_field = %source_key,
            target_field = %target_key,
            payload_length = xml_payload.len(),
            "Extracted XML payload for parsing"
        );

        self.parse_xml_to_json(message, &xml_payload, target_key)
    }
}
impl Parse {
fn parse_xml_to_json(
&self,
message: &mut Message,
xml_str: &str,
target_field: &str,
) -> Result<(usize, Vec<Change>)> {
use crate::mx_envelope::MxMessage;
debug!("Parsing XML to JSON using MxMessage API");
let has_envelope = xml_str.contains("<AppHdr") || xml_str.contains("<Envelope");
let parsed_data = if has_envelope {
debug!("XML has full envelope with AppHdr");
let mx_message = MxMessage::from_xml(xml_str).map_err(|e| {
error!(error = ?e, "Failed to parse XML with MxMessage");
DataflowError::Validation(format!("XML parsing error: {}", e))
})?;
let json_str = mx_message.to_json().map_err(|e| {
error!(error = ?e, "Failed to convert to JSON");
DataflowError::Validation(format!("JSON conversion error: {}", e))
})?;
serde_json::from_str(&json_str).map_err(|e| {
error!(error = ?e, "Failed to parse JSON");
DataflowError::Validation(format!("JSON parsing error: {}", e))
})?
} else {
debug!("XML has Document only, using typed parser");
use super::common::extract_message_type_from_xml;
let message_type = extract_message_type_from_xml(xml_str)?;
crate::xml::xml_to_json_via_document(xml_str, &message_type).map_err(|e| {
error!(error = ?e, "XML parsing failed");
DataflowError::Validation(format!("XML parsing error: {}", e))
})?
};
let message_type = extract_message_type(&parsed_data)?;
debug!(message_type = %message_type, "Successfully parsed XML to JSON");
message
.data_mut()
.as_object_mut()
.unwrap()
.insert(target_field.to_string(), parsed_data.clone());
message.metadata_mut().as_object_mut().unwrap().insert(
target_field.to_string(),
json!({
"message_type": message_type,
"format": "json",
}),
);
debug!(
message_type = %message_type,
target_field = %target_field,
"XML to JSON parsing completed successfully"
);
message.invalidate_context_cache();
Ok((
200,
vec![Change {
path: Arc::from(format!("data.{}", target_field)),
old_value: Arc::new(Value::Null),
new_value: Arc::new(parsed_data),
}],
))
}
}