Skip to main content

dataflow_rs/engine/functions/
map.rs

1//! # Map Function Module
2//!
3//! This module provides data transformation capabilities using JSONLogic expressions.
4//! The map function allows copying, transforming, and reorganizing data within messages
5//! by evaluating JSONLogic expressions and assigning results to specified paths.
6//!
7//! ## Features
8//!
9//! - Transform data using JSONLogic expressions
10//! - Support for nested path access and creation
11//! - Automatic merging for root fields (data, metadata, temp_data)
12//! - Null value handling (null results skip assignment)
13//! - Change tracking for audit trails
14//!
15//! ## Example Usage
16//!
17//! ```json
18//! {
19//!     "name": "map",
20//!     "input": {
21//!         "mappings": [
22//!             {
23//!                 "path": "data.full_name",
24//!                 "logic": {"cat": [{"var": "data.first_name"}, " ", {"var": "data.last_name"}]}
25//!             },
26//!             {
27//!                 "path": "metadata.processed",
28//!                 "logic": true
29//!             }
30//!         ]
31//!     }
32//! }
33//! ```
34
35use crate::engine::error::{DataflowError, Result};
36use crate::engine::message::{Change, Message};
37use crate::engine::utils::{get_nested_value, set_nested_value};
38use datalogic_rs::{CompiledLogic, DataLogic};
39use log::{debug, error};
40use serde::Deserialize;
41use serde_json::Value;
42use std::sync::Arc;
43
44/// Configuration for the map function containing a list of mappings.
45///
46/// Each mapping specifies a target path and a JSONLogic expression to evaluate.
47/// Mappings are processed sequentially, allowing later mappings to use results
48/// from earlier ones.
49#[derive(Debug, Clone, Deserialize)]
50pub struct MapConfig {
51    /// List of mappings to execute in order.
52    pub mappings: Vec<MapMapping>,
53}
54
55/// A single mapping that transforms and assigns data.
56///
57/// The mapping evaluates a JSONLogic expression against the message context
58/// and assigns the result to the specified path.
59#[derive(Debug, Clone, Deserialize)]
60pub struct MapMapping {
61    /// Target path where the result will be stored (e.g., "data.user.name").
62    /// Supports dot notation for nested paths and `#` prefix for numeric field names.
63    pub path: String,
64
65    /// JSONLogic expression to evaluate. Can reference any field in the context
66    /// using `{"var": "path.to.field"}` syntax.
67    pub logic: Value,
68
69    /// Index into the compiled logic cache. Set during workflow compilation.
70    #[serde(skip)]
71    pub logic_index: Option<usize>,
72}
73
74impl MapConfig {
75    /// Parses a `MapConfig` from a JSON value.
76    ///
77    /// # Arguments
78    /// * `input` - JSON object containing a "mappings" array
79    ///
80    /// # Errors
81    /// Returns `DataflowError::Validation` if:
82    /// - The "mappings" field is missing
83    /// - The "mappings" field is not an array
84    /// - Any mapping is missing "path" or "logic" fields
85    pub fn from_json(input: &Value) -> Result<Self> {
86        let mappings = input.get("mappings").ok_or_else(|| {
87            DataflowError::Validation("Missing 'mappings' array in input".to_string())
88        })?;
89
90        let mappings_arr = mappings
91            .as_array()
92            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
93
94        let mut parsed_mappings = Vec::new();
95
96        for mapping in mappings_arr {
97            let path = mapping
98                .get("path")
99                .and_then(Value::as_str)
100                .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
101                .to_string();
102
103            let logic = mapping
104                .get("logic")
105                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
106                .clone();
107
108            parsed_mappings.push(MapMapping {
109                path,
110                logic,
111                logic_index: None,
112            });
113        }
114
115        Ok(MapConfig {
116            mappings: parsed_mappings,
117        })
118    }
119
120    /// Executes all map transformations using pre-compiled logic.
121    ///
122    /// Processes each mapping sequentially, evaluating the JSONLogic expression
123    /// and assigning the result to the target path. Changes are tracked for
124    /// audit trail purposes.
125    ///
126    /// # Arguments
127    /// * `message` - The message to transform (modified in place)
128    /// * `datalogic` - DataLogic instance for evaluation
129    /// * `logic_cache` - Pre-compiled logic expressions
130    ///
131    /// # Returns
132    /// * `Ok((status, changes))` - Status code (200 success, 500 if errors) and list of changes
133    /// * `Err` - If a critical error occurs during execution
134    ///
135    /// # Behavior
136    /// - Null evaluation results are skipped (no assignment made)
137    /// - Root field assignments (data, metadata, temp_data) merge objects instead of replacing
138    /// - Each successful assignment invalidates the context cache for subsequent mappings
139    pub fn execute(
140        &self,
141        message: &mut Message,
142        datalogic: &Arc<DataLogic>,
143        logic_cache: &[Arc<CompiledLogic>],
144    ) -> Result<(usize, Vec<Change>)> {
145        let mut changes = Vec::new();
146        let mut errors_encountered = false;
147
148        debug!("Map: Executing {} mappings", self.mappings.len());
149
150        // Process each mapping
151        for mapping in &self.mappings {
152            // Get or create context Arc for this iteration
153            // This ensures we use cached Arc when available, or create fresh one after modifications
154            let context_arc = message.get_context_arc();
155            debug!("Processing mapping to path: {}", mapping.path);
156
157            // Get the compiled logic from cache with proper bounds checking
158            let compiled_logic = match mapping.logic_index {
159                Some(index) => {
160                    // Ensure index is valid before accessing
161                    if index >= logic_cache.len() {
162                        error!(
163                            "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
164                            index,
165                            logic_cache.len(),
166                            mapping.path
167                        );
168                        errors_encountered = true;
169                        continue;
170                    }
171                    &logic_cache[index]
172                }
173                None => {
174                    error!(
175                        "Map: Logic not compiled (no index) for mapping to {}",
176                        mapping.path
177                    );
178                    errors_encountered = true;
179                    continue;
180                }
181            };
182
183            // Evaluate the transformation logic using DataLogic v4
184            // DataLogic v4 is thread-safe with Arc<CompiledLogic>, no spawn_blocking needed
185            let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
186
187            match result {
188                Ok(transformed_value) => {
189                    debug!(
190                        "Map: Evaluated logic for path {} resulted in: {:?}",
191                        mapping.path, transformed_value
192                    );
193
194                    // Skip mapping if the result is null
195                    if transformed_value.is_null() {
196                        debug!(
197                            "Map: Skipping mapping for path {} as result is null",
198                            mapping.path
199                        );
200                        continue;
201                    }
202
203                    // Get old value from the appropriate location in context
204                    let old_value = get_nested_value(&message.context, &mapping.path);
205
206                    let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
207                    let new_value_arc = Arc::new(transformed_value.clone());
208
209                    debug!(
210                        "Recording change for path '{}': old={:?}, new={:?}",
211                        mapping.path, old_value_arc, new_value_arc
212                    );
213                    changes.push(Change {
214                        path: Arc::from(mapping.path.as_str()),
215                        old_value: old_value_arc,
216                        new_value: Arc::clone(&new_value_arc),
217                    });
218
219                    // Update the context directly with the transformed value
220                    // Check if we're replacing a root field (data, metadata, or temp_data)
221                    if mapping.path == "data"
222                        || mapping.path == "metadata"
223                        || mapping.path == "temp_data"
224                    {
225                        // Merge with existing field instead of replacing entirely
226                        if let Value::Object(new_map) = transformed_value {
227                            // If new value is an object, merge its fields
228                            if let Value::Object(existing_map) = &mut message.context[&mapping.path]
229                            {
230                                // Merge new fields into existing object
231                                for (key, value) in new_map {
232                                    existing_map.insert(key, value);
233                                }
234                            } else {
235                                // If existing is not an object, replace with new object
236                                message.context[&mapping.path] = Value::Object(new_map);
237                            }
238                        } else {
239                            // If new value is not an object, replace entirely
240                            message.context[&mapping.path] = transformed_value;
241                        }
242                    } else {
243                        // Set nested value in context
244                        set_nested_value(&mut message.context, &mapping.path, transformed_value);
245                    }
246                    // Invalidate the cached context Arc since we modified the context
247                    // The next iteration (if any) will create a fresh Arc when needed
248                    message.invalidate_context_cache();
249                    debug!("Successfully mapped to path: {}", mapping.path);
250                }
251                Err(e) => {
252                    error!(
253                        "Map: Error evaluating logic for path {}: {:?}",
254                        mapping.path, e
255                    );
256                    errors_encountered = true;
257                }
258            }
259        }
260
261        // Return appropriate status based on results
262        let status = if errors_encountered { 500 } else { 200 };
263        Ok((status, changes))
264    }
265
266    /// Executes all map transformations with trace support.
267    ///
268    /// Same as `execute()` but captures a context snapshot before each mapping
269    /// for sub-step debugging. Returns the snapshots alongside the normal results.
270    ///
271    /// # Returns
272    /// * `Ok((status, changes, context_snapshots))` - Status, changes, and per-mapping context snapshots
273    pub fn execute_with_trace(
274        &self,
275        message: &mut Message,
276        datalogic: &Arc<DataLogic>,
277        logic_cache: &[Arc<CompiledLogic>],
278    ) -> Result<(usize, Vec<Change>, Vec<Value>)> {
279        let mut changes = Vec::new();
280        let mut errors_encountered = false;
281        let mut context_snapshots = Vec::with_capacity(self.mappings.len());
282
283        debug!("Map (trace): Executing {} mappings", self.mappings.len());
284
285        for mapping in &self.mappings {
286            // Capture context snapshot before this mapping executes
287            context_snapshots.push(message.context.clone());
288
289            let context_arc = message.get_context_arc();
290            debug!("Processing mapping to path: {}", mapping.path);
291
292            let compiled_logic = match mapping.logic_index {
293                Some(index) => {
294                    if index >= logic_cache.len() {
295                        error!(
296                            "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
297                            index,
298                            logic_cache.len(),
299                            mapping.path
300                        );
301                        errors_encountered = true;
302                        continue;
303                    }
304                    &logic_cache[index]
305                }
306                None => {
307                    error!(
308                        "Map: Logic not compiled (no index) for mapping to {}",
309                        mapping.path
310                    );
311                    errors_encountered = true;
312                    continue;
313                }
314            };
315
316            let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
317
318            match result {
319                Ok(transformed_value) => {
320                    debug!(
321                        "Map: Evaluated logic for path {} resulted in: {:?}",
322                        mapping.path, transformed_value
323                    );
324
325                    if transformed_value.is_null() {
326                        debug!(
327                            "Map: Skipping mapping for path {} as result is null",
328                            mapping.path
329                        );
330                        continue;
331                    }
332
333                    let old_value = get_nested_value(&message.context, &mapping.path);
334
335                    let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
336                    let new_value_arc = Arc::new(transformed_value.clone());
337
338                    changes.push(Change {
339                        path: Arc::from(mapping.path.as_str()),
340                        old_value: old_value_arc,
341                        new_value: Arc::clone(&new_value_arc),
342                    });
343
344                    if mapping.path == "data"
345                        || mapping.path == "metadata"
346                        || mapping.path == "temp_data"
347                    {
348                        if let Value::Object(new_map) = transformed_value {
349                            if let Value::Object(existing_map) = &mut message.context[&mapping.path]
350                            {
351                                for (key, value) in new_map {
352                                    existing_map.insert(key, value);
353                                }
354                            } else {
355                                message.context[&mapping.path] = Value::Object(new_map);
356                            }
357                        } else {
358                            message.context[&mapping.path] = transformed_value;
359                        }
360                    } else {
361                        set_nested_value(&mut message.context, &mapping.path, transformed_value);
362                    }
363                    message.invalidate_context_cache();
364                }
365                Err(e) => {
366                    error!(
367                        "Map: Error evaluating logic for path {}: {:?}",
368                        mapping.path, e
369                    );
370                    errors_encountered = true;
371                }
372            }
373        }
374
375        let status = if errors_encountered { 500 } else { 200 };
376        Ok((status, changes, context_snapshots))
377    }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Builds a mapping whose logic has not been compiled yet.
    fn mapping(path: &str, logic: Value) -> MapMapping {
        MapMapping {
            path: path.to_string(),
            logic,
            logic_index: None,
        }
    }

    /// Compiles each mapping's logic in list order and points the mapping at
    /// the matching slot of the returned cache.
    fn compile_all(datalogic: &Arc<DataLogic>, config: &mut MapConfig) -> Vec<Arc<CompiledLogic>> {
        config
            .mappings
            .iter_mut()
            .enumerate()
            .map(|(slot, entry)| {
                entry.logic_index = Some(slot);
                datalogic.compile(&entry.logic).unwrap()
            })
            .collect()
    }

    #[test]
    fn test_map_config_from_json() {
        let input = json!({
            "mappings": [
                {"path": "data.field1", "logic": {"var": "data.source"}},
                {"path": "data.field2", "logic": "static_value"}
            ]
        });

        let config = MapConfig::from_json(&input).unwrap();
        let paths: Vec<&str> = config.mappings.iter().map(|m| m.path.as_str()).collect();
        assert_eq!(paths, ["data.field1", "data.field2"]);
    }

    #[test]
    fn test_map_config_missing_mappings() {
        assert!(MapConfig::from_json(&json!({})).is_err());
    }

    #[test]
    fn test_map_config_invalid_mappings() {
        let input = json!({"mappings": "not_an_array"});
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_config_missing_path() {
        let input = json!({
            "mappings": [{"logic": {"var": "data.source"}}]
        });
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_config_missing_logic() {
        let input = json!({
            "mappings": [{"path": "data.field1"}]
        });
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_metadata_assignment() {
        // A value read from `data` must be assignable under `metadata`.
        let datalogic = Arc::new(DataLogic::with_preserve_structure());

        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"SwiftMT": {"message_type": "103"}});

        let mut config = MapConfig {
            mappings: vec![mapping(
                "metadata.SwiftMT.message_type",
                json!({"var": "data.SwiftMT.message_type"}),
            )],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .expect("execute should succeed");
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);

        // The metadata tree now carries the copied value.
        assert_eq!(
            message.context.pointer("/metadata/SwiftMT/message_type"),
            Some(&json!("103"))
        );
    }

    #[test]
    fn test_map_null_values_skip_assignment() {
        // Mappings that evaluate to null must leave the message untouched.
        let datalogic = Arc::new(DataLogic::with_preserve_structure());

        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"existing_field": "should_remain"});
        message.context["metadata"] = json!({"existing_meta": "should_remain"});

        let mut config = MapConfig {
            mappings: vec![
                // Reads a missing field, so it evaluates to null.
                mapping("data.new_field", json!({"var": "data.non_existent_field"})),
                // Reads a missing field, so it evaluates to null.
                mapping("metadata.new_meta", json!({"var": "data.another_non_existent"})),
                // A literal, so it always produces a value.
                mapping("data.actual_field", json!("actual_value")),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .expect("execute should succeed");
        assert_eq!(status, 200);

        // Only the non-null mapping is recorded as a change.
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.actual_field");

        // The null mappings created nothing.
        assert!(message.context.pointer("/data/new_field").is_none());
        assert!(message.context.pointer("/metadata/new_meta").is_none());

        // Pre-existing fields are untouched.
        assert_eq!(
            message.context.pointer("/data/existing_field"),
            Some(&json!("should_remain"))
        );
        assert_eq!(
            message.context.pointer("/metadata/existing_meta"),
            Some(&json!("should_remain"))
        );

        // The literal mapping was applied.
        assert_eq!(
            message.context.pointer("/data/actual_field"),
            Some(&json!("actual_value"))
        );
    }

    #[test]
    fn test_map_execute_with_trace_captures_context_snapshots() {
        let datalogic = Arc::new(DataLogic::with_preserve_structure());

        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"first": "Alice", "last": "Smith"});

        let mut config = MapConfig {
            mappings: vec![
                mapping(
                    "data.full_name",
                    json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
                ),
                mapping(
                    "data.greeting",
                    json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
                ),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes, context_snapshots) = config
            .execute_with_trace(&mut message, &datalogic, &logic_cache)
            .expect("execute_with_trace should succeed");
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 2);
        assert_eq!(context_snapshots.len(), 2);

        // Snapshot 0 predates every mapping: full_name is absent.
        assert!(context_snapshots[0].pointer("/data/full_name").is_none());

        // Snapshot 1 follows the first mapping: full_name is present.
        assert_eq!(
            context_snapshots[1].pointer("/data/full_name"),
            Some(&json!("Alice Smith"))
        );
    }

    #[test]
    fn test_map_multiple_fields_including_metadata() {
        // One task writing to data, metadata, and temp_data.
        let datalogic = Arc::new(DataLogic::with_preserve_structure());

        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "ISO20022_MX": {
                "document": {
                    "TxInf": {
                        "OrgnlGrpInf": {"OrgnlMsgNmId": "pacs.008.001.08"}
                    }
                }
            },
            "SwiftMT": {"message_type": "103"}
        });

        let mut config = MapConfig {
            mappings: vec![
                mapping("data.SwiftMT.message_type", json!("103")),
                mapping(
                    "metadata.SwiftMT.message_type",
                    json!({"var": "data.SwiftMT.message_type"}),
                ),
                mapping(
                    "temp_data.original_msg_type",
                    json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
                ),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .expect("execute should succeed");
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 3);

        // Each of the three roots received its value.
        assert_eq!(
            message.context.pointer("/data/SwiftMT/message_type"),
            Some(&json!("103"))
        );
        assert_eq!(
            message.context.pointer("/metadata/SwiftMT/message_type"),
            Some(&json!("103"))
        );
        assert_eq!(
            message.context.pointer("/temp_data/original_msg_type"),
            Some(&json!("pacs.008.001.08"))
        );
    }
}