swift_mt_message/plugin/
parse.rs

1use crate::SwiftParser;
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5    AsyncFunctionHandler, FunctionConfig,
6    error::Result,
7    message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Parse;
15
16#[async_trait]
17impl AsyncFunctionHandler for Parse {
18    #[instrument(skip(self, message, config, _datalogic))]
19    async fn execute(
20        &self,
21        message: &mut Message,
22        config: &FunctionConfig,
23        _datalogic: Arc<DataLogic>,
24    ) -> Result<(usize, Vec<Change>)> {
25        debug!("Starting MT message parsing for forward transformation");
26
27        // Extract custom configuration
28        let input = match config {
29            FunctionConfig::Custom { input, .. } => input,
30            _ => {
31                return Err(DataflowError::Validation(
32                    "Invalid configuration type".to_string(),
33                ));
34            }
35        };
36
37        let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
38            DataflowError::Validation("'source' parameter is required".to_string())
39        })?;
40
41        let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
42            DataflowError::Validation("'target' parameter is required".to_string())
43        })?;
44
45        let payload = if source_field == "payload" {
46            message.payload.to_string().replace("\\n", "\n")
47        } else {
48            // Check if the field contains an object with mt_message (from generate_mt output)
49            let field_value = message.data().get(source_field).ok_or_else(|| {
50                DataflowError::Validation(format!(
51                    "MT message field '{}' not found in message data",
52                    source_field
53                ))
54            })?;
55
56            // If it's an object with mt_message field, extract that
57            if let Some(mt_msg) = field_value.get("mt_message").and_then(Value::as_str) {
58                mt_msg.to_string()
59            } else if let Some(s) = field_value.as_str() {
60                // If it's a direct string, use it
61                s.to_string()
62            } else {
63                return Err(DataflowError::Validation(format!(
64                    "Field '{}' does not contain a valid MT message",
65                    source_field
66                )));
67            }
68        };
69
70        debug!(
71            source_field = %source_field,
72            target_field = %target_field,
73            payload_length = payload.len(),
74            "Extracted MT payload for parsing"
75        );
76
77        self.parse_swift_mt(message, &payload, target_field)
78    }
79}
80
81impl Parse {
82    fn parse_swift_mt(
83        &self,
84        message: &mut Message,
85        payload: &str,
86        target_field: &str,
87    ) -> Result<(usize, Vec<Change>)> {
88        debug!("Parsing SwiftMT message for forward transformation");
89
90        let payload = Parse::manual_unescape(payload);
91        debug!("Parsing MT message with payload length: {}", payload.len());
92        let parsed_message = SwiftParser::parse_auto(&payload).map_err(|e| {
93            error!(error = ?e, "SwiftMT parsing failed");
94            DataflowError::Validation(format!("SwiftMT parser error: {e:?}"))
95        })?;
96
97        let message_type = parsed_message.message_type().to_string();
98        debug!(message_type = %message_type, "Successfully parsed SwiftMT message");
99
100        let method: String;
101
102        let parsed_data = match message_type.as_str() {
103            "101" => {
104                let Some(mt101_message) = parsed_message.into_mt101() else {
105                    error!("Failed to convert SwiftMessage to MT101");
106                    return Err(DataflowError::Validation(
107                        "MT101 message not found in SwiftMT message".to_string(),
108                    ));
109                };
110
111                method = "normal".to_string();
112
113                serde_json::to_value(&mt101_message).map_err(|e| {
114                    error!(error = ?e, "MT101 JSON conversion failed");
115                    DataflowError::Validation(format!("MT101 JSON conversion failed: {e}"))
116                })?
117            }
118            "103" => {
119                let Some(mt103_message) = parsed_message.into_mt103() else {
120                    error!("Failed to convert SwiftMessage to MT103");
121                    return Err(DataflowError::Validation(
122                        "MT103 message not found in SwiftMT message".to_string(),
123                    ));
124                };
125
126                method = if mt103_message.has_reject_codes() {
127                    "reject".to_string()
128                } else if mt103_message.has_return_codes() {
129                    "return".to_string()
130                } else if mt103_message.is_stp_message() {
131                    "stp".to_string()
132                } else {
133                    "normal".to_string()
134                };
135
136                debug!(method = %method, "Determined MT103 processing method");
137
138                serde_json::to_value(&mt103_message).map_err(|e| {
139                    error!(error = ?e, "MT103 JSON conversion failed");
140                    DataflowError::Validation(format!("MT103 JSON conversion failed: {e}"))
141                })?
142            }
143            "200" => {
144                let Some(mt200_message) = parsed_message.into_mt200() else {
145                    error!("Failed to convert SwiftMessage to MT200");
146                    return Err(DataflowError::Validation(
147                        "MT200 message not found in SwiftMT message".to_string(),
148                    ));
149                };
150
151                method = "normal".to_string();
152                debug!("Processing MT200 with normal method");
153
154                serde_json::to_value(&mt200_message).map_err(|e| {
155                    error!(error = ?e, "MT200 JSON conversion failed");
156                    DataflowError::Validation(format!("MT200 JSON conversion failed: {e}"))
157                })?
158            }
159            "202" => {
160                let Some(mt202_message) = parsed_message.into_mt202() else {
161                    error!("Failed to convert SwiftMessage to MT202");
162                    return Err(DataflowError::Validation(
163                        "MT202 message not found in SwiftMT message".to_string(),
164                    ));
165                };
166
167                method = if mt202_message.has_reject_codes()
168                    || mt202_message
169                        .user_header
170                        .as_ref()
171                        .and_then(|h| h.validation_flag.as_ref())
172                        .map(|flag| flag.as_str() == "REJT")
173                        .unwrap_or(false)
174                {
175                    "reject".to_string()
176                } else if mt202_message.has_return_codes()
177                    || mt202_message
178                        .user_header
179                        .as_ref()
180                        .and_then(|h| h.validation_flag.as_ref())
181                        .map(|flag| flag.as_str() == "RETN")
182                        .unwrap_or(false)
183                {
184                    "return".to_string()
185                } else if mt202_message.is_cover_message()
186                    || mt202_message
187                        .user_header
188                        .as_ref()
189                        .and_then(|h| h.validation_flag.as_ref())
190                        .map(|flag| flag.as_str() == "COV")
191                        .unwrap_or(false)
192                {
193                    "cover".to_string()
194                } else {
195                    "normal".to_string()
196                };
197
198                debug!(method = %method, "Determined MT202 processing method");
199
200                serde_json::to_value(&mt202_message).map_err(|e| {
201                    error!(error = ?e, "MT202 JSON conversion failed");
202                    DataflowError::Validation(format!("MT202 JSON conversion failed: {e}"))
203                })?
204            }
205            "205" => {
206                let Some(mt205_message) = parsed_message.into_mt205() else {
207                    error!("Failed to convert SwiftMessage to MT205");
208                    return Err(DataflowError::Validation(
209                        "MT205 message not found in SwiftMT message".to_string(),
210                    ));
211                };
212
213                method = if mt205_message.has_reject_codes()
214                    || mt205_message
215                        .user_header
216                        .as_ref()
217                        .and_then(|h| h.validation_flag.as_ref())
218                        .map(|flag| flag.as_str() == "REJT")
219                        .unwrap_or(false)
220                {
221                    "reject".to_string()
222                } else if mt205_message.has_return_codes()
223                    || mt205_message
224                        .user_header
225                        .as_ref()
226                        .and_then(|h| h.validation_flag.as_ref())
227                        .map(|flag| flag.as_str() == "RETN")
228                        .unwrap_or(false)
229                {
230                    "return".to_string()
231                } else if mt205_message.is_cover_message()
232                    || mt205_message
233                        .user_header
234                        .as_ref()
235                        .and_then(|h| h.validation_flag.as_ref())
236                        .map(|flag| flag.as_str() == "COV")
237                        .unwrap_or(false)
238                {
239                    "cover".to_string()
240                } else {
241                    "normal".to_string()
242                };
243
244                debug!(method = %method, "Determined MT205 processing method");
245
246                serde_json::to_value(&mt205_message).map_err(|e| {
247                    error!(error = ?e, "MT205 JSON conversion failed");
248                    DataflowError::Validation(format!("MT205 JSON conversion failed: {e}"))
249                })?
250            }
251            "900" => {
252                let Some(mt900_message) = parsed_message.into_mt900() else {
253                    error!("Failed to convert SwiftMessage to MT900");
254                    return Err(DataflowError::Validation(
255                        "MT900 message not found in SwiftMT message".to_string(),
256                    ));
257                };
258
259                method = "normal".to_string();
260                debug!("Processing MT900 with normal method");
261
262                serde_json::to_value(&mt900_message).map_err(|e| {
263                    error!(error = ?e, "MT900 JSON conversion failed");
264                    DataflowError::Validation(format!("MT900 JSON conversion failed: {e}"))
265                })?
266            }
267            "910" => {
268                let Some(mt910_message) = parsed_message.into_mt910() else {
269                    error!("Failed to convert SwiftMessage to MT910");
270                    return Err(DataflowError::Validation(
271                        "MT910 message not found in SwiftMT message".to_string(),
272                    ));
273                };
274
275                method = "normal".to_string();
276                debug!("Processing MT910 with normal method");
277
278                serde_json::to_value(&mt910_message).map_err(|e| {
279                    error!(error = ?e, "MT910 JSON conversion failed");
280                    DataflowError::Validation(format!("MT910 JSON conversion failed: {e}"))
281                })?
282            }
283            "192" => {
284                let Some(mt192_message) = parsed_message.into_mt192() else {
285                    error!("Failed to convert SwiftMessage to MT192");
286                    return Err(DataflowError::Validation(
287                        "MT192 message not found in SwiftMT message".to_string(),
288                    ));
289                };
290
291                method = "normal".to_string();
292                debug!("Processing MT192 with normal method");
293
294                serde_json::to_value(&mt192_message).map_err(|e| {
295                    error!(error = ?e, "MT192 JSON conversion failed");
296                    DataflowError::Validation(format!("MT192 JSON conversion failed: {e}"))
297                })?
298            }
299            "292" => {
300                let Some(mt292_message) = parsed_message.into_mt292() else {
301                    error!("Failed to convert SwiftMessage to MT292");
302                    return Err(DataflowError::Validation(
303                        "MT292 message not found in SwiftMT message".to_string(),
304                    ));
305                };
306
307                method = "normal".to_string();
308                debug!("Processing MT292 with normal method");
309
310                serde_json::to_value(&mt292_message).map_err(|e| {
311                    error!(error = ?e, "MT292 JSON conversion failed");
312                    DataflowError::Validation(format!("MT292 JSON conversion failed: {e}"))
313                })?
314            }
315            "196" => {
316                let Some(mt196_message) = parsed_message.into_mt196() else {
317                    error!("Failed to convert SwiftMessage to MT196");
318                    return Err(DataflowError::Validation(
319                        "MT196 message not found in SwiftMT message".to_string(),
320                    ));
321                };
322
323                method = "normal".to_string();
324                debug!("Processing MT196 with normal method");
325
326                serde_json::to_value(&mt196_message).map_err(|e| {
327                    error!(error = ?e, "MT196 JSON conversion failed");
328                    DataflowError::Validation(format!("MT196 JSON conversion failed: {e}"))
329                })?
330            }
331            "296" => {
332                let Some(mt296_message) = parsed_message.into_mt296() else {
333                    error!("Failed to convert SwiftMessage to MT296");
334                    return Err(DataflowError::Validation(
335                        "MT296 message not found in SwiftMT message".to_string(),
336                    ));
337                };
338
339                method = "normal".to_string();
340                debug!("Processing MT296 with normal method");
341
342                serde_json::to_value(&mt296_message).map_err(|e| {
343                    error!(error = ?e, "MT296 JSON conversion failed");
344                    DataflowError::Validation(format!("MT296 JSON conversion failed: {e}"))
345                })?
346            }
347            "104" => {
348                let Some(mt104_message) = parsed_message.into_mt104() else {
349                    error!("Failed to convert SwiftMessage to MT104");
350                    return Err(DataflowError::Validation(
351                        "MT104 message not found in SwiftMT message".to_string(),
352                    ));
353                };
354
355                method = "normal".to_string();
356
357                serde_json::to_value(&mt104_message).map_err(|e| {
358                    error!(error = ?e, "MT104 JSON conversion failed");
359                    DataflowError::Validation(format!("MT104 JSON conversion failed: {e}"))
360                })?
361            }
362            "920" => {
363                let Some(mt920_message) = parsed_message.into_mt920() else {
364                    error!("Failed to convert SwiftMessage to MT920");
365                    return Err(DataflowError::Validation(
366                        "MT920 message not found in SwiftMT message".to_string(),
367                    ));
368                };
369
370                method = "normal".to_string();
371
372                serde_json::to_value(&mt920_message).map_err(|e| {
373                    error!(error = ?e, "MT920 JSON conversion failed");
374                    DataflowError::Validation(format!("MT920 JSON conversion failed: {e}"))
375                })?
376            }
377            "940" => {
378                let Some(mt940_message) = parsed_message.into_mt940() else {
379                    error!("Failed to convert SwiftMessage to MT940");
380                    return Err(DataflowError::Validation(
381                        "MT940 message not found in SwiftMT message".to_string(),
382                    ));
383                };
384
385                method = "normal".to_string();
386
387                serde_json::to_value(&mt940_message).map_err(|e| {
388                    error!(error = ?e, "MT940 JSON conversion failed");
389                    DataflowError::Validation(format!("MT940 JSON conversion failed: {e}"))
390                })?
391            }
392            "950" => {
393                let Some(mt950_message) = parsed_message.into_mt950() else {
394                    error!("Failed to convert SwiftMessage to MT950");
395                    return Err(DataflowError::Validation(
396                        "MT950 message not found in SwiftMT message".to_string(),
397                    ));
398                };
399
400                method = "normal".to_string();
401
402                serde_json::to_value(&mt950_message).map_err(|e| {
403                    error!(error = ?e, "MT950 JSON conversion failed");
404                    DataflowError::Validation(format!("MT950 JSON conversion failed: {e}"))
405                })?
406            }
407            "107" => {
408                let Some(mt107_message) = parsed_message.into_mt107() else {
409                    error!("Failed to convert SwiftMessage to MT107");
410                    return Err(DataflowError::Validation(
411                        "MT107 message not found in SwiftMT message".to_string(),
412                    ));
413                };
414                method = "normal".to_string();
415                serde_json::to_value(&mt107_message).map_err(|e| {
416                    error!(error = ?e, "MT107 JSON conversion failed");
417                    DataflowError::Validation(format!("MT107 JSON conversion failed: {e}"))
418                })?
419            }
420            "110" => {
421                let Some(mt110_message) = parsed_message.into_mt110() else {
422                    error!("Failed to convert SwiftMessage to MT110");
423                    return Err(DataflowError::Validation(
424                        "MT110 message not found in SwiftMT message".to_string(),
425                    ));
426                };
427                method = "normal".to_string();
428                serde_json::to_value(&mt110_message).map_err(|e| {
429                    error!(error = ?e, "MT110 JSON conversion failed");
430                    DataflowError::Validation(format!("MT110 JSON conversion failed: {e}"))
431                })?
432            }
433            "111" => {
434                let Some(mt111_message) = parsed_message.into_mt111() else {
435                    error!("Failed to convert SwiftMessage to MT111");
436                    return Err(DataflowError::Validation(
437                        "MT111 message not found in SwiftMT message".to_string(),
438                    ));
439                };
440                method = "normal".to_string();
441                serde_json::to_value(&mt111_message).map_err(|e| {
442                    error!(error = ?e, "MT111 JSON conversion failed");
443                    DataflowError::Validation(format!("MT111 JSON conversion failed: {e}"))
444                })?
445            }
446            "112" => {
447                let Some(mt112_message) = parsed_message.into_mt112() else {
448                    error!("Failed to convert SwiftMessage to MT112");
449                    return Err(DataflowError::Validation(
450                        "MT112 message not found in SwiftMT message".to_string(),
451                    ));
452                };
453                method = "normal".to_string();
454                serde_json::to_value(&mt112_message).map_err(|e| {
455                    error!(error = ?e, "MT112 JSON conversion failed");
456                    DataflowError::Validation(format!("MT112 JSON conversion failed: {e}"))
457                })?
458            }
459            "190" => {
460                let Some(mt190_message) = parsed_message.into_mt190() else {
461                    error!("Failed to convert SwiftMessage to MT190");
462                    return Err(DataflowError::Validation(
463                        "MT190 message not found in SwiftMT message".to_string(),
464                    ));
465                };
466                method = "normal".to_string();
467                serde_json::to_value(&mt190_message).map_err(|e| {
468                    error!(error = ?e, "MT190 JSON conversion failed");
469                    DataflowError::Validation(format!("MT190 JSON conversion failed: {e}"))
470                })?
471            }
472            "191" => {
473                let Some(mt191_message) = parsed_message.into_mt191() else {
474                    error!("Failed to convert SwiftMessage to MT191");
475                    return Err(DataflowError::Validation(
476                        "MT191 message not found in SwiftMT message".to_string(),
477                    ));
478                };
479                method = "normal".to_string();
480                serde_json::to_value(&mt191_message).map_err(|e| {
481                    error!(error = ?e, "MT191 JSON conversion failed");
482                    DataflowError::Validation(format!("MT191 JSON conversion failed: {e}"))
483                })?
484            }
485            "199" => {
486                let Some(mt199_message) = parsed_message.into_mt199() else {
487                    error!("Failed to convert SwiftMessage to MT199");
488                    return Err(DataflowError::Validation(
489                        "MT199 message not found in SwiftMT message".to_string(),
490                    ));
491                };
492                method = "normal".to_string();
493                serde_json::to_value(&mt199_message).map_err(|e| {
494                    error!(error = ?e, "MT199 JSON conversion failed");
495                    DataflowError::Validation(format!("MT199 JSON conversion failed: {e}"))
496                })?
497            }
498            "204" => {
499                let Some(mt204_message) = parsed_message.into_mt204() else {
500                    error!("Failed to convert SwiftMessage to MT204");
501                    return Err(DataflowError::Validation(
502                        "MT204 message not found in SwiftMT message".to_string(),
503                    ));
504                };
505                method = "normal".to_string();
506                serde_json::to_value(&mt204_message).map_err(|e| {
507                    error!(error = ?e, "MT204 JSON conversion failed");
508                    DataflowError::Validation(format!("MT204 JSON conversion failed: {e}"))
509                })?
510            }
511            "210" => {
512                let Some(mt210_message) = parsed_message.into_mt210() else {
513                    error!("Failed to convert SwiftMessage to MT210");
514                    return Err(DataflowError::Validation(
515                        "MT210 message not found in SwiftMT message".to_string(),
516                    ));
517                };
518                method = "normal".to_string();
519                serde_json::to_value(&mt210_message).map_err(|e| {
520                    error!(error = ?e, "MT210 JSON conversion failed");
521                    DataflowError::Validation(format!("MT210 JSON conversion failed: {e}"))
522                })?
523            }
524            "290" => {
525                let Some(mt290_message) = parsed_message.into_mt290() else {
526                    error!("Failed to convert SwiftMessage to MT290");
527                    return Err(DataflowError::Validation(
528                        "MT290 message not found in SwiftMT message".to_string(),
529                    ));
530                };
531                method = "normal".to_string();
532                serde_json::to_value(&mt290_message).map_err(|e| {
533                    error!(error = ?e, "MT290 JSON conversion failed");
534                    DataflowError::Validation(format!("MT290 JSON conversion failed: {e}"))
535                })?
536            }
537            "291" => {
538                let Some(mt291_message) = parsed_message.into_mt291() else {
539                    error!("Failed to convert SwiftMessage to MT291");
540                    return Err(DataflowError::Validation(
541                        "MT291 message not found in SwiftMT message".to_string(),
542                    ));
543                };
544                method = "normal".to_string();
545                serde_json::to_value(&mt291_message).map_err(|e| {
546                    error!(error = ?e, "MT291 JSON conversion failed");
547                    DataflowError::Validation(format!("MT291 JSON conversion failed: {e}"))
548                })?
549            }
550            "299" => {
551                let Some(mt299_message) = parsed_message.into_mt299() else {
552                    error!("Failed to convert SwiftMessage to MT299");
553                    return Err(DataflowError::Validation(
554                        "MT299 message not found in SwiftMT message".to_string(),
555                    ));
556                };
557                method = "normal".to_string();
558                serde_json::to_value(&mt299_message).map_err(|e| {
559                    error!(error = ?e, "MT299 JSON conversion failed");
560                    DataflowError::Validation(format!("MT299 JSON conversion failed: {e}"))
561                })?
562            }
563            "935" => {
564                let Some(mt935_message) = parsed_message.into_mt935() else {
565                    error!("Failed to convert SwiftMessage to MT935");
566                    return Err(DataflowError::Validation(
567                        "MT935 message not found in SwiftMT message".to_string(),
568                    ));
569                };
570                method = "normal".to_string();
571                serde_json::to_value(&mt935_message).map_err(|e| {
572                    error!(error = ?e, "MT935 JSON conversion failed");
573                    DataflowError::Validation(format!("MT935 JSON conversion failed: {e}"))
574                })?
575            }
576            "941" => {
577                let Some(mt941_message) = parsed_message.into_mt941() else {
578                    error!("Failed to convert SwiftMessage to MT941");
579                    return Err(DataflowError::Validation(
580                        "MT941 message not found in SwiftMT message".to_string(),
581                    ));
582                };
583                method = "normal".to_string();
584                serde_json::to_value(&mt941_message).map_err(|e| {
585                    error!(error = ?e, "MT941 JSON conversion failed");
586                    DataflowError::Validation(format!("MT941 JSON conversion failed: {e}"))
587                })?
588            }
589            "942" => {
590                let Some(mt942_message) = parsed_message.into_mt942() else {
591                    error!("Failed to convert SwiftMessage to MT942");
592                    return Err(DataflowError::Validation(
593                        "MT942 message not found in SwiftMT message".to_string(),
594                    ));
595                };
596                method = "normal".to_string();
597                serde_json::to_value(&mt942_message).map_err(|e| {
598                    error!(error = ?e, "MT942 JSON conversion failed");
599                    DataflowError::Validation(format!("MT942 JSON conversion failed: {e}"))
600                })?
601            }
602            _ => {
603                error!(message_type = %message_type, "Unsupported message type encountered");
604                return Err(DataflowError::Validation(format!(
605                    "Unsupported message type: {message_type}"
606                )));
607            }
608        };
609
610        // Store the parsed result in message data
611        message
612            .data_mut()
613            .as_object_mut()
614            .unwrap()
615            .insert(target_field.to_string(), parsed_data.clone());
616
617        message.metadata_mut().as_object_mut().unwrap().insert(
618            target_field.to_string(),
619            json!({
620                "message_type": message_type,
621                "method": method,
622            }),
623        );
624
625        debug!(
626            message_type = %message_type,
627            method = %method,
628            target_field = %target_field,
629            "MT message parsing completed successfully for forward transformation"
630        );
631
632        // Important: invalidate cache after modifications
633        message.invalidate_context_cache();
634
635        Ok((
636            200,
637            vec![Change {
638                path: Arc::from(format!("data.{}", target_field)),
639                old_value: Arc::new(Value::Null),
640                new_value: Arc::new(parsed_data),
641            }],
642        ))
643    }
644
645    /// Manual string unescaping for common escape sequences
646    fn manual_unescape(input: &str) -> String {
647        let mut result = input.trim();
648
649        // Remove surrounding double quotes if present
650        if result.starts_with('"') && result.ends_with('"') && result.len() > 1 {
651            result = &result[1..result.len() - 1];
652        }
653
654        // Now unescape the inner content
655        result
656            .replace("\\r\\n", "\n")
657            .replace("\\r", "\r")
658            .replace("\\n", "\n")
659            .replace("\\t", "\t")
660            .replace("\\\"", "\"")
661            .replace("\\'", "'")
662            .replace("\\\\", "\\")
663            .replace("\\u0020", " ")
664            .replace("\\u0022", "\"")
665            .replace("\\u003C", "<")
666            .replace("\\u003E", ">")
667            .replace("\\u003D", "=")
668            .replace("\\u002F", "/")
669    }
670}