dataflow_rs/engine/functions/map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use log::error;
7use serde_json::{json, Value};
8
/// Function handler that writes the results of JSONLogic expressions into a
/// [`Message`].
///
/// The handler is configured entirely through its `input`, which must contain a
/// `mappings` array. Each entry pairs a target `path` (for example
/// `data.user.name`, `metadata.source` or `temp_data.items.0`) with a `logic`
/// expression whose result is stored at that path.
///
/// The struct itself carries no state; expressions are evaluated with the
/// thread-local `FUNCTION_DATA_LOGIC` engine.
pub struct MapFunction {}
16
17impl Default for MapFunction {
18    fn default() -> Self {
19        Self::new()
20    }
21}
22
23impl MapFunction {
24    /// Create a new MapFunction
25    pub fn new() -> Self {
26        Self {}
27    }
28
29    /// Set a value at the specified path within the target object
30    fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
31        let mut current = target;
32        let mut old_value = Value::Null;
33        let path_parts: Vec<&str> = path.split('.').collect();
34
35        // Helper function to check if a string is a valid array index
36        fn is_numeric_index(s: &str) -> bool {
37            s.parse::<usize>().is_ok()
38        }
39
40        // Navigate to the parent of the target location
41        for (i, part) in path_parts.iter().enumerate() {
42            let is_numeric = is_numeric_index(part);
43
44            if i == path_parts.len() - 1 {
45                // We're at the last part, so set the value
46                if is_numeric {
47                    // Handle numeric index - ensure current is an array
48                    if !current.is_array() {
49                        // Convert to array if it's not already
50                        *current = Value::Array(vec![]);
51                    }
52
53                    if let Ok(index) = part.parse::<usize>() {
54                        if let Value::Array(arr) = current {
55                            // Extend array if needed
56                            while arr.len() <= index {
57                                arr.push(Value::Null);
58                            }
59                            // Save old value
60                            old_value = arr[index].clone();
61                            arr[index] = value.clone();
62                        }
63                    } else {
64                        error!("Invalid array index: {part}");
65                        return Err(DataflowError::Validation(format!(
66                            "Invalid array index: {part}"
67                        )));
68                    }
69                } else {
70                    // Handle object property
71                    if !current.is_object() {
72                        // Convert to object if it's not already
73                        *current = Value::Object(serde_json::Map::new());
74                    }
75
76                    if let Value::Object(map) = current {
77                        // Save the old value before replacing
78                        let mut key = part.to_string();
79                        if key.starts_with("#") {
80                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
81                        }
82                        old_value = map.get(&key).cloned().unwrap_or(Value::Null);
83                        map.insert(key, value.clone());
84                    }
85                }
86            } else {
87                // We need to navigate deeper
88                if is_numeric {
89                    // Handle numeric index - ensure current is an array
90                    if !current.is_array() {
91                        *current = Value::Array(vec![]);
92                    }
93
94                    if let Ok(index) = part.parse::<usize>() {
95                        if let Value::Array(arr) = current {
96                            // Extend array if needed
97                            while arr.len() <= index {
98                                arr.push(Value::Null);
99                            }
100                            // Ensure the indexed element exists and is ready for further navigation
101                            if arr[index].is_null() {
102                                // Look ahead to see if next part is numeric to decide what to create
103                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
104                                if is_numeric_index(next_part) {
105                                    arr[index] = Value::Array(vec![]);
106                                } else {
107                                    arr[index] = json!({});
108                                }
109                            }
110                            current = &mut arr[index];
111                        }
112                    } else {
113                        error!("Invalid array index: {part}");
114                        return Err(DataflowError::Validation(format!(
115                            "Invalid array index: {part}"
116                        )));
117                    }
118                } else {
119                    // Handle object property
120                    if !current.is_object() {
121                        *current = Value::Object(serde_json::Map::new());
122                    }
123
124                    if let Value::Object(map) = current {
125                        let mut key = part.to_string();
126                        if key.starts_with("#") {
127                            key = key.strip_prefix("#").unwrap_or(&key).to_string();
128                        }
129                        if !map.contains_key(&key) {
130                            // Look ahead to see if next part is numeric to decide what to create
131                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
132                            if is_numeric_index(next_part) {
133                                map.insert(part.to_string(), Value::Array(vec![]));
134                            } else {
135                                map.insert(key.clone(), json!({}));
136                            }
137                        }
138                        current = map.get_mut(&key).unwrap();
139                    }
140                }
141            }
142        }
143
144        Ok(old_value)
145    }
146}
147
148#[async_trait]
149impl AsyncFunctionHandler for MapFunction {
150    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
151        // Extract mappings array from input
152        let mappings = input.get("mappings").ok_or_else(|| {
153            DataflowError::Validation("Missing 'mappings' array in input".to_string())
154        })?;
155
156        let mappings_arr = mappings
157            .as_array()
158            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
159
160        let mut changes = Vec::new();
161
162        // Process each mapping
163        for mapping in mappings_arr {
164            // Get path where to store the result
165            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
166                DataflowError::Validation("Missing 'path' in mapping".to_string())
167            })?;
168
169            // Get the logic to evaluate
170            let logic = mapping.get("logic").ok_or_else(|| {
171                DataflowError::Validation("Missing 'logic' in mapping".to_string())
172            })?;
173
174            // Clone message data for evaluation context
175            let data_clone = message.data.clone();
176            let metadata_clone = message.metadata.clone();
177            let temp_data_clone = message.temp_data.clone();
178
179            // Create a combined data object with message fields for evaluation
180            let data_for_eval = json!({
181                "data": data_clone,
182                "metadata": metadata_clone,
183                "temp_data": temp_data_clone,
184            });
185
186            // Determine target object based on path prefix
187            let (target_object, adjusted_path) =
188                if let Some(path) = target_path.strip_prefix("data.") {
189                    (&mut message.data, path)
190                } else if let Some(path) = target_path.strip_prefix("metadata.") {
191                    (&mut message.metadata, path)
192                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
193                    (&mut message.temp_data, path)
194                } else if target_path == "data" {
195                    (&mut message.data, "")
196                } else if target_path == "metadata" {
197                    (&mut message.metadata, "")
198                } else if target_path == "temp_data" {
199                    (&mut message.temp_data, "")
200                } else {
201                    // Default to data
202                    (&mut message.data, target_path)
203                };
204
205            // Evaluate the logic using thread-local DataLogic
206            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
207                let mut data_logic = data_logic_cell.borrow_mut();
208                data_logic.reset_arena();
209
210                data_logic
211                    .evaluate_json(logic, &data_for_eval, None)
212                    .map_err(|e| {
213                        error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
214                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
215                    })
216            })?;
217
218            if result.is_null() {
219                continue;
220            }
221
222            // Set the result at the target path
223            if adjusted_path.is_empty() {
224                // Replace the entire object
225                let old_value = std::mem::replace(target_object, result.clone());
226                changes.push(Change {
227                    path: target_path.to_string(),
228                    old_value,
229                    new_value: result,
230                });
231            } else {
232                // Set a specific path
233                let old_value =
234                    self.set_value_at_path(target_object, adjusted_path, result.clone())?;
235                changes.push(Change {
236                    path: target_path.to_string(),
237                    old_value,
238                    new_value: result,
239                });
240            }
241        }
242
243        Ok((200, changes))
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    #[tokio::test]
    async fn test_array_notation_simple() {
        let map_fn = MapFunction::new();

        // Test simple array notation: data.items.0.name
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "Test Item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {
                    "name": "Test Item"
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let map_fn = MapFunction::new();

        // Test complex path like the original example: data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                    "logic": "INSTR123"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "MX": {
                "FIToFICstmrCdtTrf": {
                    "CdtTrfTxInf": [
                        {
                            "PmtId": {
                                "InstrId": "INSTR123"
                            }
                        }
                    ]
                }
            }
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        let map_fn = MapFunction::new();

        // Test multiple array indices in the same path: data.matrix.0.1.value
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.matrix.0.1.value",
                    "logic": "cell_01"
                },
                {
                    "path": "data.matrix.1.0.value",
                    "logic": "cell_10"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "matrix": [
                [
                    null,
                    {
                        "value": "cell_01"
                    }
                ],
                [
                    {
                        "value": "cell_10"
                    }
                ]
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_array_extension() {
        let map_fn = MapFunction::new();

        // Test that arrays are extended when accessing high indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.5.name",
                    "logic": "Item at index 5"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());

        // Should create an array with 6 elements (indices 0-5)
        assert!(message.data["items"].is_array());
        let items_array = message.data["items"].as_array().unwrap();
        assert_eq!(items_array.len(), 6);

        // The first 5 elements are null padding
        assert!(items_array[..5].iter().all(Value::is_null));

        // Element at index 5 should have our value
        assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let map_fn = MapFunction::new();

        // Test mixing array and object notation: data.users.0.profile.addresses.1.city
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.users.0.profile.addresses.1.city",
                    "logic": "New York"
                },
                {
                    "path": "data.users.0.profile.name",
                    "logic": "John Doe"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "users": [
                {
                    "profile": {
                        "name": "John Doe",
                        "addresses": [
                            null,
                            {
                                "city": "New York"
                            }
                        ]
                    }
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let map_fn = MapFunction::new();

        // Test overwriting an existing value in an array
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "New Value"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {"name": "New Value"},
                {"name": "Another Item"}
            ]
        });
        assert_eq!(message.data, expected);

        // Check that changes are recorded
        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, json!("Old Value"));
        assert_eq!(changes[0].new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let map_fn = MapFunction::new();

        // Test array notation with JSONLogic expressions
        let mut message = Message::new(&json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": {"var": "temp_data.transactions.0.id"}
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "processed": [
                {
                    "transaction_id": "tx1",
                    "amount_cents": 10000
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        let map_fn = MapFunction::new();

        // Test converting an existing object to an array when numeric index is encountered
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": {
                "existing_key": "existing_value"
            }
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.new_value",
                    "logic": "array_item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        // The object should be converted to an array
        assert!(message.data["items"].is_array());
        let expected = json!({
            "items": [
                {
                    "new_value": "array_item"
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let map_fn = MapFunction::new();

        // Test that non-numeric strings are treated as object keys, not array indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.invalid_index.name",
                    "logic": "test"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        // This should succeed and create an object structure
        assert!(result.is_ok());
        let expected = json!({
            "items": {
                "invalid_index": {
                    "name": "test"
                }
            }
        });
        assert_eq!(message.data, expected);

        // Verify that "invalid_index" was treated as an object key, not array index
        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_hash_prefix_escapes_numeric_object_key() {
        let map_fn = MapFunction::new();

        // A leading '#' makes a numeric-looking segment an object key
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.codes.#0.value",
                    "logic": "zero"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "codes": {
                "0": {
                    "value": "zero"
                }
            }
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_whole_field_replacement_records_old_value() {
        let map_fn = MapFunction::new();

        // A bare field name replaces the entire field
        let mut message = Message::new(&json!({}));
        message.temp_data = json!({"old": true});

        let input = json!({
            "mappings": [
                {
                    "path": "temp_data",
                    "logic": "replaced"
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert_eq!(message.temp_data, json!("replaced"));
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "temp_data");
        assert_eq!(changes[0].old_value, json!({"old": true}));
        assert_eq!(changes[0].new_value, json!("replaced"));
    }

    #[tokio::test]
    async fn test_metadata_prefix_and_default_target() {
        let map_fn = MapFunction::new();

        let mut message = Message::new(&json!({}));
        message.data = json!({});
        message.metadata = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "metadata.source",
                    "logic": "api"
                },
                {
                    "path": "plain.key",
                    "logic": "value"
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert_eq!(message.metadata, json!({"source": "api"}));
        // Paths without a recognised prefix are resolved relative to `data`
        assert_eq!(message.data, json!({"plain": {"key": "value"}}));
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, "metadata.source");
        assert_eq!(changes[1].path, "plain.key");
    }

    #[tokio::test]
    async fn test_null_result_is_skipped() {
        let map_fn = MapFunction::new();

        // A mapping whose logic evaluates to null leaves the target untouched
        let mut message = Message::new(&json!({}));
        message.data = json!({"keep": 1});

        let input = json!({
            "mappings": [
                {
                    "path": "data.keep",
                    "logic": {"var": "data.does_not_exist"}
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert!(changes.is_empty());
        assert_eq!(message.data, json!({"keep": 1}));
    }

    #[tokio::test]
    async fn test_invalid_mappings_input_is_rejected() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let missing = map_fn.execute(&mut message, &json!({})).await;
        assert!(matches!(missing, Err(DataflowError::Validation(_))));

        let not_array = map_fn
            .execute(&mut message, &json!({"mappings": {}}))
            .await;
        assert!(matches!(not_array, Err(DataflowError::Validation(_))));

        let no_path = map_fn
            .execute(&mut message, &json!({"mappings": [{"logic": "x"}]}))
            .await;
        assert!(matches!(no_path, Err(DataflowError::Validation(_))));

        let no_logic = map_fn
            .execute(&mut message, &json!({"mappings": [{"path": "data.x"}]}))
            .await;
        assert!(matches!(no_logic, Err(DataflowError::Validation(_))));
    }
}
590}