swift_mt_message/plugin/
publish.rs

1use crate::{SwiftMessage, messages::*};
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::Value;
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14/// Helper function to clean null values from fields before serialization
15fn clean_null_fields(data: &Value) -> Value {
16    let mut cleaned = data.clone();
17    if let Some(fields) = cleaned.get_mut("fields").and_then(|f| f.as_object_mut()) {
18        fields.retain(|_key, value| {
19            if let Some(obj) = value.as_object() {
20                // Remove fields where any nested values are null
21                !obj.values().any(|v| v.is_null())
22            } else {
23                true // Keep non-object values
24            }
25        });
26    }
27    cleaned
28}
29
30/// Parse JSON into a SwiftMessage and convert to MT format
31fn json_to_mt(
32    message_type: &str,
33    json_value: &Value,
34) -> std::result::Result<String, DataflowError> {
35    // Debug logging for MT104
36    if (message_type == "104" || message_type == "MT104")
37        && let Some(fields_obj) = json_value.get("fields")
38    {
39        eprintln!(
40            "DEBUG MT104 - fields keys: {:?}",
41            fields_obj.as_object().map(|o| o.keys().collect::<Vec<_>>())
42        );
43    }
44
45    macro_rules! convert_json {
46        ($mt_type:ty) => {{
47            let msg: SwiftMessage<$mt_type> =
48                serde_json::from_value(json_value.clone()).map_err(|e| {
49                    DataflowError::Validation(format!(
50                        "Failed to parse JSON as {}: {}",
51                        stringify!($mt_type),
52                        e
53                    ))
54                })?;
55            Ok(msg.to_mt_message())
56        }};
57    }
58
59    match message_type {
60        "101" | "MT101" => convert_json!(MT101),
61        "103" | "MT103" => convert_json!(MT103),
62        "104" | "MT104" => convert_json!(MT104),
63        "107" | "MT107" => convert_json!(MT107),
64        "110" | "MT110" => convert_json!(MT110),
65        "111" | "MT111" => convert_json!(MT111),
66        "112" | "MT112" => convert_json!(MT112),
67        "190" | "MT190" => convert_json!(MT190),
68        "191" | "MT191" => convert_json!(MT191),
69        "192" | "MT192" => convert_json!(MT192),
70        "196" | "MT196" => convert_json!(MT196),
71        "199" | "MT199" => convert_json!(MT199),
72        "200" | "MT200" => convert_json!(MT200),
73        "202" | "MT202" => convert_json!(MT202),
74        "204" | "MT204" => convert_json!(MT204),
75        "205" | "MT205" => convert_json!(MT205),
76        "210" | "MT210" => convert_json!(MT210),
77        "290" | "MT290" => convert_json!(MT290),
78        "291" | "MT291" => convert_json!(MT291),
79        "292" | "MT292" => convert_json!(MT292),
80        "296" | "MT296" => convert_json!(MT296),
81        "299" | "MT299" => convert_json!(MT299),
82        "900" | "MT900" => convert_json!(MT900),
83        "910" | "MT910" => convert_json!(MT910),
84        "920" | "MT920" => convert_json!(MT920),
85        "935" | "MT935" => convert_json!(MT935),
86        "940" | "MT940" => convert_json!(MT940),
87        "941" | "MT941" => convert_json!(MT941),
88        "942" | "MT942" => convert_json!(MT942),
89        "950" | "MT950" => convert_json!(MT950),
90        _ => Err(DataflowError::Validation(format!(
91            "Unsupported message type: {}",
92            message_type
93        ))),
94    }
95}
96
97pub struct Publish;
98
99#[async_trait]
100impl AsyncFunctionHandler for Publish {
101    #[instrument(skip(self, message, config, _datalogic))]
102    async fn execute(
103        &self,
104        message: &mut Message,
105        config: &FunctionConfig,
106        _datalogic: Arc<DataLogic>,
107    ) -> Result<(usize, Vec<Change>)> {
108        debug!("Starting JSON to MT message publishing");
109
110        // Extract custom configuration
111        let input = match config {
112            FunctionConfig::Custom { input, .. } => input,
113            _ => {
114                return Err(DataflowError::Validation(
115                    "Invalid configuration type".to_string(),
116                ));
117            }
118        };
119
120        // Get json_data and mt_message field names
121        let json_data_field = input
122            .get("json_data")
123            .and_then(Value::as_str)
124            .ok_or_else(|| {
125                DataflowError::Validation("'json_data' parameter is required".to_string())
126            })?;
127
128        let mt_message_field =
129            input
130                .get("mt_message")
131                .and_then(Value::as_str)
132                .ok_or_else(|| {
133                    DataflowError::Validation("'mt_message' parameter is required".to_string())
134                })?;
135
136        // Extract JSON data from the message
137        let json_data = message.data().get(json_data_field).cloned().ok_or_else(|| {
138            error!(
139                json_data_field = %json_data_field,
140                available_fields = ?message.data().as_object().map(|obj| obj.keys().collect::<Vec<_>>()),
141                "JSON data field not found in message data"
142            );
143            DataflowError::Validation(format!(
144                "Field '{}' not found in message data",
145                json_data_field
146            ))
147        })?;
148
149        debug!(
150            json_data_field = %json_data_field,
151            mt_message_field = %mt_message_field,
152            "Processing JSON to MT conversion"
153        );
154
155        // Extract the actual JSON data if it's wrapped in a generate result
156        let json_to_convert = if let Some(inner_json) = json_data.get("json_data") {
157            // This is output from the generate function
158            inner_json.clone()
159        } else {
160            // Direct JSON data
161            json_data.clone()
162        };
163
164        // Clean null values from fields before serialization (required for swift-mt-message library)
165        let cleaned_data = clean_null_fields(&json_to_convert);
166
167        // Extract message type from the JSON data
168        let message_type = json_data.get("message_type")
169            .and_then(Value::as_str)
170            .map(|mt| mt.trim_start_matches("MT").to_string())
171            .ok_or_else(|| {
172                DataflowError::Validation(
173                    "Missing 'message_type' field in JSON data. The message_type field is required at the root level.".to_string()
174                )
175            })?;
176
177        debug!(message_type = %message_type, "Converting JSON to MT{}", message_type);
178
179        // Convert JSON to MT message
180        let mt_message = json_to_mt(&message_type, &cleaned_data).map_err(|e| {
181            DataflowError::Validation(format!("Failed to convert to MT{}: {}", message_type, e))
182        })?;
183
184        debug!(
185            message_length = mt_message.len(),
186            "MT message published successfully"
187        );
188
189        // Store the MT message in the output field
190        let old_value = message
191            .data()
192            .get(mt_message_field)
193            .cloned()
194            .unwrap_or(Value::Null);
195
196        message.data_mut()[mt_message_field] = Value::String(mt_message.clone());
197
198        // Invalidate cache after modifications
199        message.invalidate_context_cache();
200
201        Ok((
202            200,
203            vec![Change {
204                path: Arc::from(format!("data.{}", mt_message_field)),
205                old_value: Arc::new(old_value),
206                new_value: Arc::new(Value::String(mt_message)),
207            }],
208        ))
209    }
210}