dataflow_rs/engine/functions/
map.rs

1use crate::engine::AsyncFunctionHandler;
2use crate::engine::error::{DataflowError, Result};
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use datalogic_rs::DataLogic;
6use log::error;
7use serde_json::{Value, json};
8
/// A mapping function that transforms data using JSONLogic expressions.
///
/// Each mapping evaluates a JSONLogic expression against the message and
/// stores the result at a dot-separated path inside the message.
///
/// The type carries no state, so `MapFunction::default()` and
/// `MapFunction::new()` are interchangeable.
#[derive(Debug, Default)]
pub struct MapFunction;
20
21impl MapFunction {
22    /// Create a new MapFunction
23    pub fn new() -> Self {
24        Self
25    }
26
27    /// Set a value at the specified path within the target object
28    fn set_value_at_path(&self, target: &mut Value, path: &str, value: &Value) -> Result<Value> {
29        let mut current = target;
30        let mut old_value = Value::Null;
31        let path_parts: Vec<&str> = path.split('.').collect();
32
33        // Helper function to check if a string is a valid array index
34        fn is_numeric_index(s: &str) -> bool {
35            s.parse::<usize>().is_ok()
36        }
37
38        // Navigate to the parent of the target location
39        for (i, part) in path_parts.iter().enumerate() {
40            let is_numeric = is_numeric_index(part);
41
42            if i == path_parts.len() - 1 {
43                // We're at the last part, so set the value
44                if is_numeric {
45                    // Handle numeric index - ensure current is an array
46                    if !current.is_array() {
47                        // Convert to array if it's not already
48                        *current = Value::Array(vec![]);
49                    }
50
51                    if let Ok(index) = part.parse::<usize>() {
52                        if let Value::Array(arr) = current {
53                            // Extend array if needed
54                            while arr.len() <= index {
55                                arr.push(Value::Null);
56                            }
57                            // Save old value
58                            old_value = arr[index].clone();
59                            arr[index] = value.clone();
60                        }
61                    } else {
62                        error!("Invalid array index: {part}");
63                        return Err(DataflowError::Validation(format!(
64                            "Invalid array index: {part}"
65                        )));
66                    }
67                } else {
68                    // Handle object property
69                    if !current.is_object() {
70                        // Convert to object if it's not already
71                        *current = Value::Object(serde_json::Map::new());
72                    }
73
74                    if let Value::Object(map) = current {
75                        // Save the old value before replacing
76                        let mut key = part.to_string();
77                        if key.starts_with("#") {
78                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
79                        }
80                        old_value = map.get(&key).cloned().unwrap_or(Value::Null);
81                        // Merge objects if both old and new values are objects
82                        let value_to_insert = if old_value.is_object() && value.is_object() {
83                            let mut merged_map = old_value.as_object().unwrap().clone();
84                            if let Some(new_map) = value.as_object() {
85                                // New values override old values
86                                for (k, v) in new_map {
87                                    merged_map.insert(k.clone(), v.clone());
88                                }
89                            }
90                            Value::Object(merged_map)
91                        } else {
92                            value.clone()
93                        };
94                        map.insert(key, value_to_insert);
95                    }
96                }
97            } else {
98                // We need to navigate deeper
99                if is_numeric {
100                    // Handle numeric index - ensure current is an array
101                    if !current.is_array() {
102                        *current = Value::Array(vec![]);
103                    }
104
105                    if let Ok(index) = part.parse::<usize>() {
106                        if let Value::Array(arr) = current {
107                            // Extend array if needed
108                            while arr.len() <= index {
109                                arr.push(Value::Null);
110                            }
111                            // Ensure the indexed element exists and is ready for further navigation
112                            if arr[index].is_null() {
113                                // Look ahead to see if next part is numeric to decide what to create
114                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
115                                if is_numeric_index(next_part) {
116                                    arr[index] = Value::Array(vec![]);
117                                } else {
118                                    arr[index] = json!({});
119                                }
120                            }
121                            current = &mut arr[index];
122                        }
123                    } else {
124                        error!("Invalid array index: {part}");
125                        return Err(DataflowError::Validation(format!(
126                            "Invalid array index: {part}"
127                        )));
128                    }
129                } else {
130                    // Handle object property
131                    if !current.is_object() {
132                        *current = Value::Object(serde_json::Map::new());
133                    }
134
135                    if let Value::Object(map) = current {
136                        let mut key = part.to_string();
137                        if key.starts_with("#") {
138                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
139                        }
140                        if !map.contains_key(&key) {
141                            // Look ahead to see if next part is numeric to decide what to create
142                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
143                            if is_numeric_index(next_part) {
144                                map.insert(part.to_string(), Value::Array(vec![]));
145                            } else {
146                                map.insert(key.clone(), json!({}));
147                            }
148                        }
149                        current = map.get_mut(&key).unwrap();
150                    }
151                }
152            }
153        }
154
155        Ok(old_value)
156    }
157}
158
159#[async_trait]
160impl AsyncFunctionHandler for MapFunction {
161    async fn execute(
162        &self,
163        message: &mut Message,
164        input: &Value,
165        data_logic: &mut DataLogic,
166    ) -> Result<(usize, Vec<Change>)> {
167        // Extract mappings array from input
168        let mappings = input.get("mappings").ok_or_else(|| {
169            DataflowError::Validation("Missing 'mappings' array in input".to_string())
170        })?;
171
172        let mappings_arr = mappings
173            .as_array()
174            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
175
176        let mut changes = Vec::new();
177
178        // Process each mapping
179        for mapping in mappings_arr {
180            // Get path where to store the result
181            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
182                DataflowError::Validation("Missing 'path' in mapping".to_string())
183            })?;
184
185            // Get the logic to evaluate
186            let logic = mapping.get("logic").ok_or_else(|| {
187                DataflowError::Validation("Missing 'logic' in mapping".to_string())
188            })?;
189
190            // Clone message data for evaluation context - do this for each iteration
191            // to ensure subsequent mappings see changes from previous mappings
192            let data_clone = message.data.clone();
193            let metadata_clone = message.metadata.clone();
194            let temp_data_clone = message.temp_data.clone();
195
196            // Create a combined data object with message fields for evaluation
197            let data_for_eval = json!({
198                "data": data_clone,
199                "metadata": metadata_clone,
200                "temp_data": temp_data_clone,
201            });
202
203            // Determine target object based on path prefix
204            let (target_object, adjusted_path) =
205                if let Some(path) = target_path.strip_prefix("data.") {
206                    (&mut message.data, path)
207                } else if let Some(path) = target_path.strip_prefix("metadata.") {
208                    (&mut message.metadata, path)
209                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
210                    (&mut message.temp_data, path)
211                } else if target_path == "data" {
212                    (&mut message.data, "")
213                } else if target_path == "metadata" {
214                    (&mut message.metadata, "")
215                } else if target_path == "temp_data" {
216                    (&mut message.temp_data, "")
217                } else {
218                    // Default to data
219                    (&mut message.data, target_path)
220                };
221
222            // Evaluate the logic using provided DataLogic
223            data_logic.reset_arena();
224            let result = data_logic
225                .evaluate_json(logic, &data_for_eval, None)
226                .map_err(|e| {
227                    error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
228                    DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
229                })?;
230
231            if result.is_null() {
232                continue;
233            }
234
235            // Set the result at the target path
236            if adjusted_path.is_empty() {
237                // Replace the entire object
238                let old_value = if target_object.is_object() && result.is_object() {
239                    let mut merged_map = target_object.as_object().unwrap().clone();
240                    if let Some(new_map) = result.as_object() {
241                        // New values override old values
242                        for (k, v) in new_map {
243                            merged_map.insert(k.clone(), v.clone());
244                        }
245                    }
246                    std::mem::replace(target_object, Value::Object(merged_map))
247                } else {
248                    std::mem::replace(target_object, result.clone())
249                };
250                changes.push(Change {
251                    path: target_path.to_string(),
252                    old_value,
253                    new_value: result,
254                });
255            } else {
256                // Set a specific path
257                let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
258                changes.push(Change {
259                    path: target_path.to_string(),
260                    old_value,
261                    new_value: result,
262                });
263            }
264        }
265
266        Ok((200, changes))
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use datalogic_rs::DataLogic;
    use serde_json::json;

    /// Build a message from an empty payload and set its `data` field to `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = Message::new(&json!({}));
        message.data = data;
        message
    }

    /// Run a fresh `MapFunction` over `message`, passing `mappings` as the
    /// `"mappings"` entry of the function input.
    async fn apply_mappings(
        message: &mut Message,
        mappings: Value,
    ) -> Result<(usize, Vec<Change>)> {
        let input = json!({ "mappings": mappings });
        let mut data_logic = DataLogic::with_preserve_structure();
        MapFunction::new()
            .execute(message, &input, &mut data_logic)
            .await
    }

    #[tokio::test]
    async fn test_array_notation_simple() {
        // A numeric segment in the path creates an array: data.items.0.name
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "Test Item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(message.data, json!({ "items": [{ "name": "Test Item" }] }));
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        // Deep path with an index in the middle:
        // data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                "logic": "INSTR123"
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "MX": {
                    "FIToFICstmrCdtTrf": {
                        "CdtTrfTxInf": [
                            { "PmtId": { "InstrId": "INSTR123" } }
                        ]
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        // Two consecutive indices in one path: data.matrix.0.1.value
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                { "path": "data.matrix.0.1.value", "logic": "cell_01" },
                { "path": "data.matrix.1.0.value", "logic": "cell_10" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "matrix": [
                    [null, { "value": "cell_01" }],
                    [{ "value": "cell_10" }]
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_array_extension() {
        // Writing at a high index pads the array with nulls.
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.5.name", "logic": "Item at index 5" }]),
        )
        .await;

        assert!(outcome.is_ok());

        // Indices 0..=5 must exist, so the array has six elements.
        assert!(message.data["items"].is_array());
        let items = message.data["items"].as_array().unwrap();
        assert_eq!(items.len(), 6);

        // Everything before index 5 is padding.
        for padding in &items[..5] {
            assert_eq!(*padding, json!(null));
        }

        // Index 5 holds the mapped value.
        assert_eq!(items[5], json!({ "name": "Item at index 5" }));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        // Indices and keys interleaved: data.users.0.profile.addresses.1.city
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                { "path": "data.users.0.profile.addresses.1.city", "logic": "New York" },
                { "path": "data.users.0.profile.name", "logic": "John Doe" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "users": [{
                    "profile": {
                        "name": "John Doe",
                        "addresses": [null, { "city": "New York" }]
                    }
                }]
            })
        );
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        // An existing value inside an array element is overwritten in place.
        let mut message = message_with_data(json!({
            "items": [
                { "name": "Old Value" },
                { "name": "Another Item" }
            ]
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "New Value" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    { "name": "New Value" },
                    { "name": "Another Item" }
                ]
            })
        );

        // The change log carries both the old and the new value.
        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, json!("Old Value"));
        assert_eq!(changes[0].new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        // Indexed target paths combined with real JSONLogic expressions.
        let mut message = message_with_data(json!({}));
        message.temp_data = json!({
            "transactions": [
                { "id": "tx1", "amount": 100 },
                { "id": "tx2", "amount": 200 }
            ]
        });

        let outcome = apply_mappings(
            &mut message,
            json!([
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": { "var": "temp_data.transactions.0.id" }
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": { "*": [{ "var": "temp_data.transactions.0.amount" }, 100] }
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "processed": [{
                    "transaction_id": "tx1",
                    "amount_cents": 10000
                }]
            })
        );
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        // An existing object is replaced by an array when indexed numerically.
        let mut message = message_with_data(json!({
            "items": { "existing_key": "existing_value" }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.new_value", "logic": "array_item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        // The former object is now an array.
        assert!(message.data["items"].is_array());
        assert_eq!(
            message.data,
            json!({ "items": [{ "new_value": "array_item" }] })
        );
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        // Non-numeric segments are object keys, never array indices.
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.invalid_index.name", "logic": "test" }]),
        )
        .await;

        // The mapping succeeds and builds nested objects.
        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({ "items": { "invalid_index": { "name": "test" } } })
        );

        // "invalid_index" became a key of an object, not an array slot.
        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_object_merge_on_mapping() {
        // Mapping an object onto an existing object merges the two.
        let mut message = message_with_data(json!({
            "config": {
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "username": "admin"
                }
            }
        }));

        // Adds two fields and overrides the existing port.
        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.config.database",
                "logic": {
                    "password": "secret123",
                    "ssl": true,
                    "port": 5433
                }
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",
                        "port": 5433,
                        "username": "admin",
                        "password": "secret123",
                        "ssl": true
                    }
                }
            })
        );

        // old_value is the pre-merge object; new_value is the mapped object.
        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.config.database");
        assert_eq!(
            changes[0].old_value,
            json!({
                "host": "localhost",
                "port": 5432,
                "username": "admin"
            })
        );
        assert_eq!(
            changes[0].new_value,
            json!({
                "password": "secret123",
                "ssl": true,
                "port": 5433
            })
        );
    }

    #[tokio::test]
    async fn test_object_merge_with_nested_path() {
        // Merging also applies below the top level.
        let mut message = message_with_data(json!({
            "user": {
                "profile": {
                    "name": "John Doe",
                    "age": 30
                }
            }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.user.profile",
                "logic": {
                    "email": "john@example.com",
                    "age": 31,
                    "city": "New York"
                }
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        // name is preserved, age is updated, email and city are added.
        assert_eq!(
            message.data,
            json!({
                "user": {
                    "profile": {
                        "name": "John Doe",
                        "age": 31,
                        "email": "john@example.com",
                        "city": "New York"
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_non_object_replacement() {
        // A non-object value is replaced outright, not merged.
        let mut message = message_with_data(json!({
            "settings": { "value": "old_string" }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.settings.value", "logic": { "new": "object" } }]),
        )
        .await;

        assert!(outcome.is_ok());
        // The string is gone; the object took its place.
        assert_eq!(
            message.data,
            json!({ "settings": { "value": { "new": "object" } } })
        );
    }

    #[tokio::test]
    async fn test_parent_child_mapping_issue_fix() {
        // GitHub issue #1: a mapping to a parent followed by a mapping to one
        // of its children must keep the parent's other fields.
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                {
                    "path": "data.Parent.Child",
                    "logic": {
                        "Field1": "Value1",
                        "Field2": "Value2"
                    }
                },
                {
                    "path": "data.Parent.Child.NestedObject",
                    "logic": { "NestedField": "NestedValue" }
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "Parent": {
                    "Child": {
                        "Field1": "Value1",
                        "Field2": "Value2",
                        "NestedObject": { "NestedField": "NestedValue" }
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_mappings_with_dependencies() {
        // Later mappings can read values written by earlier ones.
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                { "path": "data.config.database.host", "logic": "localhost" },
                { "path": "data.config.database.port", "logic": 5432 },
                {
                    // Reads the two values set just above.
                    "path": "data.config.connectionString",
                    "logic": {
                        "cat": [
                            "postgresql://",
                            { "var": "data.config.database.host" },
                            ":",
                            { "var": "data.config.database.port" }
                        ]
                    }
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",
                        "port": 5432
                    },
                    "connectionString": "postgresql://localhost:5432"
                }
            })
        );
    }

    #[tokio::test]
    async fn test_nested_path_after_parent_mapping() {
        // Map a parent object first, then keep adding nested fields beneath it.
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                {
                    "path": "data.transaction",
                    "logic": {
                        "id": "TX-001",
                        "amount": 1000,
                        "currency": "USD"
                    }
                },
                {
                    "path": "data.transaction.metadata",
                    "logic": {
                        "timestamp": "2024-01-01T12:00:00Z",
                        "source": "API"
                    }
                },
                {
                    "path": "data.transaction.metadata.tags",
                    "logic": ["urgent", "international"]
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "transaction": {
                    "id": "TX-001",
                    "amount": 1000,
                    "currency": "USD",
                    "metadata": {
                        "timestamp": "2024-01-01T12:00:00Z",
                        "source": "API",
                        "tags": ["urgent", "international"]
                    }
                }
            })
        );
    }
}