swift_mt_message/plugin/
parse.rs

1use crate::SwiftParser;
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
/// Dataflow function handler that parses a SWIFT MT message into JSON.
///
/// The handler is stateless; all inputs come from the function configuration
/// and the message passed to `execute`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Parse;
15
16#[async_trait]
17impl AsyncFunctionHandler for Parse {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        debug!("Starting MT message parsing for forward transformation");
26
27        // Extract custom configuration
28        let input = match config {
29            FunctionConfig::Custom { input, .. } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        let mt_message_field =
38            input
39                .get("mt_message")
40                .and_then(Value::as_str)
41                .ok_or_else(|| {
42                    DataflowError::Validation("'mt_message' parameter is required".to_string())
43                })?;
44
45        let parsed_field = input.get("parsed").and_then(Value::as_str).ok_or_else(|| {
46            DataflowError::Validation("'parsed' parameter is required".to_string())
47        })?;
48
49        let payload = if mt_message_field == "payload" {
50            message.payload.to_string().replace("\\n", "\n")
51        } else {
52            // Check if the field contains an object with mt_message (from generate_mt output)
53            let field_value = message.data().get(mt_message_field).ok_or_else(|| {
54                DataflowError::Validation(format!(
55                    "MT message field '{}' not found in message data",
56                    mt_message_field
57                ))
58            })?;
59
60            // If it's an object with mt_message field, extract that
61            if let Some(mt_msg) = field_value.get("mt_message").and_then(Value::as_str) {
62                mt_msg.to_string()
63            } else if let Some(s) = field_value.as_str() {
64                // If it's a direct string, use it
65                s.to_string()
66            } else {
67                return Err(DataflowError::Validation(format!(
68                    "Field '{}' does not contain a valid MT message",
69                    mt_message_field
70                )));
71            }
72        };
73
74        debug!(
75            mt_message_field = %mt_message_field,
76            parsed_field = %parsed_field,
77            payload_length = payload.len(),
78            "Extracted MT payload for parsing"
79        );
80
81        self.parse_swift_mt(message, &payload, parsed_field)
82    }
83}
84
85impl Parse {
86    fn parse_swift_mt(
87        &self,
88        message: &mut Message,
89        payload: &str,
90        parsed_field: &str,
91    ) -> Result<(usize, Vec<Change>)> {
92        debug!("Parsing SwiftMT message for forward transformation");
93
94        let payload = Parse::manual_unescape(payload);
95        debug!("Parsing MT message with payload length: {}", payload.len());
96        let parsed_message = SwiftParser::parse_auto(&payload).map_err(|e| {
97            error!(error = ?e, "SwiftMT parsing failed");
98            DataflowError::Validation(format!("SwiftMT parser error: {e:?}"))
99        })?;
100
101        let message_type = parsed_message.message_type().to_string();
102        debug!(message_type = %message_type, "Successfully parsed SwiftMT message");
103
104        let method: String;
105
106        let parsed_data = match message_type.as_str() {
107            "101" => {
108                let Some(mt101_message) = parsed_message.into_mt101() else {
109                    error!("Failed to convert SwiftMessage to MT101");
110                    return Err(DataflowError::Validation(
111                        "MT101 message not found in SwiftMT message".to_string(),
112                    ));
113                };
114
115                method = "normal".to_string();
116
117                serde_json::to_value(&mt101_message).map_err(|e| {
118                    error!(error = ?e, "MT101 JSON conversion failed");
119                    DataflowError::Validation(format!("MT101 JSON conversion failed: {e}"))
120                })?
121            }
122            "103" => {
123                let Some(mt103_message) = parsed_message.into_mt103() else {
124                    error!("Failed to convert SwiftMessage to MT103");
125                    return Err(DataflowError::Validation(
126                        "MT103 message not found in SwiftMT message".to_string(),
127                    ));
128                };
129
130                method = if mt103_message.has_reject_codes() {
131                    "reject".to_string()
132                } else if mt103_message.has_return_codes() {
133                    "return".to_string()
134                } else if mt103_message.is_stp_message() {
135                    "stp".to_string()
136                } else {
137                    "normal".to_string()
138                };
139
140                debug!(method = %method, "Determined MT103 processing method");
141
142                serde_json::to_value(&mt103_message).map_err(|e| {
143                    error!(error = ?e, "MT103 JSON conversion failed");
144                    DataflowError::Validation(format!("MT103 JSON conversion failed: {e}"))
145                })?
146            }
147            "200" => {
148                let Some(mt200_message) = parsed_message.into_mt200() else {
149                    error!("Failed to convert SwiftMessage to MT200");
150                    return Err(DataflowError::Validation(
151                        "MT200 message not found in SwiftMT message".to_string(),
152                    ));
153                };
154
155                method = "normal".to_string();
156                debug!("Processing MT200 with normal method");
157
158                serde_json::to_value(&mt200_message).map_err(|e| {
159                    error!(error = ?e, "MT200 JSON conversion failed");
160                    DataflowError::Validation(format!("MT200 JSON conversion failed: {e}"))
161                })?
162            }
163            "202" => {
164                let Some(mt202_message) = parsed_message.into_mt202() else {
165                    error!("Failed to convert SwiftMessage to MT202");
166                    return Err(DataflowError::Validation(
167                        "MT202 message not found in SwiftMT message".to_string(),
168                    ));
169                };
170
171                method = if mt202_message.has_reject_codes()
172                    || mt202_message
173                        .user_header
174                        .as_ref()
175                        .and_then(|h| h.validation_flag.as_ref())
176                        .map(|flag| flag.as_str() == "REJT")
177                        .unwrap_or(false)
178                {
179                    "reject".to_string()
180                } else if mt202_message.has_return_codes()
181                    || mt202_message
182                        .user_header
183                        .as_ref()
184                        .and_then(|h| h.validation_flag.as_ref())
185                        .map(|flag| flag.as_str() == "RETN")
186                        .unwrap_or(false)
187                {
188                    "return".to_string()
189                } else if mt202_message.is_cover_message()
190                    || mt202_message
191                        .user_header
192                        .as_ref()
193                        .and_then(|h| h.validation_flag.as_ref())
194                        .map(|flag| flag.as_str() == "COV")
195                        .unwrap_or(false)
196                {
197                    "cover".to_string()
198                } else {
199                    "normal".to_string()
200                };
201
202                debug!(method = %method, "Determined MT202 processing method");
203
204                serde_json::to_value(&mt202_message).map_err(|e| {
205                    error!(error = ?e, "MT202 JSON conversion failed");
206                    DataflowError::Validation(format!("MT202 JSON conversion failed: {e}"))
207                })?
208            }
209            "205" => {
210                let Some(mt205_message) = parsed_message.into_mt205() else {
211                    error!("Failed to convert SwiftMessage to MT205");
212                    return Err(DataflowError::Validation(
213                        "MT205 message not found in SwiftMT message".to_string(),
214                    ));
215                };
216
217                method = if mt205_message.has_reject_codes()
218                    || mt205_message
219                        .user_header
220                        .as_ref()
221                        .and_then(|h| h.validation_flag.as_ref())
222                        .map(|flag| flag.as_str() == "REJT")
223                        .unwrap_or(false)
224                {
225                    "reject".to_string()
226                } else if mt205_message.has_return_codes()
227                    || mt205_message
228                        .user_header
229                        .as_ref()
230                        .and_then(|h| h.validation_flag.as_ref())
231                        .map(|flag| flag.as_str() == "RETN")
232                        .unwrap_or(false)
233                {
234                    "return".to_string()
235                } else if mt205_message.is_cover_message()
236                    || mt205_message
237                        .user_header
238                        .as_ref()
239                        .and_then(|h| h.validation_flag.as_ref())
240                        .map(|flag| flag.as_str() == "COV")
241                        .unwrap_or(false)
242                {
243                    "cover".to_string()
244                } else {
245                    "normal".to_string()
246                };
247
248                debug!(method = %method, "Determined MT205 processing method");
249
250                serde_json::to_value(&mt205_message).map_err(|e| {
251                    error!(error = ?e, "MT205 JSON conversion failed");
252                    DataflowError::Validation(format!("MT205 JSON conversion failed: {e}"))
253                })?
254            }
255            "900" => {
256                let Some(mt900_message) = parsed_message.into_mt900() else {
257                    error!("Failed to convert SwiftMessage to MT900");
258                    return Err(DataflowError::Validation(
259                        "MT900 message not found in SwiftMT message".to_string(),
260                    ));
261                };
262
263                method = "normal".to_string();
264                debug!("Processing MT900 with normal method");
265
266                serde_json::to_value(&mt900_message).map_err(|e| {
267                    error!(error = ?e, "MT900 JSON conversion failed");
268                    DataflowError::Validation(format!("MT900 JSON conversion failed: {e}"))
269                })?
270            }
271            "910" => {
272                let Some(mt910_message) = parsed_message.into_mt910() else {
273                    error!("Failed to convert SwiftMessage to MT910");
274                    return Err(DataflowError::Validation(
275                        "MT910 message not found in SwiftMT message".to_string(),
276                    ));
277                };
278
279                method = "normal".to_string();
280                debug!("Processing MT910 with normal method");
281
282                serde_json::to_value(&mt910_message).map_err(|e| {
283                    error!(error = ?e, "MT910 JSON conversion failed");
284                    DataflowError::Validation(format!("MT910 JSON conversion failed: {e}"))
285                })?
286            }
287            "192" => {
288                let Some(mt192_message) = parsed_message.into_mt192() else {
289                    error!("Failed to convert SwiftMessage to MT192");
290                    return Err(DataflowError::Validation(
291                        "MT192 message not found in SwiftMT message".to_string(),
292                    ));
293                };
294
295                method = "normal".to_string();
296                debug!("Processing MT192 with normal method");
297
298                serde_json::to_value(&mt192_message).map_err(|e| {
299                    error!(error = ?e, "MT192 JSON conversion failed");
300                    DataflowError::Validation(format!("MT192 JSON conversion failed: {e}"))
301                })?
302            }
303            "292" => {
304                let Some(mt292_message) = parsed_message.into_mt292() else {
305                    error!("Failed to convert SwiftMessage to MT292");
306                    return Err(DataflowError::Validation(
307                        "MT292 message not found in SwiftMT message".to_string(),
308                    ));
309                };
310
311                method = "normal".to_string();
312                debug!("Processing MT292 with normal method");
313
314                serde_json::to_value(&mt292_message).map_err(|e| {
315                    error!(error = ?e, "MT292 JSON conversion failed");
316                    DataflowError::Validation(format!("MT292 JSON conversion failed: {e}"))
317                })?
318            }
319            "196" => {
320                let Some(mt196_message) = parsed_message.into_mt196() else {
321                    error!("Failed to convert SwiftMessage to MT196");
322                    return Err(DataflowError::Validation(
323                        "MT196 message not found in SwiftMT message".to_string(),
324                    ));
325                };
326
327                method = "normal".to_string();
328                debug!("Processing MT196 with normal method");
329
330                serde_json::to_value(&mt196_message).map_err(|e| {
331                    error!(error = ?e, "MT196 JSON conversion failed");
332                    DataflowError::Validation(format!("MT196 JSON conversion failed: {e}"))
333                })?
334            }
335            "296" => {
336                let Some(mt296_message) = parsed_message.into_mt296() else {
337                    error!("Failed to convert SwiftMessage to MT296");
338                    return Err(DataflowError::Validation(
339                        "MT296 message not found in SwiftMT message".to_string(),
340                    ));
341                };
342
343                method = "normal".to_string();
344                debug!("Processing MT296 with normal method");
345
346                serde_json::to_value(&mt296_message).map_err(|e| {
347                    error!(error = ?e, "MT296 JSON conversion failed");
348                    DataflowError::Validation(format!("MT296 JSON conversion failed: {e}"))
349                })?
350            }
351            "104" => {
352                let Some(mt104_message) = parsed_message.into_mt104() else {
353                    error!("Failed to convert SwiftMessage to MT104");
354                    return Err(DataflowError::Validation(
355                        "MT104 message not found in SwiftMT message".to_string(),
356                    ));
357                };
358
359                method = "normal".to_string();
360
361                serde_json::to_value(&mt104_message).map_err(|e| {
362                    error!(error = ?e, "MT104 JSON conversion failed");
363                    DataflowError::Validation(format!("MT104 JSON conversion failed: {e}"))
364                })?
365            }
366            "920" => {
367                let Some(mt920_message) = parsed_message.into_mt920() else {
368                    error!("Failed to convert SwiftMessage to MT920");
369                    return Err(DataflowError::Validation(
370                        "MT920 message not found in SwiftMT message".to_string(),
371                    ));
372                };
373
374                method = "normal".to_string();
375
376                serde_json::to_value(&mt920_message).map_err(|e| {
377                    error!(error = ?e, "MT920 JSON conversion failed");
378                    DataflowError::Validation(format!("MT920 JSON conversion failed: {e}"))
379                })?
380            }
381            "940" => {
382                let Some(mt940_message) = parsed_message.into_mt940() else {
383                    error!("Failed to convert SwiftMessage to MT940");
384                    return Err(DataflowError::Validation(
385                        "MT940 message not found in SwiftMT message".to_string(),
386                    ));
387                };
388
389                method = "normal".to_string();
390
391                serde_json::to_value(&mt940_message).map_err(|e| {
392                    error!(error = ?e, "MT940 JSON conversion failed");
393                    DataflowError::Validation(format!("MT940 JSON conversion failed: {e}"))
394                })?
395            }
396            "950" => {
397                let Some(mt950_message) = parsed_message.into_mt950() else {
398                    error!("Failed to convert SwiftMessage to MT950");
399                    return Err(DataflowError::Validation(
400                        "MT950 message not found in SwiftMT message".to_string(),
401                    ));
402                };
403
404                method = "normal".to_string();
405
406                serde_json::to_value(&mt950_message).map_err(|e| {
407                    error!(error = ?e, "MT950 JSON conversion failed");
408                    DataflowError::Validation(format!("MT950 JSON conversion failed: {e}"))
409                })?
410            }
411            "107" => {
412                let Some(mt107_message) = parsed_message.into_mt107() else {
413                    error!("Failed to convert SwiftMessage to MT107");
414                    return Err(DataflowError::Validation(
415                        "MT107 message not found in SwiftMT message".to_string(),
416                    ));
417                };
418                method = "normal".to_string();
419                serde_json::to_value(&mt107_message).map_err(|e| {
420                    error!(error = ?e, "MT107 JSON conversion failed");
421                    DataflowError::Validation(format!("MT107 JSON conversion failed: {e}"))
422                })?
423            }
424            "110" => {
425                let Some(mt110_message) = parsed_message.into_mt110() else {
426                    error!("Failed to convert SwiftMessage to MT110");
427                    return Err(DataflowError::Validation(
428                        "MT110 message not found in SwiftMT message".to_string(),
429                    ));
430                };
431                method = "normal".to_string();
432                serde_json::to_value(&mt110_message).map_err(|e| {
433                    error!(error = ?e, "MT110 JSON conversion failed");
434                    DataflowError::Validation(format!("MT110 JSON conversion failed: {e}"))
435                })?
436            }
437            "111" => {
438                let Some(mt111_message) = parsed_message.into_mt111() else {
439                    error!("Failed to convert SwiftMessage to MT111");
440                    return Err(DataflowError::Validation(
441                        "MT111 message not found in SwiftMT message".to_string(),
442                    ));
443                };
444                method = "normal".to_string();
445                serde_json::to_value(&mt111_message).map_err(|e| {
446                    error!(error = ?e, "MT111 JSON conversion failed");
447                    DataflowError::Validation(format!("MT111 JSON conversion failed: {e}"))
448                })?
449            }
450            "112" => {
451                let Some(mt112_message) = parsed_message.into_mt112() else {
452                    error!("Failed to convert SwiftMessage to MT112");
453                    return Err(DataflowError::Validation(
454                        "MT112 message not found in SwiftMT message".to_string(),
455                    ));
456                };
457                method = "normal".to_string();
458                serde_json::to_value(&mt112_message).map_err(|e| {
459                    error!(error = ?e, "MT112 JSON conversion failed");
460                    DataflowError::Validation(format!("MT112 JSON conversion failed: {e}"))
461                })?
462            }
463            "190" => {
464                let Some(mt190_message) = parsed_message.into_mt190() else {
465                    error!("Failed to convert SwiftMessage to MT190");
466                    return Err(DataflowError::Validation(
467                        "MT190 message not found in SwiftMT message".to_string(),
468                    ));
469                };
470                method = "normal".to_string();
471                serde_json::to_value(&mt190_message).map_err(|e| {
472                    error!(error = ?e, "MT190 JSON conversion failed");
473                    DataflowError::Validation(format!("MT190 JSON conversion failed: {e}"))
474                })?
475            }
476            "191" => {
477                let Some(mt191_message) = parsed_message.into_mt191() else {
478                    error!("Failed to convert SwiftMessage to MT191");
479                    return Err(DataflowError::Validation(
480                        "MT191 message not found in SwiftMT message".to_string(),
481                    ));
482                };
483                method = "normal".to_string();
484                serde_json::to_value(&mt191_message).map_err(|e| {
485                    error!(error = ?e, "MT191 JSON conversion failed");
486                    DataflowError::Validation(format!("MT191 JSON conversion failed: {e}"))
487                })?
488            }
489            "199" => {
490                let Some(mt199_message) = parsed_message.into_mt199() else {
491                    error!("Failed to convert SwiftMessage to MT199");
492                    return Err(DataflowError::Validation(
493                        "MT199 message not found in SwiftMT message".to_string(),
494                    ));
495                };
496                method = "normal".to_string();
497                serde_json::to_value(&mt199_message).map_err(|e| {
498                    error!(error = ?e, "MT199 JSON conversion failed");
499                    DataflowError::Validation(format!("MT199 JSON conversion failed: {e}"))
500                })?
501            }
502            "204" => {
503                let Some(mt204_message) = parsed_message.into_mt204() else {
504                    error!("Failed to convert SwiftMessage to MT204");
505                    return Err(DataflowError::Validation(
506                        "MT204 message not found in SwiftMT message".to_string(),
507                    ));
508                };
509                method = "normal".to_string();
510                serde_json::to_value(&mt204_message).map_err(|e| {
511                    error!(error = ?e, "MT204 JSON conversion failed");
512                    DataflowError::Validation(format!("MT204 JSON conversion failed: {e}"))
513                })?
514            }
515            "210" => {
516                let Some(mt210_message) = parsed_message.into_mt210() else {
517                    error!("Failed to convert SwiftMessage to MT210");
518                    return Err(DataflowError::Validation(
519                        "MT210 message not found in SwiftMT message".to_string(),
520                    ));
521                };
522                method = "normal".to_string();
523                serde_json::to_value(&mt210_message).map_err(|e| {
524                    error!(error = ?e, "MT210 JSON conversion failed");
525                    DataflowError::Validation(format!("MT210 JSON conversion failed: {e}"))
526                })?
527            }
528            "290" => {
529                let Some(mt290_message) = parsed_message.into_mt290() else {
530                    error!("Failed to convert SwiftMessage to MT290");
531                    return Err(DataflowError::Validation(
532                        "MT290 message not found in SwiftMT message".to_string(),
533                    ));
534                };
535                method = "normal".to_string();
536                serde_json::to_value(&mt290_message).map_err(|e| {
537                    error!(error = ?e, "MT290 JSON conversion failed");
538                    DataflowError::Validation(format!("MT290 JSON conversion failed: {e}"))
539                })?
540            }
541            "291" => {
542                let Some(mt291_message) = parsed_message.into_mt291() else {
543                    error!("Failed to convert SwiftMessage to MT291");
544                    return Err(DataflowError::Validation(
545                        "MT291 message not found in SwiftMT message".to_string(),
546                    ));
547                };
548                method = "normal".to_string();
549                serde_json::to_value(&mt291_message).map_err(|e| {
550                    error!(error = ?e, "MT291 JSON conversion failed");
551                    DataflowError::Validation(format!("MT291 JSON conversion failed: {e}"))
552                })?
553            }
554            "299" => {
555                let Some(mt299_message) = parsed_message.into_mt299() else {
556                    error!("Failed to convert SwiftMessage to MT299");
557                    return Err(DataflowError::Validation(
558                        "MT299 message not found in SwiftMT message".to_string(),
559                    ));
560                };
561                method = "normal".to_string();
562                serde_json::to_value(&mt299_message).map_err(|e| {
563                    error!(error = ?e, "MT299 JSON conversion failed");
564                    DataflowError::Validation(format!("MT299 JSON conversion failed: {e}"))
565                })?
566            }
567            "935" => {
568                let Some(mt935_message) = parsed_message.into_mt935() else {
569                    error!("Failed to convert SwiftMessage to MT935");
570                    return Err(DataflowError::Validation(
571                        "MT935 message not found in SwiftMT message".to_string(),
572                    ));
573                };
574                method = "normal".to_string();
575                serde_json::to_value(&mt935_message).map_err(|e| {
576                    error!(error = ?e, "MT935 JSON conversion failed");
577                    DataflowError::Validation(format!("MT935 JSON conversion failed: {e}"))
578                })?
579            }
580            "941" => {
581                let Some(mt941_message) = parsed_message.into_mt941() else {
582                    error!("Failed to convert SwiftMessage to MT941");
583                    return Err(DataflowError::Validation(
584                        "MT941 message not found in SwiftMT message".to_string(),
585                    ));
586                };
587                method = "normal".to_string();
588                serde_json::to_value(&mt941_message).map_err(|e| {
589                    error!(error = ?e, "MT941 JSON conversion failed");
590                    DataflowError::Validation(format!("MT941 JSON conversion failed: {e}"))
591                })?
592            }
593            "942" => {
594                let Some(mt942_message) = parsed_message.into_mt942() else {
595                    error!("Failed to convert SwiftMessage to MT942");
596                    return Err(DataflowError::Validation(
597                        "MT942 message not found in SwiftMT message".to_string(),
598                    ));
599                };
600                method = "normal".to_string();
601                serde_json::to_value(&mt942_message).map_err(|e| {
602                    error!(error = ?e, "MT942 JSON conversion failed");
603                    DataflowError::Validation(format!("MT942 JSON conversion failed: {e}"))
604                })?
605            }
606            _ => {
607                error!(message_type = %message_type, "Unsupported message type encountered");
608                return Err(DataflowError::Validation(format!(
609                    "Unsupported message type: {message_type}"
610                )));
611            }
612        };
613
614        // Store the parsed result in message data
615        message
616            .data_mut()
617            .as_object_mut()
618            .unwrap()
619            .insert(parsed_field.to_string(), parsed_data.clone());
620
621        message.metadata_mut().as_object_mut().unwrap().insert(
622            parsed_field.to_string(),
623            json!({
624                "message_type": message_type,
625                "method": method,
626            }),
627        );
628
629        debug!(
630            message_type = %message_type,
631            method = %method,
632            parsed_field = %parsed_field,
633            "MT message parsing completed successfully for forward transformation"
634        );
635
636        // Important: invalidate cache after modifications
637        message.invalidate_context_cache();
638
639        Ok((
640            200,
641            vec![Change {
642                path: Arc::from(format!("data.{}", parsed_field)),
643                old_value: Arc::new(Value::Null),
644                new_value: Arc::new(parsed_data),
645            }],
646        ))
647    }
648
649    /// Manual string unescaping for common escape sequences
650    fn manual_unescape(input: &str) -> String {
651        let mut result = input.trim();
652
653        // Remove surrounding double quotes if present
654        if result.starts_with('"') && result.ends_with('"') && result.len() > 1 {
655            result = &result[1..result.len() - 1];
656        }
657
658        // Now unescape the inner content
659        result
660            .replace("\\r\\n", "\n")
661            .replace("\\r", "\r")
662            .replace("\\n", "\n")
663            .replace("\\t", "\t")
664            .replace("\\\"", "\"")
665            .replace("\\'", "'")
666            .replace("\\\\", "\\")
667            .replace("\\u0020", " ")
668            .replace("\\u0022", "\"")
669            .replace("\\u003C", "<")
670            .replace("\\u003E", ">")
671            .replace("\\u003D", "=")
672            .replace("\\u002F", "/")
673    }
674}