mx_message/plugin/
parse.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13use super::common::{
14    detect_format, extract_message_type, extract_message_type_from_xml, extract_mx_content,
15};
16
/// Function handler that parses an MX message payload (XML or JSON) into a
/// JSON value and stores the result, together with the detected message type
/// and format, on the message being processed.
pub struct Parse;
18
19#[async_trait]
20impl AsyncFunctionHandler for Parse {
21    #[instrument(skip(self, message, config, _datalogic))]
22    async fn execute(
23        &self,
24        message: &mut Message,
25        config: &FunctionConfig,
26        _datalogic: Arc<DataLogic>,
27    ) -> Result<(usize, Vec<Change>)> {
28        debug!("Starting MX message parsing");
29
30        // Extract custom configuration
31        let input = match config {
32            FunctionConfig::Custom { input, .. } => input,
33            _ => {
34                return Err(DataflowError::Validation(
35                    "Invalid configuration type".to_string(),
36                ));
37            }
38        };
39
40        let mx_message_field =
41            input
42                .get("mx_message")
43                .and_then(Value::as_str)
44                .ok_or_else(|| {
45                    DataflowError::Validation("'mx_message' parameter is required".to_string())
46                })?;
47
48        let parsed_field = input.get("parsed").and_then(Value::as_str).ok_or_else(|| {
49            DataflowError::Validation("'parsed' parameter is required".to_string())
50        })?;
51
52        let format = input
53            .get("format")
54            .and_then(Value::as_str)
55            .unwrap_or("auto");
56
57        let payload = extract_mx_content(message.data(), mx_message_field, &message.payload)?
58            .replace("\\n", "\n");
59
60        debug!(
61            mx_message_field = %mx_message_field,
62            parsed_field = %parsed_field,
63            payload_length = payload.len(),
64            format = %format,
65            "Extracted MX payload for parsing"
66        );
67
68        self.parse_mx_message(message, &payload, parsed_field, format)
69    }
70}
71
72impl Parse {
73    fn parse_mx_message(
74        &self,
75        message: &mut Message,
76        payload: &str,
77        parsed_field: &str,
78        format_hint: &str,
79    ) -> Result<(usize, Vec<Change>)> {
80        debug!("Parsing MX message");
81
82        // Auto-detect format if needed
83        let format = if format_hint == "auto" {
84            detect_format(payload)
85        } else {
86            format_hint.to_string()
87        };
88
89        debug!(format = %format, "Detected/using format");
90
91        let parsed_data = match format.as_str() {
92            "xml" => self.parse_xml(payload)?,
93            "json" => self.parse_json(payload)?,
94            _ => {
95                return Err(DataflowError::Validation(format!(
96                    "Unsupported format: {}",
97                    format
98                )));
99            }
100        };
101
102        // Extract message type from parsed data
103        let message_type = extract_message_type(&parsed_data)?;
104
105        debug!(message_type = %message_type, "Successfully parsed MX message");
106
107        // Store the parsed result in message data
108        message
109            .data_mut()
110            .as_object_mut()
111            .unwrap()
112            .insert(parsed_field.to_string(), parsed_data.clone());
113
114        message.metadata_mut().as_object_mut().unwrap().insert(
115            parsed_field.to_string(),
116            json!({
117                "message_type": message_type,
118                "format": format,
119            }),
120        );
121
122        debug!(
123            message_type = %message_type,
124            format = %format,
125            parsed_field = %parsed_field,
126            "MX message parsing completed successfully"
127        );
128
129        // Important: invalidate cache after modifications
130        message.invalidate_context_cache();
131
132        Ok((
133            200,
134            vec![Change {
135                path: Arc::from(format!("data.{}", parsed_field)),
136                old_value: Arc::new(Value::Null),
137                new_value: Arc::new(parsed_data),
138            }],
139        ))
140    }
141
142    /// Parse XML to JSON
143    fn parse_xml(&self, xml_str: &str) -> Result<Value> {
144        // First, extract message type from XML to determine which typed struct to use
145        let message_type = extract_message_type_from_xml(xml_str)?;
146
147        debug!(message_type = %message_type, "Extracted message type from XML");
148
149        // Use the typed struct parser which correctly handles arrays
150        crate::xml::xml_to_json_via_document(xml_str, &message_type).map_err(|e| {
151            error!(error = ?e, "XML parsing failed");
152            DataflowError::Validation(format!("XML parsing error: {}", e))
153        })
154    }
155
156    /// Parse JSON string to Value
157    fn parse_json(&self, json_str: &str) -> Result<Value> {
158        serde_json::from_str(json_str).map_err(|e| {
159            error!(error = ?e, "JSON parsing failed");
160            DataflowError::Validation(format!("JSON parsing error: {}", e))
161        })
162    }
163}