swift_mt_message/plugin/
publish.rs1use crate::{SwiftMessage, messages::*};
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14fn clean_null_fields(data: &Value) -> Value {
20    match data {
21        Value::Object(obj) => {
22            let mut cleaned = serde_json::Map::new();
23
24            for (key, value) in obj.iter() {
25                match value {
26                    Value::Null => {
27                        continue;
29                    }
30                    Value::Object(_) => {
31                        let cleaned_inner = clean_null_fields(value);
33                        if !cleaned_inner.is_null()
35                            && !cleaned_inner.as_object().is_some_and(|o| o.is_empty())
36                        {
37                            cleaned.insert(key.clone(), cleaned_inner);
38                        }
39                    }
40                    Value::Array(arr) => {
41                        let cleaned_array: Vec<Value> = arr
43                            .iter()
44                            .map(clean_null_fields)
45                            .filter(|item| !item.is_null())
46                            .collect();
47                        if !cleaned_array.is_empty() {
48                            cleaned.insert(key.clone(), Value::Array(cleaned_array));
49                        }
50                    }
51                    _ => {
52                        cleaned.insert(key.clone(), value.clone());
54                    }
55                }
56            }
57
58            Value::Object(cleaned)
59        }
60        Value::Array(arr) => {
61            let cleaned_array: Vec<Value> = arr
63                .iter()
64                .map(clean_null_fields)
65                .filter(|item| !item.is_null())
66                .collect();
67            Value::Array(cleaned_array)
68        }
69        other => other.clone(),
71    }
72}
73
74fn json_to_mt(
76    message_type: &str,
77    json_value: &Value,
78) -> std::result::Result<String, DataflowError> {
79    macro_rules! convert_json {
80        ($mt_type:ty) => {{
81            let msg: SwiftMessage<$mt_type> =
82                serde_json::from_value(json_value.clone()).map_err(|e| {
83                    DataflowError::Validation(format!(
84                        "Failed to parse JSON as {}: {}",
85                        stringify!($mt_type),
86                        e
87                    ))
88                })?;
89            Ok(msg.to_mt_message())
90        }};
91    }
92
93    match message_type {
94        "101" | "MT101" => convert_json!(MT101),
95        "103" | "MT103" => convert_json!(MT103),
96        "104" | "MT104" => convert_json!(MT104),
97        "107" | "MT107" => convert_json!(MT107),
98        "110" | "MT110" => convert_json!(MT110),
99        "111" | "MT111" => convert_json!(MT111),
100        "112" | "MT112" => convert_json!(MT112),
101        "190" | "MT190" => convert_json!(MT190),
102        "191" | "MT191" => convert_json!(MT191),
103        "192" | "MT192" => convert_json!(MT192),
104        "196" | "MT196" => convert_json!(MT196),
105        "199" | "MT199" => convert_json!(MT199),
106        "200" | "MT200" => convert_json!(MT200),
107        "202" | "MT202" => convert_json!(MT202),
108        "204" | "MT204" => convert_json!(MT204),
109        "205" | "MT205" => convert_json!(MT205),
110        "210" | "MT210" => convert_json!(MT210),
111        "290" | "MT290" => convert_json!(MT290),
112        "291" | "MT291" => convert_json!(MT291),
113        "292" | "MT292" => convert_json!(MT292),
114        "296" | "MT296" => convert_json!(MT296),
115        "299" | "MT299" => convert_json!(MT299),
116        "900" | "MT900" => convert_json!(MT900),
117        "910" | "MT910" => convert_json!(MT910),
118        "920" | "MT920" => convert_json!(MT920),
119        "935" | "MT935" => convert_json!(MT935),
120        "940" | "MT940" => convert_json!(MT940),
121        "941" | "MT941" => convert_json!(MT941),
122        "942" | "MT942" => convert_json!(MT942),
123        "950" | "MT950" => convert_json!(MT950),
124        _ => Err(DataflowError::Validation(format!(
125            "Unsupported message type: {}",
126            message_type
127        ))),
128    }
129}
130
131pub struct Publish;
132
133#[async_trait]
134impl AsyncFunctionHandler for Publish {
135    #[instrument(skip(self, message, config, _datalogic))]
136    async fn execute(
137        &self,
138        message: &mut Message,
139        config: &FunctionConfig,
140        _datalogic: Arc<DataLogic>,
141    ) -> Result<(usize, Vec<Change>)> {
142        debug!("Starting JSON to MT message publishing");
143
144        let input = match config {
146            FunctionConfig::Custom { input, .. } => input,
147            _ => {
148                return Err(DataflowError::Validation(
149                    "Invalid configuration type".to_string(),
150                ));
151            }
152        };
153
154        let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
156            DataflowError::Validation("'source' parameter is required".to_string())
157        })?;
158
159        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
160            DataflowError::Validation("'target' parameter is required".to_string())
161        })?;
162
163        let json_data = message.data().get(source_field).cloned().ok_or_else(|| {
165            error!(
166                source_field = %source_field,
167                available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
168                "JSON data field not found in message data"
169            );
170            DataflowError::Validation(format!(
171                "Field '{}' not found in message data",
172                source_field
173            ))
174        })?;
175
176        debug!(
177            source_field = %source_field,
178            target_field = %target_field,
179            "Processing JSON to MT conversion"
180        );
181
182        let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
184            inner_json.clone()
186        } else {
187            json_data.clone()
189        };
190
191        let cleaned_data = clean_null_fields(&json_to_convert);
193
194        let message_type = json_data.get("message_type")
196            .and_then(Value::as_str)
197            .map(|mt| mt.trim_start_matches("MT").to_string())
198            .ok_or_else(|| {
199                DataflowError::Validation(
200                    "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
201                )
202            })?;
203
204        debug!(message_type = %message_type, "Converting JSON to MT{}", message_type);
205
206        let mt_message = json_to_mt(&message_type, &cleaned_data).map_err(|e| {
208            DataflowError::Validation(format!("Failed to convert to MT{}: {}", message_type, e))
209        })?;
210
211        debug!(
212            message_length = mt_message.len(),
213            "MT message published successfully"
214        );
215
216        let old_value = message
218            .data()
219            .get(target_field)
220            .cloned()
221            .unwrap_or(Value::Null);
222
223        message.data_mut()[target_field] = Value::String(mt_message.clone());
224
225        message.invalidate_context_cache();
227
228        Ok((
229            200,
230            vec![Change {
231                path: Arc::from(format!("data.{}", target_field)),
232                old_value: Arc::new(old_value),
233                new_value: Arc::new(Value::String(mt_message)),
234            }],
235        ))
236    }
237}