swift_mt_message/plugin/
publish.rs1use crate::{SwiftMessage, messages::*};
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14fn clean_null_fields(data: &Value) -> Value {
20 match data {
21 Value::Object(obj) => {
22 let mut cleaned = serde_json::Map::new();
23
24 for (key, value) in obj.iter() {
25 match value {
26 Value::Null => {
27 continue;
29 }
30 Value::Object(_) => {
31 let cleaned_inner = clean_null_fields(value);
33 if !cleaned_inner.is_null()
35 && !cleaned_inner.as_object().is_some_and(|o| o.is_empty())
36 {
37 cleaned.insert(key.clone(), cleaned_inner);
38 }
39 }
40 Value::Array(arr) => {
41 let cleaned_array: Vec<Value> = arr
43 .iter()
44 .map(clean_null_fields)
45 .filter(|item| !item.is_null())
46 .collect();
47 if !cleaned_array.is_empty() {
48 cleaned.insert(key.clone(), Value::Array(cleaned_array));
49 }
50 }
51 _ => {
52 cleaned.insert(key.clone(), value.clone());
54 }
55 }
56 }
57
58 Value::Object(cleaned)
59 }
60 Value::Array(arr) => {
61 let cleaned_array: Vec<Value> = arr
63 .iter()
64 .map(clean_null_fields)
65 .filter(|item| !item.is_null())
66 .collect();
67 Value::Array(cleaned_array)
68 }
69 other => other.clone(),
71 }
72}
73
74fn json_to_mt(
76 message_type: &str,
77 json_value: &Value,
78) -> std::result::Result<String, DataflowError> {
79 macro_rules! convert_json {
80 ($mt_type:ty) => {{
81 let msg: SwiftMessage<$mt_type> =
82 serde_json::from_value(json_value.clone()).map_err(|e| {
83 DataflowError::Validation(format!(
84 "Failed to parse JSON as {}: {}",
85 stringify!($mt_type),
86 e
87 ))
88 })?;
89 Ok(msg.to_mt_message())
90 }};
91 }
92
93 match message_type {
94 "101" | "MT101" => convert_json!(MT101),
95 "103" | "MT103" => convert_json!(MT103),
96 "104" | "MT104" => convert_json!(MT104),
97 "107" | "MT107" => convert_json!(MT107),
98 "110" | "MT110" => convert_json!(MT110),
99 "111" | "MT111" => convert_json!(MT111),
100 "112" | "MT112" => convert_json!(MT112),
101 "190" | "MT190" => convert_json!(MT190),
102 "191" | "MT191" => convert_json!(MT191),
103 "192" | "MT192" => convert_json!(MT192),
104 "196" | "MT196" => convert_json!(MT196),
105 "199" | "MT199" => convert_json!(MT199),
106 "200" | "MT200" => convert_json!(MT200),
107 "202" | "MT202" => convert_json!(MT202),
108 "204" | "MT204" => convert_json!(MT204),
109 "205" | "MT205" => convert_json!(MT205),
110 "210" | "MT210" => convert_json!(MT210),
111 "290" | "MT290" => convert_json!(MT290),
112 "291" | "MT291" => convert_json!(MT291),
113 "292" | "MT292" => convert_json!(MT292),
114 "296" | "MT296" => convert_json!(MT296),
115 "299" | "MT299" => convert_json!(MT299),
116 "900" | "MT900" => convert_json!(MT900),
117 "910" | "MT910" => convert_json!(MT910),
118 "920" | "MT920" => convert_json!(MT920),
119 "935" | "MT935" => convert_json!(MT935),
120 "940" | "MT940" => convert_json!(MT940),
121 "941" | "MT941" => convert_json!(MT941),
122 "942" | "MT942" => convert_json!(MT942),
123 "950" | "MT950" => convert_json!(MT950),
124 _ => Err(DataflowError::Validation(format!(
125 "Unsupported message type: {}",
126 message_type
127 ))),
128 }
129}
130
131pub struct Publish;
132
133#[async_trait]
134impl AsyncFunctionHandler for Publish {
135 #[instrument(skip(self, message, config, _datalogic))]
136 async fn execute(
137 &self,
138 message: &mut Message,
139 config: &FunctionConfig,
140 _datalogic: Arc<DataLogic>,
141 ) -> Result<(usize, Vec<Change>)> {
142 debug!("Starting JSON to MT message publishing");
143
144 let input = match config {
146 FunctionConfig::Custom { input, .. } => input,
147 _ => {
148 return Err(DataflowError::Validation(
149 "Invalid configuration type".to_string(),
150 ));
151 }
152 };
153
154 let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
156 DataflowError::Validation("'source' parameter is required".to_string())
157 })?;
158
159 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
160 DataflowError::Validation("'target' parameter is required".to_string())
161 })?;
162
163 let json_data = message.data().get(source_field).cloned().ok_or_else(|| {
165 error!(
166 source_field = %source_field,
167 available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
168 "JSON data field not found in message data"
169 );
170 DataflowError::Validation(format!(
171 "Field '{}' not found in message data",
172 source_field
173 ))
174 })?;
175
176 debug!(
177 source_field = %source_field,
178 target_field = %target_field,
179 "Processing JSON to MT conversion"
180 );
181
182 let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
184 inner_json.clone()
186 } else {
187 json_data.clone()
189 };
190
191 let cleaned_data = clean_null_fields(&json_to_convert);
193
194 let message_type = json_data.get("message_type")
196 .and_then(Value::as_str)
197 .map(|mt| mt.trim_start_matches("MT").to_string())
198 .ok_or_else(|| {
199 DataflowError::Validation(
200 "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
201 )
202 })?;
203
204 debug!(message_type = %message_type, "Converting JSON to MT{}", message_type);
205
206 let mt_message = json_to_mt(&message_type, &cleaned_data).map_err(|e| {
208 DataflowError::Validation(format!("Failed to convert to MT{}: {}", message_type, e))
209 })?;
210
211 debug!(
212 message_length = mt_message.len(),
213 "MT message published successfully"
214 );
215
216 let old_value = message
218 .data()
219 .get(target_field)
220 .cloned()
221 .unwrap_or(Value::Null);
222
223 message.data_mut()[target_field] = Value::String(mt_message.clone());
224
225 message.invalidate_context_cache();
227
228 Ok((
229 200,
230 vec![Change {
231 path: Arc::from(format!("data.{}", target_field)),
232 old_value: Arc::new(old_value),
233 new_value: Arc::new(Value::String(mt_message)),
234 }],
235 ))
236 }
237}