mx_message/plugin/
publish.rs

1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4    AsyncFunctionHandler, FunctionConfig,
5    error::Result,
6    message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
13pub struct Publish;
14
15#[async_trait]
16impl AsyncFunctionHandler for Publish {
17    #[instrument(skip(self, message, config, _datalogic))]
18    async fn execute(
19        &self,
20        message: &mut Message,
21        config: &FunctionConfig,
22        _datalogic: Arc<DataLogic>,
23    ) -> Result<(usize, Vec<Change>)> {
24        eprintln!("🔍 PUBLISH PLUGIN: Starting JSON to MX message publishing");
25        debug!("Starting JSON to MX message publishing");
26
27        // Extract custom configuration
28        let input = match config {
29            FunctionConfig::Custom { input, .. } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        // Get json_data and mx_message field names
38        let json_data_field = input
39            .get("json_data")
40            .and_then(Value::as_str)
41            .ok_or_else(|| {
42                DataflowError::Validation("'json_data' parameter is required".to_string())
43            })?;
44
45        let mx_message_field =
46            input
47                .get("mx_message")
48                .and_then(Value::as_str)
49                .ok_or_else(|| {
50                    DataflowError::Validation("'mx_message' parameter is required".to_string())
51                })?;
52
53        let format = input.get("format").and_then(Value::as_str).unwrap_or("xml");
54
55        let include_bah = input
56            .get("include_bah")
57            .and_then(Value::as_bool)
58            .unwrap_or(false);
59
60        let namespace_prefix = input.get("namespace_prefix").and_then(Value::as_str);
61
62        // Extract JSON data from the message
63        let json_data = message.data().get(json_data_field).cloned().ok_or_else(|| {
64            error!(
65                json_data_field = %json_data_field,
66                available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
67                "JSON data field not found in message data"
68            );
69            DataflowError::Validation(format!(
70                "Field '{}' not found in message data",
71                json_data_field
72            ))
73        })?;
74
75        debug!(
76            json_data_field = %json_data_field,
77            mx_message_field = %mx_message_field,
78            format = %format,
79            include_bah = %include_bah,
80            "Processing JSON to MX conversion"
81        );
82
83        // Extract the actual JSON data if it's wrapped in a generate result
84        let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
85            // This is output from the generate function
86            eprintln!("🔍 PUBLISH: Found json_data wrapper, extracting inner JSON");
87            debug!("Found json_data wrapper, extracting inner JSON");
88            inner_json.clone()
89        } else {
90            // Direct JSON data
91            eprintln!("🔍 PUBLISH: Using JSON data directly");
92            debug!("Using JSON data directly");
93            json_data.clone()
94        };
95
96        // Debug: log the structure we're trying to convert
97        if let Some(obj) = json_to_convert.as_object() {
98            let keys: Vec<&String> = obj.keys().collect();
99            eprintln!(
100                "🔍 PUBLISH: JSON structure has {} keys: {:?}",
101                keys.len(),
102                keys
103            );
104            debug!("JSON structure has {} keys: {:?}", keys.len(), keys);
105        }
106
107        // Extract message type from the JSON data
108        let message_type = json_data
109            .get("message_type")
110            .and_then(Value::as_str)
111            .ok_or_else(|| {
112                DataflowError::Validation(
113                    "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
114                )
115            })?;
116
117        debug!(message_type = %message_type, format = %format, "Converting JSON to MX message");
118
119        eprintln!(
120            "🔍 PUBLISH: About to call json_to_mx with format={}",
121            format
122        );
123        // Convert JSON to MX message
124        let mx_message = self.json_to_mx(
125            &json_to_convert,
126            message_type,
127            format,
128            include_bah,
129            namespace_prefix,
130        )?;
131        eprintln!("🔍 PUBLISH: Successfully converted to MX message");
132
133        debug!(
134            message_length = mx_message.len(),
135            "MX message published successfully"
136        );
137
138        // Store the MX message in the output field
139        let old_value = message
140            .data()
141            .get(mx_message_field)
142            .cloned()
143            .unwrap_or(Value::Null);
144
145        message.data_mut()[mx_message_field] = Value::String(mx_message.clone());
146
147        // Invalidate cache after modifications
148        message.invalidate_context_cache();
149
150        Ok((
151            200,
152            vec![Change {
153                path: Arc::from(format!("data.{}", mx_message_field)),
154                old_value: Arc::new(old_value),
155                new_value: Arc::new(Value::String(mx_message)),
156            }],
157        ))
158    }
159}
160
161impl Publish {
162    /// Convert JSON to MX message (XML or JSON format)
163    fn json_to_mx(
164        &self,
165        json_data: &Value,
166        message_type: &str,
167        format: &str,
168        include_bah: bool,
169        namespace_prefix: Option<&str>,
170    ) -> Result<String> {
171        match format {
172            "xml" => self.json_to_xml(json_data, message_type, include_bah, namespace_prefix),
173            "json" => self.json_to_json(json_data),
174            _ => Err(DataflowError::Validation(format!(
175                "Unsupported format: {}. Supported formats are 'xml' and 'json'",
176                format
177            ))),
178        }
179    }
180
181    /// Convert JSON to XML format
182    fn json_to_xml(
183        &self,
184        json_data: &Value,
185        message_type: &str,
186        _include_bah: bool,
187        _namespace_prefix: Option<&str>,
188    ) -> Result<String> {
189        // Handle different JSON structures:
190        // 1. Just Document: { "Document": {...} }
191        // 2. Envelope with AppHdr and Document: { "AppHdr": {...}, "Document": {...} }
192        // For now, we only serialize the Document part (AppHdr will be supported later)
193
194        eprintln!("🔍 PUBLISH json_to_xml: Checking for Document element");
195        let data_to_serialize = if let Some(doc) = json_data.get("Document") {
196            // Has Document - check if there are other root keys
197            if let Some(obj) = json_data.as_object() {
198                eprintln!(
199                    "🔍 PUBLISH json_to_xml: Found Document. Object has {} keys",
200                    obj.len()
201                );
202                if obj.len() == 1 {
203                    // Only Document - serialize as-is
204                    eprintln!("🔍 PUBLISH json_to_xml: Only Document key, serializing as-is");
205                    debug!("Serializing Document only");
206                    json_data.clone()
207                } else {
208                    // Multiple root keys (e.g., AppHdr + Document) - extract Document only
209                    eprintln!(
210                        "🔍 PUBLISH json_to_xml: Multiple root keys detected ({:?}), extracting Document only",
211                        obj.keys().collect::<Vec<_>>()
212                    );
213                    debug!("Multiple root keys detected, extracting Document only");
214                    json!({ "Document": doc })
215                }
216            } else {
217                json_data.clone()
218            }
219        } else {
220            eprintln!("🔍 PUBLISH json_to_xml: ERROR - No Document element found");
221            return Err(DataflowError::Validation(
222                "JSON data must contain a 'Document' element for XML serialization".to_string(),
223            ));
224        };
225
226        // Create XML config (kept for future AppHdr support)
227        let _config = crate::xml::XmlConfig::default();
228
229        // Debug: show what we're about to serialize
230        debug!("About to serialize to XML. Data structure:");
231        if let Some(obj) = data_to_serialize.as_object() {
232            eprintln!(
233                "🔍 PUBLISH json_to_xml: About to serialize. Root keys: {:?}",
234                obj.keys().collect::<Vec<_>>()
235            );
236            debug!("  Root keys: {:?}", obj.keys().collect::<Vec<_>>());
237        }
238
239        // Use the new typed serialization function from the library
240        eprintln!("🔍 PUBLISH json_to_xml: Calling json_to_typed_xml...");
241        let result = crate::xml::json_to_typed_xml(&data_to_serialize, message_type);
242        match &result {
243            Ok(_) => eprintln!("🔍 PUBLISH json_to_xml: json_to_typed_xml succeeded"),
244            Err(e) => eprintln!("🔍 PUBLISH json_to_xml: json_to_typed_xml FAILED: {:?}", e),
245        }
246
247        result.map_err(|e| {
248            error!(error = ?e, data = ?data_to_serialize, message_type = %message_type, "XML serialization failed");
249            DataflowError::Validation(format!("XML serialization error: {}", e))
250        })
251    }
252
253    /// Convert JSON to JSON format (essentially just stringify with formatting)
254    fn json_to_json(&self, json_data: &Value) -> Result<String> {
255        serde_json::to_string_pretty(json_data).map_err(|e| {
256            error!(error = ?e, "JSON serialization failed");
257            DataflowError::Validation(format!("JSON serialization error: {}", e))
258        })
259    }
260}