1use crate::SwiftParser;
2use async_trait::async_trait;
3use dataflow_rs::engine::error::DataflowError;
4use dataflow_rs::engine::{
5 AsyncFunctionHandler, FunctionConfig,
6 error::Result,
7 message::{Change, Message},
8};
9use datalogic_rs::DataLogic;
10use serde_json::{Value, json};
11use std::sync::Arc;
12use tracing::{debug, error, instrument};
13
14pub struct Parse;
15
16#[async_trait]
17impl AsyncFunctionHandler for Parse {
18 #[instrument(skip(self, message, config, _datalogic))]
19 async fn execute(
20 &self,
21 message: &mut Message,
22 config: &FunctionConfig,
23 _datalogic: Arc<DataLogic>,
24 ) -> Result<(usize, Vec<Change>)> {
25 debug!("Starting MT message parsing for forward transformation");
26
27 let input = match config {
29 FunctionConfig::Custom { input, .. } => input,
30 _ => {
31 return Err(DataflowError::Validation(
32 "Invalid configuration type".to_string(),
33 ));
34 }
35 };
36
37 let mt_message_field =
38 input
39 .get("mt_message")
40 .and_then(Value::as_str)
41 .ok_or_else(|| {
42 DataflowError::Validation("'mt_message' parameter is required".to_string())
43 })?;
44
45 let parsed_field = input.get("parsed").and_then(Value::as_str).ok_or_else(|| {
46 DataflowError::Validation("'parsed' parameter is required".to_string())
47 })?;
48
49 let payload = if mt_message_field == "payload" {
50 message.payload.to_string().replace("\\n", "\n")
51 } else {
52 let field_value = message.data().get(mt_message_field).ok_or_else(|| {
54 DataflowError::Validation(format!(
55 "MT message field '{}' not found in message data",
56 mt_message_field
57 ))
58 })?;
59
60 if let Some(mt_msg) = field_value.get("mt_message").and_then(Value::as_str) {
62 mt_msg.to_string()
63 } else if let Some(s) = field_value.as_str() {
64 s.to_string()
66 } else {
67 return Err(DataflowError::Validation(format!(
68 "Field '{}' does not contain a valid MT message",
69 mt_message_field
70 )));
71 }
72 };
73
74 debug!(
75 mt_message_field = %mt_message_field,
76 parsed_field = %parsed_field,
77 payload_length = payload.len(),
78 "Extracted MT payload for parsing"
79 );
80
81 self.parse_swift_mt(message, &payload, parsed_field)
82 }
83}
84
85impl Parse {
86 fn parse_swift_mt(
87 &self,
88 message: &mut Message,
89 payload: &str,
90 parsed_field: &str,
91 ) -> Result<(usize, Vec<Change>)> {
92 debug!("Parsing SwiftMT message for forward transformation");
93
94 let payload = Parse::manual_unescape(payload);
95 debug!("Parsing MT message with payload length: {}", payload.len());
96 let parsed_message = SwiftParser::parse_auto(&payload).map_err(|e| {
97 error!(error = ?e, "SwiftMT parsing failed");
98 DataflowError::Validation(format!("SwiftMT parser error: {e:?}"))
99 })?;
100
101 let message_type = parsed_message.message_type().to_string();
102 debug!(message_type = %message_type, "Successfully parsed SwiftMT message");
103
104 let method: String;
105
106 let parsed_data = match message_type.as_str() {
107 "101" => {
108 let Some(mt101_message) = parsed_message.into_mt101() else {
109 error!("Failed to convert SwiftMessage to MT101");
110 return Err(DataflowError::Validation(
111 "MT101 message not found in SwiftMT message".to_string(),
112 ));
113 };
114
115 method = "normal".to_string();
116
117 serde_json::to_value(&mt101_message).map_err(|e| {
118 error!(error = ?e, "MT101 JSON conversion failed");
119 DataflowError::Validation(format!("MT101 JSON conversion failed: {e}"))
120 })?
121 }
122 "103" => {
123 let Some(mt103_message) = parsed_message.into_mt103() else {
124 error!("Failed to convert SwiftMessage to MT103");
125 return Err(DataflowError::Validation(
126 "MT103 message not found in SwiftMT message".to_string(),
127 ));
128 };
129
130 method = if mt103_message.has_reject_codes() {
131 "reject".to_string()
132 } else if mt103_message.has_return_codes() {
133 "return".to_string()
134 } else if mt103_message.is_stp_message() {
135 "stp".to_string()
136 } else {
137 "normal".to_string()
138 };
139
140 debug!(method = %method, "Determined MT103 processing method");
141
142 serde_json::to_value(&mt103_message).map_err(|e| {
143 error!(error = ?e, "MT103 JSON conversion failed");
144 DataflowError::Validation(format!("MT103 JSON conversion failed: {e}"))
145 })?
146 }
147 "200" => {
148 let Some(mt200_message) = parsed_message.into_mt200() else {
149 error!("Failed to convert SwiftMessage to MT200");
150 return Err(DataflowError::Validation(
151 "MT200 message not found in SwiftMT message".to_string(),
152 ));
153 };
154
155 method = "normal".to_string();
156 debug!("Processing MT200 with normal method");
157
158 serde_json::to_value(&mt200_message).map_err(|e| {
159 error!(error = ?e, "MT200 JSON conversion failed");
160 DataflowError::Validation(format!("MT200 JSON conversion failed: {e}"))
161 })?
162 }
163 "202" => {
164 let Some(mt202_message) = parsed_message.into_mt202() else {
165 error!("Failed to convert SwiftMessage to MT202");
166 return Err(DataflowError::Validation(
167 "MT202 message not found in SwiftMT message".to_string(),
168 ));
169 };
170
171 method = if mt202_message.has_reject_codes()
172 || mt202_message
173 .user_header
174 .as_ref()
175 .and_then(|h| h.validation_flag.as_ref())
176 .map(|flag| flag.as_str() == "REJT")
177 .unwrap_or(false)
178 {
179 "reject".to_string()
180 } else if mt202_message.has_return_codes()
181 || mt202_message
182 .user_header
183 .as_ref()
184 .and_then(|h| h.validation_flag.as_ref())
185 .map(|flag| flag.as_str() == "RETN")
186 .unwrap_or(false)
187 {
188 "return".to_string()
189 } else if mt202_message.is_cover_message()
190 || mt202_message
191 .user_header
192 .as_ref()
193 .and_then(|h| h.validation_flag.as_ref())
194 .map(|flag| flag.as_str() == "COV")
195 .unwrap_or(false)
196 {
197 "cover".to_string()
198 } else {
199 "normal".to_string()
200 };
201
202 debug!(method = %method, "Determined MT202 processing method");
203
204 serde_json::to_value(&mt202_message).map_err(|e| {
205 error!(error = ?e, "MT202 JSON conversion failed");
206 DataflowError::Validation(format!("MT202 JSON conversion failed: {e}"))
207 })?
208 }
209 "205" => {
210 let Some(mt205_message) = parsed_message.into_mt205() else {
211 error!("Failed to convert SwiftMessage to MT205");
212 return Err(DataflowError::Validation(
213 "MT205 message not found in SwiftMT message".to_string(),
214 ));
215 };
216
217 method = if mt205_message.has_reject_codes()
218 || mt205_message
219 .user_header
220 .as_ref()
221 .and_then(|h| h.validation_flag.as_ref())
222 .map(|flag| flag.as_str() == "REJT")
223 .unwrap_or(false)
224 {
225 "reject".to_string()
226 } else if mt205_message.has_return_codes()
227 || mt205_message
228 .user_header
229 .as_ref()
230 .and_then(|h| h.validation_flag.as_ref())
231 .map(|flag| flag.as_str() == "RETN")
232 .unwrap_or(false)
233 {
234 "return".to_string()
235 } else if mt205_message.is_cover_message()
236 || mt205_message
237 .user_header
238 .as_ref()
239 .and_then(|h| h.validation_flag.as_ref())
240 .map(|flag| flag.as_str() == "COV")
241 .unwrap_or(false)
242 {
243 "cover".to_string()
244 } else {
245 "normal".to_string()
246 };
247
248 debug!(method = %method, "Determined MT205 processing method");
249
250 serde_json::to_value(&mt205_message).map_err(|e| {
251 error!(error = ?e, "MT205 JSON conversion failed");
252 DataflowError::Validation(format!("MT205 JSON conversion failed: {e}"))
253 })?
254 }
255 "900" => {
256 let Some(mt900_message) = parsed_message.into_mt900() else {
257 error!("Failed to convert SwiftMessage to MT900");
258 return Err(DataflowError::Validation(
259 "MT900 message not found in SwiftMT message".to_string(),
260 ));
261 };
262
263 method = "normal".to_string();
264 debug!("Processing MT900 with normal method");
265
266 serde_json::to_value(&mt900_message).map_err(|e| {
267 error!(error = ?e, "MT900 JSON conversion failed");
268 DataflowError::Validation(format!("MT900 JSON conversion failed: {e}"))
269 })?
270 }
271 "910" => {
272 let Some(mt910_message) = parsed_message.into_mt910() else {
273 error!("Failed to convert SwiftMessage to MT910");
274 return Err(DataflowError::Validation(
275 "MT910 message not found in SwiftMT message".to_string(),
276 ));
277 };
278
279 method = "normal".to_string();
280 debug!("Processing MT910 with normal method");
281
282 serde_json::to_value(&mt910_message).map_err(|e| {
283 error!(error = ?e, "MT910 JSON conversion failed");
284 DataflowError::Validation(format!("MT910 JSON conversion failed: {e}"))
285 })?
286 }
287 "192" => {
288 let Some(mt192_message) = parsed_message.into_mt192() else {
289 error!("Failed to convert SwiftMessage to MT192");
290 return Err(DataflowError::Validation(
291 "MT192 message not found in SwiftMT message".to_string(),
292 ));
293 };
294
295 method = "normal".to_string();
296 debug!("Processing MT192 with normal method");
297
298 serde_json::to_value(&mt192_message).map_err(|e| {
299 error!(error = ?e, "MT192 JSON conversion failed");
300 DataflowError::Validation(format!("MT192 JSON conversion failed: {e}"))
301 })?
302 }
303 "292" => {
304 let Some(mt292_message) = parsed_message.into_mt292() else {
305 error!("Failed to convert SwiftMessage to MT292");
306 return Err(DataflowError::Validation(
307 "MT292 message not found in SwiftMT message".to_string(),
308 ));
309 };
310
311 method = "normal".to_string();
312 debug!("Processing MT292 with normal method");
313
314 serde_json::to_value(&mt292_message).map_err(|e| {
315 error!(error = ?e, "MT292 JSON conversion failed");
316 DataflowError::Validation(format!("MT292 JSON conversion failed: {e}"))
317 })?
318 }
319 "196" => {
320 let Some(mt196_message) = parsed_message.into_mt196() else {
321 error!("Failed to convert SwiftMessage to MT196");
322 return Err(DataflowError::Validation(
323 "MT196 message not found in SwiftMT message".to_string(),
324 ));
325 };
326
327 method = "normal".to_string();
328 debug!("Processing MT196 with normal method");
329
330 serde_json::to_value(&mt196_message).map_err(|e| {
331 error!(error = ?e, "MT196 JSON conversion failed");
332 DataflowError::Validation(format!("MT196 JSON conversion failed: {e}"))
333 })?
334 }
335 "296" => {
336 let Some(mt296_message) = parsed_message.into_mt296() else {
337 error!("Failed to convert SwiftMessage to MT296");
338 return Err(DataflowError::Validation(
339 "MT296 message not found in SwiftMT message".to_string(),
340 ));
341 };
342
343 method = "normal".to_string();
344 debug!("Processing MT296 with normal method");
345
346 serde_json::to_value(&mt296_message).map_err(|e| {
347 error!(error = ?e, "MT296 JSON conversion failed");
348 DataflowError::Validation(format!("MT296 JSON conversion failed: {e}"))
349 })?
350 }
351 "104" => {
352 let Some(mt104_message) = parsed_message.into_mt104() else {
353 error!("Failed to convert SwiftMessage to MT104");
354 return Err(DataflowError::Validation(
355 "MT104 message not found in SwiftMT message".to_string(),
356 ));
357 };
358
359 method = "normal".to_string();
360
361 serde_json::to_value(&mt104_message).map_err(|e| {
362 error!(error = ?e, "MT104 JSON conversion failed");
363 DataflowError::Validation(format!("MT104 JSON conversion failed: {e}"))
364 })?
365 }
366 "920" => {
367 let Some(mt920_message) = parsed_message.into_mt920() else {
368 error!("Failed to convert SwiftMessage to MT920");
369 return Err(DataflowError::Validation(
370 "MT920 message not found in SwiftMT message".to_string(),
371 ));
372 };
373
374 method = "normal".to_string();
375
376 serde_json::to_value(&mt920_message).map_err(|e| {
377 error!(error = ?e, "MT920 JSON conversion failed");
378 DataflowError::Validation(format!("MT920 JSON conversion failed: {e}"))
379 })?
380 }
381 "940" => {
382 let Some(mt940_message) = parsed_message.into_mt940() else {
383 error!("Failed to convert SwiftMessage to MT940");
384 return Err(DataflowError::Validation(
385 "MT940 message not found in SwiftMT message".to_string(),
386 ));
387 };
388
389 method = "normal".to_string();
390
391 serde_json::to_value(&mt940_message).map_err(|e| {
392 error!(error = ?e, "MT940 JSON conversion failed");
393 DataflowError::Validation(format!("MT940 JSON conversion failed: {e}"))
394 })?
395 }
396 "950" => {
397 let Some(mt950_message) = parsed_message.into_mt950() else {
398 error!("Failed to convert SwiftMessage to MT950");
399 return Err(DataflowError::Validation(
400 "MT950 message not found in SwiftMT message".to_string(),
401 ));
402 };
403
404 method = "normal".to_string();
405
406 serde_json::to_value(&mt950_message).map_err(|e| {
407 error!(error = ?e, "MT950 JSON conversion failed");
408 DataflowError::Validation(format!("MT950 JSON conversion failed: {e}"))
409 })?
410 }
411 "107" => {
412 let Some(mt107_message) = parsed_message.into_mt107() else {
413 error!("Failed to convert SwiftMessage to MT107");
414 return Err(DataflowError::Validation(
415 "MT107 message not found in SwiftMT message".to_string(),
416 ));
417 };
418 method = "normal".to_string();
419 serde_json::to_value(&mt107_message).map_err(|e| {
420 error!(error = ?e, "MT107 JSON conversion failed");
421 DataflowError::Validation(format!("MT107 JSON conversion failed: {e}"))
422 })?
423 }
424 "110" => {
425 let Some(mt110_message) = parsed_message.into_mt110() else {
426 error!("Failed to convert SwiftMessage to MT110");
427 return Err(DataflowError::Validation(
428 "MT110 message not found in SwiftMT message".to_string(),
429 ));
430 };
431 method = "normal".to_string();
432 serde_json::to_value(&mt110_message).map_err(|e| {
433 error!(error = ?e, "MT110 JSON conversion failed");
434 DataflowError::Validation(format!("MT110 JSON conversion failed: {e}"))
435 })?
436 }
437 "111" => {
438 let Some(mt111_message) = parsed_message.into_mt111() else {
439 error!("Failed to convert SwiftMessage to MT111");
440 return Err(DataflowError::Validation(
441 "MT111 message not found in SwiftMT message".to_string(),
442 ));
443 };
444 method = "normal".to_string();
445 serde_json::to_value(&mt111_message).map_err(|e| {
446 error!(error = ?e, "MT111 JSON conversion failed");
447 DataflowError::Validation(format!("MT111 JSON conversion failed: {e}"))
448 })?
449 }
450 "112" => {
451 let Some(mt112_message) = parsed_message.into_mt112() else {
452 error!("Failed to convert SwiftMessage to MT112");
453 return Err(DataflowError::Validation(
454 "MT112 message not found in SwiftMT message".to_string(),
455 ));
456 };
457 method = "normal".to_string();
458 serde_json::to_value(&mt112_message).map_err(|e| {
459 error!(error = ?e, "MT112 JSON conversion failed");
460 DataflowError::Validation(format!("MT112 JSON conversion failed: {e}"))
461 })?
462 }
463 "190" => {
464 let Some(mt190_message) = parsed_message.into_mt190() else {
465 error!("Failed to convert SwiftMessage to MT190");
466 return Err(DataflowError::Validation(
467 "MT190 message not found in SwiftMT message".to_string(),
468 ));
469 };
470 method = "normal".to_string();
471 serde_json::to_value(&mt190_message).map_err(|e| {
472 error!(error = ?e, "MT190 JSON conversion failed");
473 DataflowError::Validation(format!("MT190 JSON conversion failed: {e}"))
474 })?
475 }
476 "191" => {
477 let Some(mt191_message) = parsed_message.into_mt191() else {
478 error!("Failed to convert SwiftMessage to MT191");
479 return Err(DataflowError::Validation(
480 "MT191 message not found in SwiftMT message".to_string(),
481 ));
482 };
483 method = "normal".to_string();
484 serde_json::to_value(&mt191_message).map_err(|e| {
485 error!(error = ?e, "MT191 JSON conversion failed");
486 DataflowError::Validation(format!("MT191 JSON conversion failed: {e}"))
487 })?
488 }
489 "199" => {
490 let Some(mt199_message) = parsed_message.into_mt199() else {
491 error!("Failed to convert SwiftMessage to MT199");
492 return Err(DataflowError::Validation(
493 "MT199 message not found in SwiftMT message".to_string(),
494 ));
495 };
496 method = "normal".to_string();
497 serde_json::to_value(&mt199_message).map_err(|e| {
498 error!(error = ?e, "MT199 JSON conversion failed");
499 DataflowError::Validation(format!("MT199 JSON conversion failed: {e}"))
500 })?
501 }
502 "204" => {
503 let Some(mt204_message) = parsed_message.into_mt204() else {
504 error!("Failed to convert SwiftMessage to MT204");
505 return Err(DataflowError::Validation(
506 "MT204 message not found in SwiftMT message".to_string(),
507 ));
508 };
509 method = "normal".to_string();
510 serde_json::to_value(&mt204_message).map_err(|e| {
511 error!(error = ?e, "MT204 JSON conversion failed");
512 DataflowError::Validation(format!("MT204 JSON conversion failed: {e}"))
513 })?
514 }
515 "210" => {
516 let Some(mt210_message) = parsed_message.into_mt210() else {
517 error!("Failed to convert SwiftMessage to MT210");
518 return Err(DataflowError::Validation(
519 "MT210 message not found in SwiftMT message".to_string(),
520 ));
521 };
522 method = "normal".to_string();
523 serde_json::to_value(&mt210_message).map_err(|e| {
524 error!(error = ?e, "MT210 JSON conversion failed");
525 DataflowError::Validation(format!("MT210 JSON conversion failed: {e}"))
526 })?
527 }
528 "290" => {
529 let Some(mt290_message) = parsed_message.into_mt290() else {
530 error!("Failed to convert SwiftMessage to MT290");
531 return Err(DataflowError::Validation(
532 "MT290 message not found in SwiftMT message".to_string(),
533 ));
534 };
535 method = "normal".to_string();
536 serde_json::to_value(&mt290_message).map_err(|e| {
537 error!(error = ?e, "MT290 JSON conversion failed");
538 DataflowError::Validation(format!("MT290 JSON conversion failed: {e}"))
539 })?
540 }
541 "291" => {
542 let Some(mt291_message) = parsed_message.into_mt291() else {
543 error!("Failed to convert SwiftMessage to MT291");
544 return Err(DataflowError::Validation(
545 "MT291 message not found in SwiftMT message".to_string(),
546 ));
547 };
548 method = "normal".to_string();
549 serde_json::to_value(&mt291_message).map_err(|e| {
550 error!(error = ?e, "MT291 JSON conversion failed");
551 DataflowError::Validation(format!("MT291 JSON conversion failed: {e}"))
552 })?
553 }
554 "299" => {
555 let Some(mt299_message) = parsed_message.into_mt299() else {
556 error!("Failed to convert SwiftMessage to MT299");
557 return Err(DataflowError::Validation(
558 "MT299 message not found in SwiftMT message".to_string(),
559 ));
560 };
561 method = "normal".to_string();
562 serde_json::to_value(&mt299_message).map_err(|e| {
563 error!(error = ?e, "MT299 JSON conversion failed");
564 DataflowError::Validation(format!("MT299 JSON conversion failed: {e}"))
565 })?
566 }
567 "935" => {
568 let Some(mt935_message) = parsed_message.into_mt935() else {
569 error!("Failed to convert SwiftMessage to MT935");
570 return Err(DataflowError::Validation(
571 "MT935 message not found in SwiftMT message".to_string(),
572 ));
573 };
574 method = "normal".to_string();
575 serde_json::to_value(&mt935_message).map_err(|e| {
576 error!(error = ?e, "MT935 JSON conversion failed");
577 DataflowError::Validation(format!("MT935 JSON conversion failed: {e}"))
578 })?
579 }
580 "941" => {
581 let Some(mt941_message) = parsed_message.into_mt941() else {
582 error!("Failed to convert SwiftMessage to MT941");
583 return Err(DataflowError::Validation(
584 "MT941 message not found in SwiftMT message".to_string(),
585 ));
586 };
587 method = "normal".to_string();
588 serde_json::to_value(&mt941_message).map_err(|e| {
589 error!(error = ?e, "MT941 JSON conversion failed");
590 DataflowError::Validation(format!("MT941 JSON conversion failed: {e}"))
591 })?
592 }
593 "942" => {
594 let Some(mt942_message) = parsed_message.into_mt942() else {
595 error!("Failed to convert SwiftMessage to MT942");
596 return Err(DataflowError::Validation(
597 "MT942 message not found in SwiftMT message".to_string(),
598 ));
599 };
600 method = "normal".to_string();
601 serde_json::to_value(&mt942_message).map_err(|e| {
602 error!(error = ?e, "MT942 JSON conversion failed");
603 DataflowError::Validation(format!("MT942 JSON conversion failed: {e}"))
604 })?
605 }
606 _ => {
607 error!(message_type = %message_type, "Unsupported message type encountered");
608 return Err(DataflowError::Validation(format!(
609 "Unsupported message type: {message_type}"
610 )));
611 }
612 };
613
614 message
616 .data_mut()
617 .as_object_mut()
618 .unwrap()
619 .insert(parsed_field.to_string(), parsed_data.clone());
620
621 message.metadata_mut().as_object_mut().unwrap().insert(
622 parsed_field.to_string(),
623 json!({
624 "message_type": message_type,
625 "method": method,
626 }),
627 );
628
629 debug!(
630 message_type = %message_type,
631 method = %method,
632 parsed_field = %parsed_field,
633 "MT message parsing completed successfully for forward transformation"
634 );
635
636 message.invalidate_context_cache();
638
639 Ok((
640 200,
641 vec![Change {
642 path: Arc::from(format!("data.{}", parsed_field)),
643 old_value: Arc::new(Value::Null),
644 new_value: Arc::new(parsed_data),
645 }],
646 ))
647 }
648
649 fn manual_unescape(input: &str) -> String {
651 let mut result = input.trim();
652
653 if result.starts_with('"') && result.ends_with('"') && result.len() > 1 {
655 result = &result[1..result.len() - 1];
656 }
657
658 result
660 .replace("\\r\\n", "\n")
661 .replace("\\r", "\r")
662 .replace("\\n", "\n")
663 .replace("\\t", "\t")
664 .replace("\\\"", "\"")
665 .replace("\\'", "'")
666 .replace("\\\\", "\\")
667 .replace("\\u0020", " ")
668 .replace("\\u0022", "\"")
669 .replace("\\u003C", "<")
670 .replace("\\u003E", ">")
671 .replace("\\u003D", "=")
672 .replace("\\u002F", "/")
673 }
674}