1use crate::SwiftParser;
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Parse;
15
16#[async_trait]
17impl AsyncFunctionHandler for Parse {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 debug!("Starting MT message parsing for forward transformation");
26
27 let input = match config {
29 FunctionConfig::Custom { input, .. } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let source_field = input.get("source").and_then(Value::as_str).ok_or_else(|| {
38 DataflowError::Validation("'source' parameter is required".to_string())
39 })?;
40
41 let target_field = input.get("target").and_then(Value::as_str).ok_or_else(|| {
42 DataflowError::Validation("'target' parameter is required".to_string())
43 })?;
44
45 let payload = if source_field == "payload" {
46 message.payload.to_string().replace("\\n", "\n")
47 } else {
48 let field_value = message.data().get(source_field).ok_or_else(|| {
50 DataflowError::Validation(format!(
51 "MT message field '{}' not found in message data",
52 source_field
53 ))
54 })?;
55
56 if let Some(mt_msg) = field_value.get("mt_message").and_then(Value::as_str) {
58 mt_msg.to_string()
59 } else if let Some(s) = field_value.as_str() {
60 s.to_string()
62 } else {
63 return Err(DataflowError::Validation(format!(
64 "Field '{}' does not contain a valid MT message",
65 source_field
66 )));
67 }
68 };
69
70 debug!(
71 source_field = %source_field,
72 target_field = %target_field,
73 payload_length = payload.len(),
74 "Extracted MT payload for parsing"
75 );
76
77 self.parse_swift_mt(message, &payload, target_field)
78 }
79}
80
81impl Parse {
82 fn parse_swift_mt(
83 &self,
84 message: &mut Message,
85 payload: &str,
86 target_field: &str,
87 ) -> Result<(usize, Vec<Change>)> {
88 debug!("Parsing SwiftMT message for forward transformation");
89
90 let payload = Parse::manual_unescape(payload);
91 debug!("Parsing MT message with payload length: {}", payload.len());
92 let parsed_message = SwiftParser::parse_auto(&payload).map_err(|e| {
93 error!(error = ?e, "SwiftMT parsing failed");
94 DataflowError::Validation(format!("SwiftMT parser error: {e:?}"))
95 })?;
96
97 let message_type = parsed_message.message_type().to_string();
98 debug!(message_type = %message_type, "Successfully parsed SwiftMT message");
99
100 let method: String;
101
102 let parsed_data = match message_type.as_str() {
103 "101" => {
104 let Some(mt101_message) = parsed_message.into_mt101() else {
105 error!("Failed to convert SwiftMessage to MT101");
106 return Err(DataflowError::Validation(
107 "MT101 message not found in SwiftMT message".to_string(),
108 ));
109 };
110
111 method = "normal".to_string();
112
113 serde_json::to_value(&mt101_message).map_err(|e| {
114 error!(error = ?e, "MT101 JSON conversion failed");
115 DataflowError::Validation(format!("MT101 JSON conversion failed: {e}"))
116 })?
117 }
118 "103" => {
119 let Some(mt103_message) = parsed_message.into_mt103() else {
120 error!("Failed to convert SwiftMessage to MT103");
121 return Err(DataflowError::Validation(
122 "MT103 message not found in SwiftMT message".to_string(),
123 ));
124 };
125
126 method = if mt103_message.has_reject_codes() {
127 "reject".to_string()
128 } else if mt103_message.has_return_codes() {
129 "return".to_string()
130 } else if mt103_message.is_stp_message() {
131 "stp".to_string()
132 } else {
133 "normal".to_string()
134 };
135
136 debug!(method = %method, "Determined MT103 processing method");
137
138 serde_json::to_value(&mt103_message).map_err(|e| {
139 error!(error = ?e, "MT103 JSON conversion failed");
140 DataflowError::Validation(format!("MT103 JSON conversion failed: {e}"))
141 })?
142 }
143 "200" => {
144 let Some(mt200_message) = parsed_message.into_mt200() else {
145 error!("Failed to convert SwiftMessage to MT200");
146 return Err(DataflowError::Validation(
147 "MT200 message not found in SwiftMT message".to_string(),
148 ));
149 };
150
151 method = "normal".to_string();
152 debug!("Processing MT200 with normal method");
153
154 serde_json::to_value(&mt200_message).map_err(|e| {
155 error!(error = ?e, "MT200 JSON conversion failed");
156 DataflowError::Validation(format!("MT200 JSON conversion failed: {e}"))
157 })?
158 }
159 "202" => {
160 let Some(mt202_message) = parsed_message.into_mt202() else {
161 error!("Failed to convert SwiftMessage to MT202");
162 return Err(DataflowError::Validation(
163 "MT202 message not found in SwiftMT message".to_string(),
164 ));
165 };
166
167 method = if mt202_message.has_reject_codes()
168 || mt202_message
169 .user_header
170 .as_ref()
171 .and_then(|h| h.validation_flag.as_ref())
172 .map(|flag| flag.as_str() == "REJT")
173 .unwrap_or(false)
174 {
175 "reject".to_string()
176 } else if mt202_message.has_return_codes()
177 || mt202_message
178 .user_header
179 .as_ref()
180 .and_then(|h| h.validation_flag.as_ref())
181 .map(|flag| flag.as_str() == "RETN")
182 .unwrap_or(false)
183 {
184 "return".to_string()
185 } else if mt202_message.is_cover_message()
186 || mt202_message
187 .user_header
188 .as_ref()
189 .and_then(|h| h.validation_flag.as_ref())
190 .map(|flag| flag.as_str() == "COV")
191 .unwrap_or(false)
192 {
193 "cover".to_string()
194 } else {
195 "normal".to_string()
196 };
197
198 debug!(method = %method, "Determined MT202 processing method");
199
200 serde_json::to_value(&mt202_message).map_err(|e| {
201 error!(error = ?e, "MT202 JSON conversion failed");
202 DataflowError::Validation(format!("MT202 JSON conversion failed: {e}"))
203 })?
204 }
205 "205" => {
206 let Some(mt205_message) = parsed_message.into_mt205() else {
207 error!("Failed to convert SwiftMessage to MT205");
208 return Err(DataflowError::Validation(
209 "MT205 message not found in SwiftMT message".to_string(),
210 ));
211 };
212
213 method = if mt205_message.has_reject_codes()
214 || mt205_message
215 .user_header
216 .as_ref()
217 .and_then(|h| h.validation_flag.as_ref())
218 .map(|flag| flag.as_str() == "REJT")
219 .unwrap_or(false)
220 {
221 "reject".to_string()
222 } else if mt205_message.has_return_codes()
223 || mt205_message
224 .user_header
225 .as_ref()
226 .and_then(|h| h.validation_flag.as_ref())
227 .map(|flag| flag.as_str() == "RETN")
228 .unwrap_or(false)
229 {
230 "return".to_string()
231 } else if mt205_message.is_cover_message()
232 || mt205_message
233 .user_header
234 .as_ref()
235 .and_then(|h| h.validation_flag.as_ref())
236 .map(|flag| flag.as_str() == "COV")
237 .unwrap_or(false)
238 {
239 "cover".to_string()
240 } else {
241 "normal".to_string()
242 };
243
244 debug!(method = %method, "Determined MT205 processing method");
245
246 serde_json::to_value(&mt205_message).map_err(|e| {
247 error!(error = ?e, "MT205 JSON conversion failed");
248 DataflowError::Validation(format!("MT205 JSON conversion failed: {e}"))
249 })?
250 }
251 "900" => {
252 let Some(mt900_message) = parsed_message.into_mt900() else {
253 error!("Failed to convert SwiftMessage to MT900");
254 return Err(DataflowError::Validation(
255 "MT900 message not found in SwiftMT message".to_string(),
256 ));
257 };
258
259 method = "normal".to_string();
260 debug!("Processing MT900 with normal method");
261
262 serde_json::to_value(&mt900_message).map_err(|e| {
263 error!(error = ?e, "MT900 JSON conversion failed");
264 DataflowError::Validation(format!("MT900 JSON conversion failed: {e}"))
265 })?
266 }
267 "910" => {
268 let Some(mt910_message) = parsed_message.into_mt910() else {
269 error!("Failed to convert SwiftMessage to MT910");
270 return Err(DataflowError::Validation(
271 "MT910 message not found in SwiftMT message".to_string(),
272 ));
273 };
274
275 method = "normal".to_string();
276 debug!("Processing MT910 with normal method");
277
278 serde_json::to_value(&mt910_message).map_err(|e| {
279 error!(error = ?e, "MT910 JSON conversion failed");
280 DataflowError::Validation(format!("MT910 JSON conversion failed: {e}"))
281 })?
282 }
283 "192" => {
284 let Some(mt192_message) = parsed_message.into_mt192() else {
285 error!("Failed to convert SwiftMessage to MT192");
286 return Err(DataflowError::Validation(
287 "MT192 message not found in SwiftMT message".to_string(),
288 ));
289 };
290
291 method = "normal".to_string();
292 debug!("Processing MT192 with normal method");
293
294 serde_json::to_value(&mt192_message).map_err(|e| {
295 error!(error = ?e, "MT192 JSON conversion failed");
296 DataflowError::Validation(format!("MT192 JSON conversion failed: {e}"))
297 })?
298 }
299 "292" => {
300 let Some(mt292_message) = parsed_message.into_mt292() else {
301 error!("Failed to convert SwiftMessage to MT292");
302 return Err(DataflowError::Validation(
303 "MT292 message not found in SwiftMT message".to_string(),
304 ));
305 };
306
307 method = "normal".to_string();
308 debug!("Processing MT292 with normal method");
309
310 serde_json::to_value(&mt292_message).map_err(|e| {
311 error!(error = ?e, "MT292 JSON conversion failed");
312 DataflowError::Validation(format!("MT292 JSON conversion failed: {e}"))
313 })?
314 }
315 "196" => {
316 let Some(mt196_message) = parsed_message.into_mt196() else {
317 error!("Failed to convert SwiftMessage to MT196");
318 return Err(DataflowError::Validation(
319 "MT196 message not found in SwiftMT message".to_string(),
320 ));
321 };
322
323 method = "normal".to_string();
324 debug!("Processing MT196 with normal method");
325
326 serde_json::to_value(&mt196_message).map_err(|e| {
327 error!(error = ?e, "MT196 JSON conversion failed");
328 DataflowError::Validation(format!("MT196 JSON conversion failed: {e}"))
329 })?
330 }
331 "296" => {
332 let Some(mt296_message) = parsed_message.into_mt296() else {
333 error!("Failed to convert SwiftMessage to MT296");
334 return Err(DataflowError::Validation(
335 "MT296 message not found in SwiftMT message".to_string(),
336 ));
337 };
338
339 method = "normal".to_string();
340 debug!("Processing MT296 with normal method");
341
342 serde_json::to_value(&mt296_message).map_err(|e| {
343 error!(error = ?e, "MT296 JSON conversion failed");
344 DataflowError::Validation(format!("MT296 JSON conversion failed: {e}"))
345 })?
346 }
347 "104" => {
348 let Some(mt104_message) = parsed_message.into_mt104() else {
349 error!("Failed to convert SwiftMessage to MT104");
350 return Err(DataflowError::Validation(
351 "MT104 message not found in SwiftMT message".to_string(),
352 ));
353 };
354
355 method = "normal".to_string();
356
357 serde_json::to_value(&mt104_message).map_err(|e| {
358 error!(error = ?e, "MT104 JSON conversion failed");
359 DataflowError::Validation(format!("MT104 JSON conversion failed: {e}"))
360 })?
361 }
362 "920" => {
363 let Some(mt920_message) = parsed_message.into_mt920() else {
364 error!("Failed to convert SwiftMessage to MT920");
365 return Err(DataflowError::Validation(
366 "MT920 message not found in SwiftMT message".to_string(),
367 ));
368 };
369
370 method = "normal".to_string();
371
372 serde_json::to_value(&mt920_message).map_err(|e| {
373 error!(error = ?e, "MT920 JSON conversion failed");
374 DataflowError::Validation(format!("MT920 JSON conversion failed: {e}"))
375 })?
376 }
377 "940" => {
378 let Some(mt940_message) = parsed_message.into_mt940() else {
379 error!("Failed to convert SwiftMessage to MT940");
380 return Err(DataflowError::Validation(
381 "MT940 message not found in SwiftMT message".to_string(),
382 ));
383 };
384
385 method = "normal".to_string();
386
387 serde_json::to_value(&mt940_message).map_err(|e| {
388 error!(error = ?e, "MT940 JSON conversion failed");
389 DataflowError::Validation(format!("MT940 JSON conversion failed: {e}"))
390 })?
391 }
392 "950" => {
393 let Some(mt950_message) = parsed_message.into_mt950() else {
394 error!("Failed to convert SwiftMessage to MT950");
395 return Err(DataflowError::Validation(
396 "MT950 message not found in SwiftMT message".to_string(),
397 ));
398 };
399
400 method = "normal".to_string();
401
402 serde_json::to_value(&mt950_message).map_err(|e| {
403 error!(error = ?e, "MT950 JSON conversion failed");
404 DataflowError::Validation(format!("MT950 JSON conversion failed: {e}"))
405 })?
406 }
407 "107" => {
408 let Some(mt107_message) = parsed_message.into_mt107() else {
409 error!("Failed to convert SwiftMessage to MT107");
410 return Err(DataflowError::Validation(
411 "MT107 message not found in SwiftMT message".to_string(),
412 ));
413 };
414 method = "normal".to_string();
415 serde_json::to_value(&mt107_message).map_err(|e| {
416 error!(error = ?e, "MT107 JSON conversion failed");
417 DataflowError::Validation(format!("MT107 JSON conversion failed: {e}"))
418 })?
419 }
420 "110" => {
421 let Some(mt110_message) = parsed_message.into_mt110() else {
422 error!("Failed to convert SwiftMessage to MT110");
423 return Err(DataflowError::Validation(
424 "MT110 message not found in SwiftMT message".to_string(),
425 ));
426 };
427 method = "normal".to_string();
428 serde_json::to_value(&mt110_message).map_err(|e| {
429 error!(error = ?e, "MT110 JSON conversion failed");
430 DataflowError::Validation(format!("MT110 JSON conversion failed: {e}"))
431 })?
432 }
433 "111" => {
434 let Some(mt111_message) = parsed_message.into_mt111() else {
435 error!("Failed to convert SwiftMessage to MT111");
436 return Err(DataflowError::Validation(
437 "MT111 message not found in SwiftMT message".to_string(),
438 ));
439 };
440 method = "normal".to_string();
441 serde_json::to_value(&mt111_message).map_err(|e| {
442 error!(error = ?e, "MT111 JSON conversion failed");
443 DataflowError::Validation(format!("MT111 JSON conversion failed: {e}"))
444 })?
445 }
446 "112" => {
447 let Some(mt112_message) = parsed_message.into_mt112() else {
448 error!("Failed to convert SwiftMessage to MT112");
449 return Err(DataflowError::Validation(
450 "MT112 message not found in SwiftMT message".to_string(),
451 ));
452 };
453 method = "normal".to_string();
454 serde_json::to_value(&mt112_message).map_err(|e| {
455 error!(error = ?e, "MT112 JSON conversion failed");
456 DataflowError::Validation(format!("MT112 JSON conversion failed: {e}"))
457 })?
458 }
459 "190" => {
460 let Some(mt190_message) = parsed_message.into_mt190() else {
461 error!("Failed to convert SwiftMessage to MT190");
462 return Err(DataflowError::Validation(
463 "MT190 message not found in SwiftMT message".to_string(),
464 ));
465 };
466 method = "normal".to_string();
467 serde_json::to_value(&mt190_message).map_err(|e| {
468 error!(error = ?e, "MT190 JSON conversion failed");
469 DataflowError::Validation(format!("MT190 JSON conversion failed: {e}"))
470 })?
471 }
472 "191" => {
473 let Some(mt191_message) = parsed_message.into_mt191() else {
474 error!("Failed to convert SwiftMessage to MT191");
475 return Err(DataflowError::Validation(
476 "MT191 message not found in SwiftMT message".to_string(),
477 ));
478 };
479 method = "normal".to_string();
480 serde_json::to_value(&mt191_message).map_err(|e| {
481 error!(error = ?e, "MT191 JSON conversion failed");
482 DataflowError::Validation(format!("MT191 JSON conversion failed: {e}"))
483 })?
484 }
485 "199" => {
486 let Some(mt199_message) = parsed_message.into_mt199() else {
487 error!("Failed to convert SwiftMessage to MT199");
488 return Err(DataflowError::Validation(
489 "MT199 message not found in SwiftMT message".to_string(),
490 ));
491 };
492 method = "normal".to_string();
493 serde_json::to_value(&mt199_message).map_err(|e| {
494 error!(error = ?e, "MT199 JSON conversion failed");
495 DataflowError::Validation(format!("MT199 JSON conversion failed: {e}"))
496 })?
497 }
498 "204" => {
499 let Some(mt204_message) = parsed_message.into_mt204() else {
500 error!("Failed to convert SwiftMessage to MT204");
501 return Err(DataflowError::Validation(
502 "MT204 message not found in SwiftMT message".to_string(),
503 ));
504 };
505 method = "normal".to_string();
506 serde_json::to_value(&mt204_message).map_err(|e| {
507 error!(error = ?e, "MT204 JSON conversion failed");
508 DataflowError::Validation(format!("MT204 JSON conversion failed: {e}"))
509 })?
510 }
511 "210" => {
512 let Some(mt210_message) = parsed_message.into_mt210() else {
513 error!("Failed to convert SwiftMessage to MT210");
514 return Err(DataflowError::Validation(
515 "MT210 message not found in SwiftMT message".to_string(),
516 ));
517 };
518 method = "normal".to_string();
519 serde_json::to_value(&mt210_message).map_err(|e| {
520 error!(error = ?e, "MT210 JSON conversion failed");
521 DataflowError::Validation(format!("MT210 JSON conversion failed: {e}"))
522 })?
523 }
524 "290" => {
525 let Some(mt290_message) = parsed_message.into_mt290() else {
526 error!("Failed to convert SwiftMessage to MT290");
527 return Err(DataflowError::Validation(
528 "MT290 message not found in SwiftMT message".to_string(),
529 ));
530 };
531 method = "normal".to_string();
532 serde_json::to_value(&mt290_message).map_err(|e| {
533 error!(error = ?e, "MT290 JSON conversion failed");
534 DataflowError::Validation(format!("MT290 JSON conversion failed: {e}"))
535 })?
536 }
537 "291" => {
538 let Some(mt291_message) = parsed_message.into_mt291() else {
539 error!("Failed to convert SwiftMessage to MT291");
540 return Err(DataflowError::Validation(
541 "MT291 message not found in SwiftMT message".to_string(),
542 ));
543 };
544 method = "normal".to_string();
545 serde_json::to_value(&mt291_message).map_err(|e| {
546 error!(error = ?e, "MT291 JSON conversion failed");
547 DataflowError::Validation(format!("MT291 JSON conversion failed: {e}"))
548 })?
549 }
550 "299" => {
551 let Some(mt299_message) = parsed_message.into_mt299() else {
552 error!("Failed to convert SwiftMessage to MT299");
553 return Err(DataflowError::Validation(
554 "MT299 message not found in SwiftMT message".to_string(),
555 ));
556 };
557 method = "normal".to_string();
558 serde_json::to_value(&mt299_message).map_err(|e| {
559 error!(error = ?e, "MT299 JSON conversion failed");
560 DataflowError::Validation(format!("MT299 JSON conversion failed: {e}"))
561 })?
562 }
563 "935" => {
564 let Some(mt935_message) = parsed_message.into_mt935() else {
565 error!("Failed to convert SwiftMessage to MT935");
566 return Err(DataflowError::Validation(
567 "MT935 message not found in SwiftMT message".to_string(),
568 ));
569 };
570 method = "normal".to_string();
571 serde_json::to_value(&mt935_message).map_err(|e| {
572 error!(error = ?e, "MT935 JSON conversion failed");
573 DataflowError::Validation(format!("MT935 JSON conversion failed: {e}"))
574 })?
575 }
576 "941" => {
577 let Some(mt941_message) = parsed_message.into_mt941() else {
578 error!("Failed to convert SwiftMessage to MT941");
579 return Err(DataflowError::Validation(
580 "MT941 message not found in SwiftMT message".to_string(),
581 ));
582 };
583 method = "normal".to_string();
584 serde_json::to_value(&mt941_message).map_err(|e| {
585 error!(error = ?e, "MT941 JSON conversion failed");
586 DataflowError::Validation(format!("MT941 JSON conversion failed: {e}"))
587 })?
588 }
589 "942" => {
590 let Some(mt942_message) = parsed_message.into_mt942() else {
591 error!("Failed to convert SwiftMessage to MT942");
592 return Err(DataflowError::Validation(
593 "MT942 message not found in SwiftMT message".to_string(),
594 ));
595 };
596 method = "normal".to_string();
597 serde_json::to_value(&mt942_message).map_err(|e| {
598 error!(error = ?e, "MT942 JSON conversion failed");
599 DataflowError::Validation(format!("MT942 JSON conversion failed: {e}"))
600 })?
601 }
602 _ => {
603 error!(message_type = %message_type, "Unsupported message type encountered");
604 return Err(DataflowError::Validation(format!(
605 "Unsupported message type: {message_type}"
606 )));
607 }
608 };
609
610 message
612 .data_mut()
613 .as_object_mut()
614 .unwrap()
615 .insert(target_field.to_string(), parsed_data.clone());
616
617 message.metadata_mut().as_object_mut().unwrap().insert(
618 target_field.to_string(),
619 json!({
620 "message_type": message_type,
621 "method": method,
622 }),
623 );
624
625 debug!(
626 message_type = %message_type,
627 method = %method,
628 target_field = %target_field,
629 "MT message parsing completed successfully for forward transformation"
630 );
631
632 message.invalidate_context_cache();
634
635 Ok((
636 200,
637 vec![Change {
638 path: Arc::from(format!("data.{}", target_field)),
639 old_value: Arc::new(Value::Null),
640 new_value: Arc::new(parsed_data),
641 }],
642 ))
643 }
644
645 fn manual_unescape(input: &str) -> String {
647 let mut result = input.trim();
648
649 if result.starts_with('"') && result.ends_with('"') && result.len() > 1 {
651 result = &result[1..result.len() - 1];
652 }
653
654 result
656 .replace("\\r\\n", "\n")
657 .replace("\\r", "\r")
658 .replace("\\n", "\n")
659 .replace("\\t", "\t")
660 .replace("\\\"", "\"")
661 .replace("\\'", "'")
662 .replace("\\\\", "\\")
663 .replace("\\u0020", " ")
664 .replace("\\u0022", "\"")
665 .replace("\\u003C", "<")
666 .replace("\\u003E", ">")
667 .replace("\\u003D", "=")
668 .replace("\\u002F", "/")
669 }
670}