mx_message/plugin/
publish.rs1use async_trait::async_trait;
2use dataflow_rs::engine::error::DataflowError;
3use dataflow_rs::engine::{
4 AsyncFunctionHandler, FunctionConfig,
5 error::Result,
6 message::{Change, Message},
7};
8use datalogic_rs::DataLogic;
9use serde_json::{Value, json};
10use std::sync::Arc;
11use tracing::{debug, error, instrument};
12
/// Stateless function handler that serializes JSON held in a message into an MX message.
///
/// The type carries no data; all behaviour lives in its `AsyncFunctionHandler`
/// implementation and private conversion helpers.
#[derive(Debug, Clone, Copy, Default)]
pub struct Publish;
14
15#[async_trait]
16impl AsyncFunctionHandler for Publish {
17 #[instrument(skip(self, message, config, _datalogic))]
18 async fn execute(
19 &self,
20 message: &mut Message,
21 config: &FunctionConfig,
22 _datalogic: Arc<DataLogic>,
23 ) -> Result<(usize, Vec<Change>)> {
24 eprintln!("🔍 PUBLISH PLUGIN: Starting JSON to MX message publishing");
25 debug!("Starting JSON to MX message publishing");
26
27 let input = match config {
29 FunctionConfig::Custom { input, .. } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let json_data_field = input
39 .get("json_data")
40 .and_then(Value::as_str)
41 .ok_or_else(|| {
42 DataflowError::Validation("'json_data' parameter is required".to_string())
43 })?;
44
45 let mx_message_field =
46 input
47 .get("mx_message")
48 .and_then(Value::as_str)
49 .ok_or_else(|| {
50 DataflowError::Validation("'mx_message' parameter is required".to_string())
51 })?;
52
53 let format = input.get("format").and_then(Value::as_str).unwrap_or("xml");
54
55 let include_bah = input
56 .get("include_bah")
57 .and_then(Value::as_bool)
58 .unwrap_or(false);
59
60 let namespace_prefix = input.get("namespace_prefix").and_then(Value::as_str);
61
62 let json_data = message.data().get(json_data_field).cloned().ok_or_else(|| {
64 error!(
65 json_data_field = %json_data_field,
66 available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
67 "JSON data field not found in message data"
68 );
69 DataflowError::Validation(format!(
70 "Field '{}' not found in message data",
71 json_data_field
72 ))
73 })?;
74
75 debug!(
76 json_data_field = %json_data_field,
77 mx_message_field = %mx_message_field,
78 format = %format,
79 include_bah = %include_bah,
80 "Processing JSON to MX conversion"
81 );
82
83 let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
85 eprintln!("🔍 PUBLISH: Found json_data wrapper, extracting inner JSON");
87 debug!("Found json_data wrapper, extracting inner JSON");
88 inner_json.clone()
89 } else {
90 eprintln!("🔍 PUBLISH: Using JSON data directly");
92 debug!("Using JSON data directly");
93 json_data.clone()
94 };
95
96 if let Some(obj) = json_to_convert.as_object() {
98 let keys: Vec<&String> = obj.keys().collect();
99 eprintln!(
100 "🔍 PUBLISH: JSON structure has {} keys: {:?}",
101 keys.len(),
102 keys
103 );
104 debug!("JSON structure has {} keys: {:?}", keys.len(), keys);
105 }
106
107 let message_type = json_data
109 .get("message_type")
110 .and_then(Value::as_str)
111 .ok_or_else(|| {
112 DataflowError::Validation(
113 "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
114 )
115 })?;
116
117 debug!(message_type = %message_type, format = %format, "Converting JSON to MX message");
118
119 eprintln!(
120 "🔍 PUBLISH: About to call json_to_mx with format={}",
121 format
122 );
123 let mx_message = self.json_to_mx(
125 &json_to_convert,
126 message_type,
127 format,
128 include_bah,
129 namespace_prefix,
130 )?;
131 eprintln!("🔍 PUBLISH: Successfully converted to MX message");
132
133 debug!(
134 message_length = mx_message.len(),
135 "MX message published successfully"
136 );
137
138 let old_value = message
140 .data()
141 .get(mx_message_field)
142 .cloned()
143 .unwrap_or(Value::Null);
144
145 message.data_mut()[mx_message_field] = Value::String(mx_message.clone());
146
147 message.invalidate_context_cache();
149
150 Ok((
151 200,
152 vec![Change {
153 path: Arc::from(format!("data.{}", mx_message_field)),
154 old_value: Arc::new(old_value),
155 new_value: Arc::new(Value::String(mx_message)),
156 }],
157 ))
158 }
159}
160
161impl Publish {
162 fn json_to_mx(
164 &self,
165 json_data: &Value,
166 message_type: &str,
167 format: &str,
168 include_bah: bool,
169 namespace_prefix: Option<&str>,
170 ) -> Result<String> {
171 match format {
172 "xml" => self.json_to_xml(json_data, message_type, include_bah, namespace_prefix),
173 "json" => self.json_to_json(json_data),
174 _ => Err(DataflowError::Validation(format!(
175 "Unsupported format: {}. Supported formats are 'xml' and 'json'",
176 format
177 ))),
178 }
179 }
180
181 fn json_to_xml(
183 &self,
184 json_data: &Value,
185 message_type: &str,
186 _include_bah: bool,
187 _namespace_prefix: Option<&str>,
188 ) -> Result<String> {
189 eprintln!("🔍 PUBLISH json_to_xml: Checking for Document element");
195 let data_to_serialize = if let Some(doc) = json_data.get("Document") {
196 if let Some(obj) = json_data.as_object() {
198 eprintln!(
199 "🔍 PUBLISH json_to_xml: Found Document. Object has {} keys",
200 obj.len()
201 );
202 if obj.len() == 1 {
203 eprintln!("🔍 PUBLISH json_to_xml: Only Document key, serializing as-is");
205 debug!("Serializing Document only");
206 json_data.clone()
207 } else {
208 eprintln!(
210 "🔍 PUBLISH json_to_xml: Multiple root keys detected ({:?}), extracting Document only",
211 obj.keys().collect::<Vec<_>>()
212 );
213 debug!("Multiple root keys detected, extracting Document only");
214 json!({ "Document": doc })
215 }
216 } else {
217 json_data.clone()
218 }
219 } else {
220 eprintln!("🔍 PUBLISH json_to_xml: ERROR - No Document element found");
221 return Err(DataflowError::Validation(
222 "JSON data must contain a 'Document' element for XML serialization".to_string(),
223 ));
224 };
225
226 let _config = crate::xml::XmlConfig::default();
228
229 debug!("About to serialize to XML. Data structure:");
231 if let Some(obj) = data_to_serialize.as_object() {
232 eprintln!(
233 "🔍 PUBLISH json_to_xml: About to serialize. Root keys: {:?}",
234 obj.keys().collect::<Vec<_>>()
235 );
236 debug!(" Root keys: {:?}", obj.keys().collect::<Vec<_>>());
237 }
238
239 eprintln!("🔍 PUBLISH json_to_xml: Calling json_to_typed_xml...");
241 let result = crate::xml::json_to_typed_xml(&data_to_serialize, message_type);
242 match &result {
243 Ok(_) => eprintln!("🔍 PUBLISH json_to_xml: json_to_typed_xml succeeded"),
244 Err(e) => eprintln!("🔍 PUBLISH json_to_xml: json_to_typed_xml FAILED: {:?}", e),
245 }
246
247 result.map_err(|e| {
248 error!(error = ?e, data = ?data_to_serialize, message_type = %message_type, "XML serialization failed");
249 DataflowError::Validation(format!("XML serialization error: {}", e))
250 })
251 }
252
253 fn json_to_json(&self, json_data: &Value) -> Result<String> {
255 serde_json::to_string_pretty(json_data).map_err(|e| {
256 error!(error = ?e, "JSON serialization failed");
257 DataflowError::Validation(format!("JSON serialization error: {}", e))
258 })
259 }
260}