Skip to main content

dataflow_rs/engine/functions/
map.rs

1//! # Map Function Module
2//!
3//! This module provides data transformation capabilities using JSONLogic expressions.
4//! The map function allows copying, transforming, and reorganizing data within messages
5//! by evaluating JSONLogic expressions and assigning results to specified paths.
6//!
7//! ## Features
8//!
9//! - Transform data using JSONLogic expressions
10//! - Support for nested path access and creation
11//! - Automatic merging for root fields (data, metadata, temp_data)
12//! - Null value handling (null results skip assignment)
13//! - Change tracking for audit trails
14//!
15//! ## Example Usage
16//!
17//! ```json
18//! {
19//!     "name": "map",
20//!     "input": {
21//!         "mappings": [
22//!             {
23//!                 "path": "data.full_name",
24//!                 "logic": {"cat": [{"var": "data.first_name"}, " ", {"var": "data.last_name"}]}
25//!             },
26//!             {
27//!                 "path": "metadata.processed",
28//!                 "logic": true
29//!             }
30//!         ]
31//!     }
32//! }
33//! ```
34
35use crate::engine::error::{DataflowError, Result};
36use crate::engine::message::{Change, Message};
37use crate::engine::utils::{get_nested_value, set_nested_value};
38use datalogic_rs::{CompiledLogic, DataLogic};
39use log::{debug, error};
40use serde::Deserialize;
41use serde_json::Value;
42use std::sync::Arc;
43
44/// Configuration for the map function containing a list of mappings.
45///
46/// Each mapping specifies a target path and a JSONLogic expression to evaluate.
47/// Mappings are processed sequentially, allowing later mappings to use results
48/// from earlier ones.
49#[derive(Debug, Clone, Deserialize)]
50pub struct MapConfig {
51    /// List of mappings to execute in order.
52    pub mappings: Vec<MapMapping>,
53}
54
55/// A single mapping that transforms and assigns data.
56///
57/// The mapping evaluates a JSONLogic expression against the message context
58/// and assigns the result to the specified path.
59#[derive(Debug, Clone, Deserialize)]
60pub struct MapMapping {
61    /// Target path where the result will be stored (e.g., "data.user.name").
62    /// Supports dot notation for nested paths and `#` prefix for numeric field names.
63    pub path: String,
64
65    /// JSONLogic expression to evaluate. Can reference any field in the context
66    /// using `{"var": "path.to.field"}` syntax.
67    pub logic: Value,
68
69    /// Index into the compiled logic cache. Set during workflow compilation.
70    #[serde(skip)]
71    pub logic_index: Option<usize>,
72}
73
74impl MapConfig {
75    /// Parses a `MapConfig` from a JSON value.
76    ///
77    /// # Arguments
78    /// * `input` - JSON object containing a "mappings" array
79    ///
80    /// # Errors
81    /// Returns `DataflowError::Validation` if:
82    /// - The "mappings" field is missing
83    /// - The "mappings" field is not an array
84    /// - Any mapping is missing "path" or "logic" fields
85    pub fn from_json(input: &Value) -> Result<Self> {
86        let mappings = input.get("mappings").ok_or_else(|| {
87            DataflowError::Validation("Missing 'mappings' array in input".to_string())
88        })?;
89
90        let mappings_arr = mappings
91            .as_array()
92            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
93
94        let mut parsed_mappings = Vec::new();
95
96        for mapping in mappings_arr {
97            let path = mapping
98                .get("path")
99                .and_then(Value::as_str)
100                .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
101                .to_string();
102
103            let logic = mapping
104                .get("logic")
105                .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
106                .clone();
107
108            parsed_mappings.push(MapMapping {
109                path,
110                logic,
111                logic_index: None,
112            });
113        }
114
115        Ok(MapConfig {
116            mappings: parsed_mappings,
117        })
118    }
119
120    /// Executes all map transformations using pre-compiled logic.
121    ///
122    /// Processes each mapping sequentially, evaluating the JSONLogic expression
123    /// and assigning the result to the target path. Changes are tracked for
124    /// audit trail purposes.
125    ///
126    /// # Arguments
127    /// * `message` - The message to transform (modified in place)
128    /// * `datalogic` - DataLogic instance for evaluation
129    /// * `logic_cache` - Pre-compiled logic expressions
130    ///
131    /// # Returns
132    /// * `Ok((status, changes))` - Status code (200 success, 500 if errors) and list of changes
133    /// * `Err` - If a critical error occurs during execution
134    ///
135    /// # Behavior
136    /// - Null evaluation results are skipped (no assignment made)
137    /// - Root field assignments (data, metadata, temp_data) merge objects instead of replacing
138    /// - Each successful assignment invalidates the context cache for subsequent mappings
139    pub fn execute(
140        &self,
141        message: &mut Message,
142        datalogic: &Arc<DataLogic>,
143        logic_cache: &[Arc<CompiledLogic>],
144    ) -> Result<(usize, Vec<Change>)> {
145        let mut changes = Vec::new();
146        let mut errors_encountered = false;
147
148        debug!("Map: Executing {} mappings", self.mappings.len());
149
150        // Process each mapping
151        for mapping in &self.mappings {
152            // Get or create context Arc for this iteration
153            // This ensures we use cached Arc when available, or create fresh one after modifications
154            let context_arc = message.get_context_arc();
155            debug!("Processing mapping to path: {}", mapping.path);
156
157            // Get the compiled logic from cache with proper bounds checking
158            let compiled_logic = match mapping.logic_index {
159                Some(index) => {
160                    // Ensure index is valid before accessing
161                    if index >= logic_cache.len() {
162                        error!(
163                            "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
164                            index,
165                            logic_cache.len(),
166                            mapping.path
167                        );
168                        errors_encountered = true;
169                        continue;
170                    }
171                    &logic_cache[index]
172                }
173                None => {
174                    error!(
175                        "Map: Logic not compiled (no index) for mapping to {}",
176                        mapping.path
177                    );
178                    errors_encountered = true;
179                    continue;
180                }
181            };
182
183            // Evaluate the transformation logic using DataLogic v4
184            // DataLogic v4 is thread-safe with Arc<CompiledLogic>, no spawn_blocking needed
185            let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
186
187            match result {
188                Ok(transformed_value) => {
189                    debug!(
190                        "Map: Evaluated logic for path {} resulted in: {:?}",
191                        mapping.path, transformed_value
192                    );
193
194                    // Skip mapping if the result is null
195                    if transformed_value.is_null() {
196                        debug!(
197                            "Map: Skipping mapping for path {} as result is null",
198                            mapping.path
199                        );
200                        continue;
201                    }
202
203                    // Get old value from the appropriate location in context
204                    let old_value = get_nested_value(&message.context, &mapping.path);
205
206                    let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
207                    let new_value_arc = Arc::new(transformed_value.clone());
208
209                    debug!(
210                        "Recording change for path '{}': old={:?}, new={:?}",
211                        mapping.path, old_value_arc, new_value_arc
212                    );
213                    changes.push(Change {
214                        path: Arc::from(mapping.path.as_str()),
215                        old_value: old_value_arc,
216                        new_value: Arc::clone(&new_value_arc),
217                    });
218
219                    // Update the context directly with the transformed value
220                    // Check if we're replacing a root field (data, metadata, or temp_data)
221                    if mapping.path == "data"
222                        || mapping.path == "metadata"
223                        || mapping.path == "temp_data"
224                    {
225                        // Merge with existing field instead of replacing entirely
226                        if let Value::Object(new_map) = transformed_value {
227                            // If new value is an object, merge its fields
228                            if let Value::Object(existing_map) = &mut message.context[&mapping.path]
229                            {
230                                // Merge new fields into existing object
231                                for (key, value) in new_map {
232                                    existing_map.insert(key, value);
233                                }
234                            } else {
235                                // If existing is not an object, replace with new object
236                                message.context[&mapping.path] = Value::Object(new_map);
237                            }
238                        } else {
239                            // If new value is not an object, replace entirely
240                            message.context[&mapping.path] = transformed_value;
241                        }
242                    } else {
243                        // Set nested value in context
244                        set_nested_value(&mut message.context, &mapping.path, transformed_value);
245                    }
246                    // Invalidate the cached context Arc since we modified the context
247                    // The next iteration (if any) will create a fresh Arc when needed
248                    message.invalidate_context_cache();
249                    debug!("Successfully mapped to path: {}", mapping.path);
250                }
251                Err(e) => {
252                    error!(
253                        "Map: Error evaluating logic for path {}: {:?}",
254                        mapping.path, e
255                    );
256                    errors_encountered = true;
257                }
258            }
259        }
260
261        // Return appropriate status based on results
262        let status = if errors_encountered { 500 } else { 200 };
263        Ok((status, changes))
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Shorthand for building a `MapMapping` fixture.
    fn mapping(path: &str, logic: Value, logic_index: Option<usize>) -> MapMapping {
        MapMapping {
            path: path.to_string(),
            logic,
            logic_index,
        }
    }

    /// Compiles the logic of every mapping in `config`, in mapping order.
    fn compile_all(engine: &Arc<DataLogic>, config: &MapConfig) -> Vec<Arc<CompiledLogic>> {
        config
            .mappings
            .iter()
            .map(|m| engine.compile(&m.logic).unwrap())
            .collect()
    }

    /// Creates a message over an empty payload whose context `data` field is `data`.
    fn message_with_data(data: Value) -> Message {
        let mut msg = Message::new(Arc::new(json!({})));
        msg.context["data"] = data;
        msg
    }

    #[test]
    fn test_map_config_from_json() {
        // Two well-formed mappings: one expression, one static value.
        let input = json!({
            "mappings": [
                {"path": "data.field1", "logic": {"var": "data.source"}},
                {"path": "data.field2", "logic": "static_value"}
            ]
        });

        let parsed = MapConfig::from_json(&input).unwrap();
        let paths: Vec<&str> = parsed.mappings.iter().map(|m| m.path.as_str()).collect();

        assert_eq!(paths.len(), 2);
        assert_eq!(paths[0], "data.field1");
        assert_eq!(paths[1], "data.field2");
    }

    #[test]
    fn test_map_config_missing_mappings() {
        // An empty object has no "mappings" key at all.
        assert!(MapConfig::from_json(&json!({})).is_err());
    }

    #[test]
    fn test_map_config_invalid_mappings() {
        // "mappings" is present but is a string rather than an array.
        let input = json!({"mappings": "not_an_array"});
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_config_missing_path() {
        // The only mapping carries logic but no target path.
        let input = json!({
            "mappings": [
                {"logic": {"var": "data.source"}}
            ]
        });
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_config_missing_logic() {
        // The only mapping carries a target path but no logic.
        let input = json!({
            "mappings": [
                {"path": "data.field1"}
            ]
        });
        assert!(MapConfig::from_json(&input).is_err());
    }

    #[test]
    fn test_map_metadata_assignment() {
        // A value read from `data` must be assignable to a nested `metadata` path.
        let engine = Arc::new(DataLogic::with_preserve_structure());
        let mut msg = message_with_data(json!({
            "SwiftMT": {"message_type": "103"}
        }));

        let config = MapConfig {
            mappings: vec![mapping(
                "metadata.SwiftMT.message_type",
                json!({"var": "data.SwiftMT.message_type"}),
                Some(0),
            )],
        };
        let compiled = compile_all(&engine, &config);

        let outcome = config.execute(&mut msg, &engine, &compiled);
        assert!(outcome.is_ok());

        let (status, changes) = outcome.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);

        // The nested metadata field now holds the copied value.
        assert_eq!(
            msg.context.pointer("/metadata/SwiftMT/message_type"),
            Some(&json!("103"))
        );
    }

    #[test]
    fn test_map_null_values_skip_assignment() {
        // Mappings whose logic evaluates to null must leave the context untouched.
        let engine = Arc::new(DataLogic::with_preserve_structure());
        let mut msg = message_with_data(json!({"existing_field": "should_remain"}));
        msg.context["metadata"] = json!({"existing_meta": "should_remain"});

        let config = MapConfig {
            mappings: vec![
                // Evaluates to null: the source field does not exist.
                mapping(
                    "data.new_field",
                    json!({"var": "data.non_existent_field"}),
                    Some(0),
                ),
                // Evaluates to null as well.
                mapping(
                    "metadata.new_meta",
                    json!({"var": "data.another_non_existent"}),
                    Some(1),
                ),
                // Static value: the only mapping expected to be applied.
                mapping("data.actual_field", json!("actual_value"), Some(2)),
            ],
        };
        let compiled = compile_all(&engine, &config);

        let outcome = config.execute(&mut msg, &engine, &compiled);
        assert!(outcome.is_ok());

        let (status, changes) = outcome.unwrap();
        assert_eq!(status, 200);
        // Only the non-null mapping is recorded as a change.
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.actual_field");

        let data = &msg.context["data"];
        let metadata = &msg.context["metadata"];

        // Null results created no fields.
        assert!(data.get("new_field").is_none());
        assert!(metadata.get("new_meta").is_none());

        // Pre-existing fields are intact.
        assert_eq!(data.get("existing_field"), Some(&json!("should_remain")));
        assert_eq!(metadata.get("existing_meta"), Some(&json!("should_remain")));

        // The static mapping was applied.
        assert_eq!(data.get("actual_field"), Some(&json!("actual_value")));
    }

    #[test]
    fn test_map_multiple_fields_including_metadata() {
        // One task writes to data, metadata and temp_data.
        let engine = Arc::new(DataLogic::with_preserve_structure());
        let mut msg = message_with_data(json!({
            "ISO20022_MX": {
                "document": {
                    "TxInf": {
                        "OrgnlGrpInf": {
                            "OrgnlMsgNmId": "pacs.008.001.08"
                        }
                    }
                }
            },
            "SwiftMT": {
                "message_type": "103"
            }
        }));

        let mut config = MapConfig {
            mappings: vec![
                mapping("data.SwiftMT.message_type", json!("103"), None),
                mapping(
                    "metadata.SwiftMT.message_type",
                    json!({"var": "data.SwiftMT.message_type"}),
                    None,
                ),
                mapping(
                    "temp_data.original_msg_type",
                    json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
                    None,
                ),
            ],
        };

        // Compile every mapping and point each one at its own cache slot.
        let compiled = compile_all(&engine, &config);
        for (slot, entry) in config.mappings.iter_mut().enumerate() {
            entry.logic_index = Some(slot);
        }

        let outcome = config.execute(&mut msg, &engine, &compiled);
        assert!(outcome.is_ok());

        let (status, changes) = outcome.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 3);

        // Each of the three targets received its value.
        assert_eq!(
            msg.context.pointer("/data/SwiftMT/message_type"),
            Some(&json!("103"))
        );
        assert_eq!(
            msg.context.pointer("/metadata/SwiftMT/message_type"),
            Some(&json!("103"))
        );
        assert_eq!(
            msg.context["temp_data"].get("original_msg_type"),
            Some(&json!("pacs.008.001.08"))
        );
    }
}