dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
36use crate::engine::message::{Change, Message};
37use crate::engine::utils::{get_nested_value, set_nested_value};
38use datalogic_rs::{CompiledLogic, DataLogic};
39use log::{debug, error};
40use serde::Deserialize;
41use serde_json::Value;
42use std::sync::Arc;
43
44#[derive(Debug, Clone, Deserialize)]
50pub struct MapConfig {
51 pub mappings: Vec<MapMapping>,
53}
54
55#[derive(Debug, Clone, Deserialize)]
60pub struct MapMapping {
61 pub path: String,
64
65 pub logic: Value,
68
69 #[serde(skip)]
71 pub logic_index: Option<usize>,
72}
73
74impl MapConfig {
75 pub fn from_json(input: &Value) -> Result<Self> {
86 let mappings = input.get("mappings").ok_or_else(|| {
87 DataflowError::Validation("Missing 'mappings' array in input".to_string())
88 })?;
89
90 let mappings_arr = mappings
91 .as_array()
92 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
93
94 let mut parsed_mappings = Vec::new();
95
96 for mapping in mappings_arr {
97 let path = mapping
98 .get("path")
99 .and_then(Value::as_str)
100 .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
101 .to_string();
102
103 let logic = mapping
104 .get("logic")
105 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
106 .clone();
107
108 parsed_mappings.push(MapMapping {
109 path,
110 logic,
111 logic_index: None,
112 });
113 }
114
115 Ok(MapConfig {
116 mappings: parsed_mappings,
117 })
118 }
119
120 pub fn execute(
140 &self,
141 message: &mut Message,
142 datalogic: &Arc<DataLogic>,
143 logic_cache: &[Arc<CompiledLogic>],
144 ) -> Result<(usize, Vec<Change>)> {
145 let mut changes = Vec::new();
146 let mut errors_encountered = false;
147
148 debug!("Map: Executing {} mappings", self.mappings.len());
149
150 for mapping in &self.mappings {
152 let context_arc = message.get_context_arc();
155 debug!("Processing mapping to path: {}", mapping.path);
156
157 let compiled_logic = match mapping.logic_index {
159 Some(index) => {
160 if index >= logic_cache.len() {
162 error!(
163 "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
164 index,
165 logic_cache.len(),
166 mapping.path
167 );
168 errors_encountered = true;
169 continue;
170 }
171 &logic_cache[index]
172 }
173 None => {
174 error!(
175 "Map: Logic not compiled (no index) for mapping to {}",
176 mapping.path
177 );
178 errors_encountered = true;
179 continue;
180 }
181 };
182
183 let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
186
187 match result {
188 Ok(transformed_value) => {
189 debug!(
190 "Map: Evaluated logic for path {} resulted in: {:?}",
191 mapping.path, transformed_value
192 );
193
194 if transformed_value.is_null() {
196 debug!(
197 "Map: Skipping mapping for path {} as result is null",
198 mapping.path
199 );
200 continue;
201 }
202
203 let old_value = get_nested_value(&message.context, &mapping.path);
205
206 let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
207 let new_value_arc = Arc::new(transformed_value.clone());
208
209 debug!(
210 "Recording change for path '{}': old={:?}, new={:?}",
211 mapping.path, old_value_arc, new_value_arc
212 );
213 changes.push(Change {
214 path: Arc::from(mapping.path.as_str()),
215 old_value: old_value_arc,
216 new_value: Arc::clone(&new_value_arc),
217 });
218
219 if mapping.path == "data"
222 || mapping.path == "metadata"
223 || mapping.path == "temp_data"
224 {
225 if let Value::Object(new_map) = transformed_value {
227 if let Value::Object(existing_map) = &mut message.context[&mapping.path]
229 {
230 for (key, value) in new_map {
232 existing_map.insert(key, value);
233 }
234 } else {
235 message.context[&mapping.path] = Value::Object(new_map);
237 }
238 } else {
239 message.context[&mapping.path] = transformed_value;
241 }
242 } else {
243 set_nested_value(&mut message.context, &mapping.path, transformed_value);
245 }
246 message.invalidate_context_cache();
249 debug!("Successfully mapped to path: {}", mapping.path);
250 }
251 Err(e) => {
252 error!(
253 "Map: Error evaluating logic for path {}: {:?}",
254 mapping.path, e
255 );
256 errors_encountered = true;
257 }
258 }
259 }
260
261 let status = if errors_encountered { 500 } else { 200 };
263 Ok((status, changes))
264 }
265
266 pub fn execute_with_trace(
274 &self,
275 message: &mut Message,
276 datalogic: &Arc<DataLogic>,
277 logic_cache: &[Arc<CompiledLogic>],
278 ) -> Result<(usize, Vec<Change>, Vec<Value>)> {
279 let mut changes = Vec::new();
280 let mut errors_encountered = false;
281 let mut context_snapshots = Vec::with_capacity(self.mappings.len());
282
283 debug!("Map (trace): Executing {} mappings", self.mappings.len());
284
285 for mapping in &self.mappings {
286 context_snapshots.push(message.context.clone());
288
289 let context_arc = message.get_context_arc();
290 debug!("Processing mapping to path: {}", mapping.path);
291
292 let compiled_logic = match mapping.logic_index {
293 Some(index) => {
294 if index >= logic_cache.len() {
295 error!(
296 "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
297 index,
298 logic_cache.len(),
299 mapping.path
300 );
301 errors_encountered = true;
302 continue;
303 }
304 &logic_cache[index]
305 }
306 None => {
307 error!(
308 "Map: Logic not compiled (no index) for mapping to {}",
309 mapping.path
310 );
311 errors_encountered = true;
312 continue;
313 }
314 };
315
316 let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
317
318 match result {
319 Ok(transformed_value) => {
320 debug!(
321 "Map: Evaluated logic for path {} resulted in: {:?}",
322 mapping.path, transformed_value
323 );
324
325 if transformed_value.is_null() {
326 debug!(
327 "Map: Skipping mapping for path {} as result is null",
328 mapping.path
329 );
330 continue;
331 }
332
333 let old_value = get_nested_value(&message.context, &mapping.path);
334
335 let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
336 let new_value_arc = Arc::new(transformed_value.clone());
337
338 changes.push(Change {
339 path: Arc::from(mapping.path.as_str()),
340 old_value: old_value_arc,
341 new_value: Arc::clone(&new_value_arc),
342 });
343
344 if mapping.path == "data"
345 || mapping.path == "metadata"
346 || mapping.path == "temp_data"
347 {
348 if let Value::Object(new_map) = transformed_value {
349 if let Value::Object(existing_map) = &mut message.context[&mapping.path]
350 {
351 for (key, value) in new_map {
352 existing_map.insert(key, value);
353 }
354 } else {
355 message.context[&mapping.path] = Value::Object(new_map);
356 }
357 } else {
358 message.context[&mapping.path] = transformed_value;
359 }
360 } else {
361 set_nested_value(&mut message.context, &mapping.path, transformed_value);
362 }
363 message.invalidate_context_cache();
364 }
365 Err(e) => {
366 error!(
367 "Map: Error evaluating logic for path {}: {:?}",
368 mapping.path, e
369 );
370 errors_encountered = true;
371 }
372 }
373 }
374
375 let status = if errors_encountered { 500 } else { 200 };
376 Ok((status, changes, context_snapshots))
377 }
378}
379
380#[cfg(test)]
381mod tests {
382 use super::*;
383 use crate::engine::message::Message;
384 use serde_json::json;
385
386 #[test]
387 fn test_map_config_from_json() {
388 let input = json!({
389 "mappings": [
390 {
391 "path": "data.field1",
392 "logic": {"var": "data.source"}
393 },
394 {
395 "path": "data.field2",
396 "logic": "static_value"
397 }
398 ]
399 });
400
401 let config = MapConfig::from_json(&input).unwrap();
402 assert_eq!(config.mappings.len(), 2);
403 assert_eq!(config.mappings[0].path, "data.field1");
404 assert_eq!(config.mappings[1].path, "data.field2");
405 }
406
407 #[test]
408 fn test_map_config_missing_mappings() {
409 let input = json!({});
410 let result = MapConfig::from_json(&input);
411 assert!(result.is_err());
412 }
413
414 #[test]
415 fn test_map_config_invalid_mappings() {
416 let input = json!({
417 "mappings": "not_an_array"
418 });
419 let result = MapConfig::from_json(&input);
420 assert!(result.is_err());
421 }
422
423 #[test]
424 fn test_map_config_missing_path() {
425 let input = json!({
426 "mappings": [
427 {
428 "logic": {"var": "data.source"}
429 }
430 ]
431 });
432 let result = MapConfig::from_json(&input);
433 assert!(result.is_err());
434 }
435
436 #[test]
437 fn test_map_config_missing_logic() {
438 let input = json!({
439 "mappings": [
440 {
441 "path": "data.field1"
442 }
443 ]
444 });
445 let result = MapConfig::from_json(&input);
446 assert!(result.is_err());
447 }
448
449 #[test]
450 fn test_map_metadata_assignment() {
451 let datalogic = Arc::new(DataLogic::with_preserve_structure());
453
454 let mut message = Message::new(Arc::new(json!({})));
456 message.context["data"] = json!({
457 "SwiftMT": {
458 "message_type": "103"
459 }
460 });
461
462 let config = MapConfig {
464 mappings: vec![MapMapping {
465 path: "metadata.SwiftMT.message_type".to_string(),
466 logic: json!({"var": "data.SwiftMT.message_type"}),
467 logic_index: Some(0),
468 }],
469 };
470
471 let logic_cache = vec![datalogic.compile(&config.mappings[0].logic).unwrap()];
473
474 let result = config.execute(&mut message, &datalogic, &logic_cache);
476 assert!(result.is_ok());
477
478 let (status, changes) = result.unwrap();
479 assert_eq!(status, 200);
480 assert_eq!(changes.len(), 1);
481
482 assert_eq!(
484 message.context["metadata"]
485 .get("SwiftMT")
486 .and_then(|v| v.get("message_type")),
487 Some(&json!("103"))
488 );
489 }
490
491 #[test]
492 fn test_map_null_values_skip_assignment() {
493 let datalogic = Arc::new(DataLogic::with_preserve_structure());
495
496 let mut message = Message::new(Arc::new(json!({})));
498 message.context["data"] = json!({
499 "existing_field": "should_remain"
500 });
501 message.context["metadata"] = json!({
502 "existing_meta": "should_remain"
503 });
504
505 let config = MapConfig {
507 mappings: vec![
508 MapMapping {
509 path: "data.new_field".to_string(),
510 logic: json!({"var": "data.non_existent_field"}), logic_index: Some(0),
512 },
513 MapMapping {
514 path: "metadata.new_meta".to_string(),
515 logic: json!({"var": "data.another_non_existent"}), logic_index: Some(1),
517 },
518 MapMapping {
519 path: "data.actual_field".to_string(),
520 logic: json!("actual_value"), logic_index: Some(2),
522 },
523 ],
524 };
525
526 let logic_cache = vec![
528 datalogic.compile(&config.mappings[0].logic).unwrap(),
529 datalogic.compile(&config.mappings[1].logic).unwrap(),
530 datalogic.compile(&config.mappings[2].logic).unwrap(),
531 ];
532
533 let result = config.execute(&mut message, &datalogic, &logic_cache);
535 assert!(result.is_ok());
536
537 let (status, changes) = result.unwrap();
538 assert_eq!(status, 200);
539 assert_eq!(changes.len(), 1);
541 assert_eq!(changes[0].path.as_ref(), "data.actual_field");
542
543 assert_eq!(message.context["data"].get("new_field"), None);
545 assert_eq!(message.context["metadata"].get("new_meta"), None);
546
547 assert_eq!(
549 message.context["data"].get("existing_field"),
550 Some(&json!("should_remain"))
551 );
552 assert_eq!(
553 message.context["metadata"].get("existing_meta"),
554 Some(&json!("should_remain"))
555 );
556
557 assert_eq!(
559 message.context["data"].get("actual_field"),
560 Some(&json!("actual_value"))
561 );
562 }
563
564 #[test]
565 fn test_map_execute_with_trace_captures_context_snapshots() {
566 let datalogic = Arc::new(DataLogic::with_preserve_structure());
567
568 let mut message = Message::new(Arc::new(json!({})));
569 message.context["data"] = json!({
570 "first": "Alice",
571 "last": "Smith"
572 });
573
574 let mut config = MapConfig {
575 mappings: vec![
576 MapMapping {
577 path: "data.full_name".to_string(),
578 logic: json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
579 logic_index: None,
580 },
581 MapMapping {
582 path: "data.greeting".to_string(),
583 logic: json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
584 logic_index: None,
585 },
586 ],
587 };
588
589 let mut logic_cache = Vec::new();
590 for (i, mapping) in config.mappings.iter_mut().enumerate() {
591 logic_cache.push(datalogic.compile(&mapping.logic).unwrap());
592 mapping.logic_index = Some(i);
593 }
594
595 let result = config.execute_with_trace(&mut message, &datalogic, &logic_cache);
596 assert!(result.is_ok());
597
598 let (status, changes, context_snapshots) = result.unwrap();
599 assert_eq!(status, 200);
600 assert_eq!(changes.len(), 2);
601 assert_eq!(context_snapshots.len(), 2);
602
603 assert!(context_snapshots[0]["data"].get("full_name").is_none());
605
606 assert_eq!(
608 context_snapshots[1]["data"].get("full_name"),
609 Some(&json!("Alice Smith"))
610 );
611 }
612
613 #[test]
614 fn test_map_multiple_fields_including_metadata() {
615 let datalogic = Arc::new(DataLogic::with_preserve_structure());
617
618 let mut message = Message::new(Arc::new(json!({})));
620 message.context["data"] = json!({
621 "ISO20022_MX": {
622 "document": {
623 "TxInf": {
624 "OrgnlGrpInf": {
625 "OrgnlMsgNmId": "pacs.008.001.08"
626 }
627 }
628 }
629 },
630 "SwiftMT": {
631 "message_type": "103"
632 }
633 });
634
635 let mut config = MapConfig {
637 mappings: vec![
638 MapMapping {
639 path: "data.SwiftMT.message_type".to_string(),
640 logic: json!("103"),
641 logic_index: None,
642 },
643 MapMapping {
644 path: "metadata.SwiftMT.message_type".to_string(),
645 logic: json!({"var": "data.SwiftMT.message_type"}),
646 logic_index: None,
647 },
648 MapMapping {
649 path: "temp_data.original_msg_type".to_string(),
650 logic: json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
651 logic_index: None,
652 },
653 ],
654 };
655
656 let mut logic_cache = Vec::new();
658 for (i, mapping) in config.mappings.iter_mut().enumerate() {
659 logic_cache.push(datalogic.compile(&mapping.logic).unwrap());
660 mapping.logic_index = Some(i);
661 }
662
663 let result = config.execute(&mut message, &datalogic, &logic_cache);
665 assert!(result.is_ok());
666
667 let (status, changes) = result.unwrap();
668 assert_eq!(status, 200);
669 assert_eq!(changes.len(), 3);
670
671 assert_eq!(
673 message.context["data"]
674 .get("SwiftMT")
675 .and_then(|v| v.get("message_type")),
676 Some(&json!("103"))
677 );
678 assert_eq!(
679 message.context["metadata"]
680 .get("SwiftMT")
681 .and_then(|v| v.get("message_type")),
682 Some(&json!("103"))
683 );
684 assert_eq!(
685 message.context["temp_data"].get("original_msg_type"),
686 Some(&json!("pacs.008.001.08"))
687 );
688 }
689}