swift_mt_message/plugin/
publish.rs1use crate::{SwiftMessage, messages::*};
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14fn clean_null_fields(data: &Value) -> Value {
16 let mut cleaned = data.clone();
17 if let Some(fields) = cleaned.get_mut("fields").and_then(|f| f.as_object_mut()) {
18 fields.retain(|_key, value| {
19 if let Some(obj) = value.as_object() {
20 !obj.values().any(|v| v.is_null())
22 } else {
23 true }
25 });
26 }
27 cleaned
28}
29
30fn json_to_mt(
32 message_type: &str,
33 json_value: &Value,
34) -> std::result::Result<String, DataflowError> {
35 if (message_type == "104" || message_type == "MT104")
37 && let Some(fields_obj) = json_value.get("fields")
38 {
39 eprintln!(
40 "DEBUG MT104 - fields keys: {:?}",
41 fields_obj.as_object().map(|o| o.keys().collect::<Vec<_>>())
42 );
43 }
44
45 macro_rules! convert_json {
46 ($mt_type:ty) => {{
47 let msg: SwiftMessage<$mt_type> =
48 serde_json::from_value(json_value.clone()).map_err(|e| {
49 DataflowError::Validation(format!(
50 "Failed to parse JSON as {}: {}",
51 stringify!($mt_type),
52 e
53 ))
54 })?;
55 Ok(msg.to_mt_message())
56 }};
57 }
58
59 match message_type {
60 "101" | "MT101" => convert_json!(MT101),
61 "103" | "MT103" => convert_json!(MT103),
62 "104" | "MT104" => convert_json!(MT104),
63 "107" | "MT107" => convert_json!(MT107),
64 "110" | "MT110" => convert_json!(MT110),
65 "111" | "MT111" => convert_json!(MT111),
66 "112" | "MT112" => convert_json!(MT112),
67 "190" | "MT190" => convert_json!(MT190),
68 "191" | "MT191" => convert_json!(MT191),
69 "192" | "MT192" => convert_json!(MT192),
70 "196" | "MT196" => convert_json!(MT196),
71 "199" | "MT199" => convert_json!(MT199),
72 "200" | "MT200" => convert_json!(MT200),
73 "202" | "MT202" => convert_json!(MT202),
74 "204" | "MT204" => convert_json!(MT204),
75 "205" | "MT205" => convert_json!(MT205),
76 "210" | "MT210" => convert_json!(MT210),
77 "290" | "MT290" => convert_json!(MT290),
78 "291" | "MT291" => convert_json!(MT291),
79 "292" | "MT292" => convert_json!(MT292),
80 "296" | "MT296" => convert_json!(MT296),
81 "299" | "MT299" => convert_json!(MT299),
82 "900" | "MT900" => convert_json!(MT900),
83 "910" | "MT910" => convert_json!(MT910),
84 "920" | "MT920" => convert_json!(MT920),
85 "935" | "MT935" => convert_json!(MT935),
86 "940" | "MT940" => convert_json!(MT940),
87 "941" | "MT941" => convert_json!(MT941),
88 "942" | "MT942" => convert_json!(MT942),
89 "950" | "MT950" => convert_json!(MT950),
90 _ => Err(DataflowError::Validation(format!(
91 "Unsupported message type: {}",
92 message_type
93 ))),
94 }
95}
96
97pub struct Publish;
98
99#[async_trait]
100impl AsyncFunctionHandler for Publish {
101 #[instrument(skip(self, message, config, _datalogic))]
102 async fn execute(
103 &self,
104 message: &mut Message,
105 config: &FunctionConfig,
106 _datalogic: Arc<DataLogic>,
107 ) -> Result<(usize, Vec<Change>)> {
108 debug!("Starting JSON to MT message publishing");
109
110 let input = match config {
112 FunctionConfig::Custom { input, .. } => input,
113 _ => {
114 return Err(DataflowError::Validation(
115 "Invalid configuration type".to_string(),
116 ));
117 }
118 };
119
120 let json_data_field = input
122 .get("json_data")
123 .and_then(Value::as_str)
124 .ok_or_else(|| {
125 DataflowError::Validation("'json_data' parameter is required".to_string())
126 })?;
127
128 let mt_message_field =
129 input
130 .get("mt_message")
131 .and_then(Value::as_str)
132 .ok_or_else(|| {
133 DataflowError::Validation("'mt_message' parameter is required".to_string())
134 })?;
135
136 let json_data = message.data().get(json_data_field).cloned().ok_or_else(|| {
138 error!(
139 json_data_field = %json_data_field,
140 available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
141 "JSON data field not found in message data"
142 );
143 DataflowError::Validation(format!(
144 "Field '{}' not found in message data",
145 json_data_field
146 ))
147 })?;
148
149 debug!(
150 json_data_field = %json_data_field,
151 mt_message_field = %mt_message_field,
152 "Processing JSON to MT conversion"
153 );
154
155 let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
157 inner_json.clone()
159 } else {
160 json_data.clone()
162 };
163
164 let cleaned_data = clean_null_fields(&json_to_convert);
166
167 let message_type = json_data.get("message_type")
169 .and_then(Value::as_str)
170 .map(|mt| mt.trim_start_matches("MT").to_string())
171 .ok_or_else(|| {
172 DataflowError::Validation(
173 "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
174 )
175 })?;
176
177 debug!(message_type = %message_type, "Converting JSON to MT{}", message_type);
178
179 let mt_message = json_to_mt(&message_type, &cleaned_data).map_err(|e| {
181 DataflowError::Validation(format!("Failed to convert to MT{}: {}", message_type, e))
182 })?;
183
184 debug!(
185 message_length = mt_message.len(),
186 "MT message published successfully"
187 );
188
189 let old_value = message
191 .data()
192 .get(mt_message_field)
193 .cloned()
194 .unwrap_or(Value::Null);
195
196 message.data_mut()[mt_message_field] = Value::String(mt_message.clone());
197
198 message.invalidate_context_cache();
200
201 Ok((
202 200,
203 vec![Change {
204 path: Arc::from(format!("data.{}", mt_message_field)),
205 old_value: Arc::new(old_value),
206 new_value: Arc::new(Value::String(mt_message)),
207 }],
208 ))
209 }
210}