dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use log::error;
7use serde_json::{json, Value};
8
/// A mapping function that transforms data using JSONLogic expressions.
///
/// This function allows mapping data from one location to another within
/// a message, applying transformations using JSONLogic expressions.
///
/// The type carries no state: expressions are evaluated with the
/// thread-local `FUNCTION_DATA_LOGIC` engine, so instances are free to
/// create.
pub struct MapFunction {
    // Intentionally empty: the JSONLogic engine is borrowed from the
    // thread-local `FUNCTION_DATA_LOGIC` on each call instead of being
    // stored as a field.
}
16
17impl Default for MapFunction {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl MapFunction {
24    /// Create a new MapFunction
25    pub fn new() -> Self {
26        Self {}
27    }
28
29    /// Set a value at the specified path within the target object
30    fn set_value_at_path(&self, target: &mut Value, path: &str, value: &Value) -> Result<Value> {
31        let mut current = target;
32        let mut old_value = Value::Null;
33        let path_parts: Vec<&str> = path.split('.').collect();
34
35        // Helper function to check if a string is a valid array index
36        fn is_numeric_index(s: &str) -> bool {
37            s.parse::<usize>().is_ok()
38        }
39
40        // Navigate to the parent of the target location
41        for (i, part) in path_parts.iter().enumerate() {
42            let is_numeric = is_numeric_index(part);
43
44            if i == path_parts.len() - 1 {
45                // We're at the last part, so set the value
46                if is_numeric {
47                    // Handle numeric index - ensure current is an array
48                    if !current.is_array() {
49                        // Convert to array if it's not already
50                        *current = Value::Array(vec![]);
51                    }
52
53                    if let Ok(index) = part.parse::<usize>() {
54                        if let Value::Array(arr) = current {
55                            // Extend array if needed
56                            while arr.len() <= index {
57                                arr.push(Value::Null);
58                            }
59                            // Save old value
60                            old_value = arr[index].clone();
61                            arr[index] = value.clone();
62                        }
63                    } else {
64                        error!("Invalid array index: {part}");
65                        return Err(DataflowError::Validation(format!(
66                            "Invalid array index: {part}"
67                        )));
68                    }
69                } else {
70                    // Handle object property
71                    if !current.is_object() {
72                        // Convert to object if it's not already
73                        *current = Value::Object(serde_json::Map::new());
74                    }
75
76                    if let Value::Object(map) = current {
77                        // Save the old value before replacing
78                        let mut key = part.to_string();
79                        if key.starts_with("#") {
80                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
81                        }
82                        old_value = map.get(&key).cloned().unwrap_or(Value::Null);
83                        // Merge objects if both old and new values are objects
84                        let value_to_insert = if old_value.is_object() && value.is_object() {
85                            let mut merged_map = old_value.as_object().unwrap().clone();
86                            if let Some(new_map) = value.as_object() {
87                                // New values override old values
88                                for (k, v) in new_map {
89                                    merged_map.insert(k.clone(), v.clone());
90                                }
91                            }
92                            Value::Object(merged_map)
93                        } else {
94                            value.clone()
95                        };
96                        map.insert(key, value_to_insert);
97                    }
98                }
99            } else {
100                // We need to navigate deeper
101                if is_numeric {
102                    // Handle numeric index - ensure current is an array
103                    if !current.is_array() {
104                        *current = Value::Array(vec![]);
105                    }
106
107                    if let Ok(index) = part.parse::<usize>() {
108                        if let Value::Array(arr) = current {
109                            // Extend array if needed
110                            while arr.len() <= index {
111                                arr.push(Value::Null);
112                            }
113                            // Ensure the indexed element exists and is ready for further navigation
114                            if arr[index].is_null() {
115                                // Look ahead to see if next part is numeric to decide what to create
116                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
117                                if is_numeric_index(next_part) {
118                                    arr[index] = Value::Array(vec![]);
119                                } else {
120                                    arr[index] = json!({});
121                                }
122                            }
123                            current = &mut arr[index];
124                        }
125                    } else {
126                        error!("Invalid array index: {part}");
127                        return Err(DataflowError::Validation(format!(
128                            "Invalid array index: {part}"
129                        )));
130                    }
131                } else {
132                    // Handle object property
133                    if !current.is_object() {
134                        *current = Value::Object(serde_json::Map::new());
135                    }
136
137                    if let Value::Object(map) = current {
138                        let mut key = part.to_string();
139                        if key.starts_with("#") {
140                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
141                        }
142                        if !map.contains_key(&key) {
143                            // Look ahead to see if next part is numeric to decide what to create
144                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
145                            if is_numeric_index(next_part) {
146                                map.insert(part.to_string(), Value::Array(vec![]));
147                            } else {
148                                map.insert(key.clone(), json!({}));
149                            }
150                        }
151                        current = map.get_mut(&key).unwrap();
152                    }
153                }
154            }
155        }
156
157        Ok(old_value)
158    }
159}
160
161#[async_trait]
162impl AsyncFunctionHandler for MapFunction {
163    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
164        // Extract mappings array from input
165        let mappings = input.get("mappings").ok_or_else(|| {
166            DataflowError::Validation("Missing 'mappings' array in input".to_string())
167        })?;
168
169        let mappings_arr = mappings
170            .as_array()
171            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
172
173        let mut changes = Vec::new();
174
175        // Process each mapping
176        for mapping in mappings_arr {
177            // Get path where to store the result
178            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
179                DataflowError::Validation("Missing 'path' in mapping".to_string())
180            })?;
181
182            // Get the logic to evaluate
183            let logic = mapping.get("logic").ok_or_else(|| {
184                DataflowError::Validation("Missing 'logic' in mapping".to_string())
185            })?;
186
187            // Clone message data for evaluation context
188            let data_clone = message.data.clone();
189            let metadata_clone = message.metadata.clone();
190            let temp_data_clone = message.temp_data.clone();
191
192            // Create a combined data object with message fields for evaluation
193            let data_for_eval = json!({
194                "data": data_clone,
195                "metadata": metadata_clone,
196                "temp_data": temp_data_clone,
197            });
198
199            // Determine target object based on path prefix
200            let (target_object, adjusted_path) =
201                if let Some(path) = target_path.strip_prefix("data.") {
202                    (&mut message.data, path)
203                } else if let Some(path) = target_path.strip_prefix("metadata.") {
204                    (&mut message.metadata, path)
205                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
206                    (&mut message.temp_data, path)
207                } else if target_path == "data" {
208                    (&mut message.data, "")
209                } else if target_path == "metadata" {
210                    (&mut message.metadata, "")
211                } else if target_path == "temp_data" {
212                    (&mut message.temp_data, "")
213                } else {
214                    // Default to data
215                    (&mut message.data, target_path)
216                };
217
218            // Evaluate the logic using thread-local DataLogic
219            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
220                let mut data_logic = data_logic_cell.borrow_mut();
221                data_logic.reset_arena();
222
223                data_logic
224                    .evaluate_json(logic, &data_for_eval, None)
225                    .map_err(|e| {
226                        error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
227                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
228                    })
229            })?;
230
231            if result.is_null() {
232                continue;
233            }
234
235            // Set the result at the target path
236            if adjusted_path.is_empty() {
237                // Replace the entire object
238                let old_value = if target_object.is_object() && result.is_object() {
239                    let mut merged_map = target_object.as_object().unwrap().clone();
240                    if let Some(new_map) = result.as_object() {
241                        // New values override old values
242                        for (k, v) in new_map {
243                            merged_map.insert(k.clone(), v.clone());
244                        }
245                    }
246                    std::mem::replace(target_object, Value::Object(merged_map))
247                } else {
248                    std::mem::replace(target_object, result.clone())
249                };
250                changes.push(Change {
251                    path: target_path.to_string(),
252                    old_value,
253                    new_value: result,
254                });
255            } else {
256                // Set a specific path
257                let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
258                changes.push(Change {
259                    path: target_path.to_string(),
260                    old_value,
261                    new_value: result,
262                });
263            }
264        }
265
266        Ok((200, changes))
267    }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Creates an otherwise empty message whose `data` section is `data`.
    fn message_with_data(data: serde_json::Value) -> Message {
        let mut message = Message::new(&json!({}));
        message.data = data;
        message
    }

    /// Wraps a JSON array of mapping entries in the `{"mappings": [...]}`
    /// shape that `execute` reads.
    fn mappings_input(entries: serde_json::Value) -> serde_json::Value {
        json!({ "mappings": entries })
    }

    #[tokio::test]
    async fn test_array_notation_simple() {
        // Simple array notation: data.items.0.name
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {"path": "data.items.0.name", "logic": "Test Item"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(message.data, json!({"items": [{"name": "Test Item"}]}));
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        // Deep path with an index in the middle:
        // data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {
                "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                "logic": "INSTR123"
            }
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "MX": {
                    "FIToFICstmrCdtTrf": {
                        "CdtTrfTxInf": [
                            {"PmtId": {"InstrId": "INSTR123"}}
                        ]
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        // Consecutive indices in one path: data.matrix.0.1.value
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {"path": "data.matrix.0.1.value", "logic": "cell_01"},
            {"path": "data.matrix.1.0.value", "logic": "cell_10"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "matrix": [
                    [null, {"value": "cell_01"}],
                    [{"value": "cell_10"}]
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_array_extension() {
        // Writing at a high index pads the array with nulls.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {"path": "data.items.5.name", "logic": "Item at index 5"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());

        // Six slots in total (indices 0-5).
        let items = &message.data["items"];
        assert!(items.is_array());
        let slots = items.as_array().unwrap();
        assert_eq!(slots.len(), 6);

        // The five padding slots are null.
        assert!(slots[..5].iter().all(|slot| slot.is_null()));

        // The addressed slot holds the mapped value.
        assert_eq!(slots[5], json!({"name": "Item at index 5"}));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        // Arrays and objects interleaved: data.users.0.profile.addresses.1.city
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {"path": "data.users.0.profile.addresses.1.city", "logic": "New York"},
            {"path": "data.users.0.profile.name", "logic": "John Doe"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "users": [
                    {
                        "profile": {
                            "name": "John Doe",
                            "addresses": [null, {"city": "New York"}]
                        }
                    }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        // An existing array element field is overwritten and the change recorded.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        }));
        let input = mappings_input(json!([
            {"path": "data.items.0.name", "logic": "New Value"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    {"name": "New Value"},
                    {"name": "Another Item"}
                ]
            })
        );

        // Exactly one change, carrying the old and new values.
        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        let change = &changes[0];
        assert_eq!(change.path, "data.items.0.name");
        assert_eq!(change.old_value, json!("Old Value"));
        assert_eq!(change.new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        // Array paths combined with real JSONLogic expressions.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });
        let input = mappings_input(json!([
            {
                "path": "data.processed.0.transaction_id",
                "logic": {"var": "temp_data.transactions.0.id"}
            },
            {
                "path": "data.processed.0.amount_cents",
                "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
            }
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "processed": [
                    {"transaction_id": "tx1", "amount_cents": 10000}
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        // A numeric segment applied to an existing object turns it into an array.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({
            "items": {"existing_key": "existing_value"}
        }));
        let input = mappings_input(json!([
            {"path": "data.items.0.new_value", "logic": "array_item"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        // The former object is now an array.
        assert!(message.data["items"].is_array());
        assert_eq!(
            message.data,
            json!({"items": [{"new_value": "array_item"}]})
        );
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        // Non-numeric segments are object keys, never array indices.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({}));
        let input = mappings_input(json!([
            {"path": "data.items.invalid_index.name", "logic": "test"}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        // Succeeds and builds a nested object structure.
        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({"items": {"invalid_index": {"name": "test"}}})
        );

        // "invalid_index" ended up as a key of an object, not inside an array.
        let items = &message.data["items"];
        assert!(items.is_object());
        assert!(!items.is_array());
    }

    #[tokio::test]
    async fn test_object_merge_on_mapping() {
        // Mapping an object onto an existing object merges the two.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({
            "config": {
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "username": "admin"
                }
            }
        }));

        // Adds two fields and overrides the existing port.
        let input = mappings_input(json!([
            {
                "path": "data.config.database",
                "logic": {
                    "password": "secret123",
                    "ssl": true,
                    "port": 5433
                }
            }
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",      // kept
                        "port": 5433,             // updated
                        "username": "admin",      // kept
                        "password": "secret123",  // added
                        "ssl": true               // added
                    }
                }
            })
        );

        // The change records the pre-merge object and the raw mapped value.
        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        let change = &changes[0];
        assert_eq!(change.path, "data.config.database");
        assert_eq!(
            change.old_value,
            json!({
                "host": "localhost",
                "port": 5432,
                "username": "admin"
            })
        );
        assert_eq!(
            change.new_value,
            json!({
                "password": "secret123",
                "ssl": true,
                "port": 5433
            })
        );
    }

    #[tokio::test]
    async fn test_object_merge_with_nested_path() {
        // Merging also applies when the target sits below a nested path.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({
            "user": {
                "profile": {
                    "name": "John Doe",
                    "age": 30
                }
            }
        }));
        let input = mappings_input(json!([
            {
                "path": "data.user.profile",
                "logic": {
                    "email": "john@example.com",
                    "age": 31,
                    "city": "New York"
                }
            }
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "user": {
                    "profile": {
                        "name": "John Doe",           // preserved
                        "age": 31,                    // updated
                        "email": "john@example.com",  // added
                        "city": "New York"            // added
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_non_object_replacement() {
        // A non-object target is replaced outright, never merged.
        let mapper = MapFunction::new();
        let mut message = message_with_data(json!({
            "settings": {"value": "old_string"}
        }));
        let input = mappings_input(json!([
            {"path": "data.settings.value", "logic": {"new": "object"}}
        ]));

        let outcome = mapper.execute(&mut message, &input).await;

        assert!(outcome.is_ok());
        // The string gives way to the object.
        assert_eq!(
            message.data,
            json!({"settings": {"value": {"new": "object"}}})
        );
    }
}