dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
/// A mapping function that writes the results of JSONLogic expressions into a message.
///
/// Each mapping pairs a target `path` with a `logic` expression: the expression is
/// evaluated against a snapshot of the message's `data`, `metadata` and `temp_data`,
/// and its result is stored at `path` inside the message.
pub struct MapFunction {}
15
16impl Default for MapFunction {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MapFunction {
23    /// Create a new MapFunction
24    pub fn new() -> Self {
25        Self {}
26    }
27
28    /// Set a value at the specified path within the target object
29    fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30        let mut current = target;
31        let mut old_value = Value::Null;
32        let path_parts: Vec<&str> = path.split('.').collect();
33
34        // Helper function to check if a string is a valid array index
35        fn is_numeric_index(s: &str) -> bool {
36            s.parse::<usize>().is_ok()
37        }
38
39        // Navigate to the parent of the target location
40        for (i, part) in path_parts.iter().enumerate() {
41            let is_numeric = is_numeric_index(part);
42
43            if i == path_parts.len() - 1 {
44                // We're at the last part, so set the value
45                if is_numeric {
46                    // Handle numeric index - ensure current is an array
47                    if !current.is_array() {
48                        // Convert to array if it's not already
49                        *current = Value::Array(vec![]);
50                    }
51
52                    if let Ok(index) = part.parse::<usize>() {
53                        if let Value::Array(arr) = current {
54                            // Extend array if needed
55                            while arr.len() <= index {
56                                arr.push(Value::Null);
57                            }
58                            // Save old value
59                            old_value = arr[index].clone();
60                            arr[index] = value.clone();
61                        }
62                    } else {
63                        return Err(DataflowError::Validation(format!(
64                            "Invalid array index: {}",
65                            part
66                        )));
67                    }
68                } else {
69                    // Handle object property
70                    if !current.is_object() {
71                        // Convert to object if it's not already
72                        *current = Value::Object(serde_json::Map::new());
73                    }
74
75                    if let Value::Object(map) = current {
76                        // Save the old value before replacing
77                        old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78                        map.insert(part.to_string(), value.clone());
79                    }
80                }
81            } else {
82                // We need to navigate deeper
83                if is_numeric {
84                    // Handle numeric index - ensure current is an array
85                    if !current.is_array() {
86                        *current = Value::Array(vec![]);
87                    }
88
89                    if let Ok(index) = part.parse::<usize>() {
90                        if let Value::Array(arr) = current {
91                            // Extend array if needed
92                            while arr.len() <= index {
93                                arr.push(Value::Null);
94                            }
95                            // Ensure the indexed element exists and is ready for further navigation
96                            if arr[index].is_null() {
97                                // Look ahead to see if next part is numeric to decide what to create
98                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
99                                if is_numeric_index(next_part) {
100                                    arr[index] = Value::Array(vec![]);
101                                } else {
102                                    arr[index] = json!({});
103                                }
104                            }
105                            current = &mut arr[index];
106                        }
107                    } else {
108                        return Err(DataflowError::Validation(format!(
109                            "Invalid array index: {}",
110                            part
111                        )));
112                    }
113                } else {
114                    // Handle object property
115                    if !current.is_object() {
116                        *current = Value::Object(serde_json::Map::new());
117                    }
118
119                    if let Value::Object(map) = current {
120                        if !map.contains_key(*part) {
121                            // Look ahead to see if next part is numeric to decide what to create
122                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
123                            if is_numeric_index(next_part) {
124                                map.insert(part.to_string(), Value::Array(vec![]));
125                            } else {
126                                map.insert(part.to_string(), json!({}));
127                            }
128                        }
129                        current = map.get_mut(*part).unwrap();
130                    }
131                }
132            }
133        }
134
135        Ok(old_value)
136    }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142        // Extract mappings array from input
143        let mappings = input.get("mappings").ok_or_else(|| {
144            DataflowError::Validation("Missing 'mappings' array in input".to_string())
145        })?;
146
147        let mappings_arr = mappings
148            .as_array()
149            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151        let mut changes = Vec::new();
152
153        // Process each mapping
154        for mapping in mappings_arr {
155            // Get path where to store the result
156            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157                DataflowError::Validation("Missing 'path' in mapping".to_string())
158            })?;
159
160            // Get the logic to evaluate
161            let logic = mapping.get("logic").ok_or_else(|| {
162                DataflowError::Validation("Missing 'logic' in mapping".to_string())
163            })?;
164
165            // Clone message data for evaluation context
166            let data_clone = message.data.clone();
167            let metadata_clone = message.metadata.clone();
168            let temp_data_clone = message.temp_data.clone();
169
170            // Create a combined data object with message fields for evaluation
171            let data_for_eval = json!({
172                "data": data_clone,
173                "metadata": metadata_clone,
174                "temp_data": temp_data_clone,
175            });
176
177            // Determine target object based on path prefix
178            let (target_object, adjusted_path) =
179                if let Some(path) = target_path.strip_prefix("data.") {
180                    (&mut message.data, path)
181                } else if let Some(path) = target_path.strip_prefix("metadata.") {
182                    (&mut message.metadata, path)
183                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184                    (&mut message.temp_data, path)
185                } else if target_path == "data" {
186                    (&mut message.data, "")
187                } else if target_path == "metadata" {
188                    (&mut message.metadata, "")
189                } else if target_path == "temp_data" {
190                    (&mut message.temp_data, "")
191                } else {
192                    // Default to data
193                    (&mut message.data, target_path)
194                };
195
196            // Evaluate the logic using thread-local DataLogic
197            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198                let mut data_logic = data_logic_cell.borrow_mut();
199                data_logic.reset_arena();
200
201                data_logic
202                    .evaluate_json(logic, &data_for_eval, None)
203                    .map_err(|e| {
204                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
205                    })
206            })?;
207
208            if result.is_null() {
209                continue;
210            }
211
212            // Set the result at the target path
213            if adjusted_path.is_empty() {
214                // Replace the entire object
215                let old_value = std::mem::replace(target_object, result.clone());
216                changes.push(Change {
217                    path: target_path.to_string(),
218                    old_value,
219                    new_value: result,
220                });
221            } else {
222                // Set a specific path
223                let old_value =
224                    self.set_value_at_path(target_object, adjusted_path, result.clone())?;
225                changes.push(Change {
226                    path: target_path.to_string(),
227                    old_value,
228                    new_value: result,
229                });
230            }
231        }
232
233        Ok((200, changes))
234    }
235}
236
#[cfg(test)]
mod tests {
    //! Behavioral tests for `MapFunction`: path resolution (objects, arrays,
    //! section prefixes), change recording, and input validation.

    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    #[tokio::test]
    async fn test_array_notation_simple() {
        let map_fn = MapFunction::new();

        // Test simple array notation: data.items.0.name
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "Test Item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {
                    "name": "Test Item"
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let map_fn = MapFunction::new();

        // Test a deep path mixing object keys and an array index
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                    "logic": "INSTR123"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "MX": {
                "FIToFICstmrCdtTrf": {
                    "CdtTrfTxInf": [
                        {
                            "PmtId": {
                                "InstrId": "INSTR123"
                            }
                        }
                    ]
                }
            }
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        let map_fn = MapFunction::new();

        // Test multiple array indices in the same path: data.matrix.0.1.value
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.matrix.0.1.value",
                    "logic": "cell_01"
                },
                {
                    "path": "data.matrix.1.0.value",
                    "logic": "cell_10"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "matrix": [
                [
                    null,
                    {
                        "value": "cell_01"
                    }
                ],
                [
                    {
                        "value": "cell_10"
                    }
                ]
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_array_extension() {
        let map_fn = MapFunction::new();

        // Test that arrays are extended when accessing high indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.5.name",
                    "logic": "Item at index 5"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());

        // Should create an array with 6 elements (indices 0-5)
        assert!(message.data["items"].is_array());
        let items_array = message.data["items"].as_array().unwrap();
        assert_eq!(items_array.len(), 6);

        // The first 5 elements are null padding
        for item in &items_array[..5] {
            assert_eq!(*item, json!(null));
        }

        // Element at index 5 should have our value
        assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let map_fn = MapFunction::new();

        // Test mixing array and object notation: data.users.0.profile.addresses.1.city
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.users.0.profile.addresses.1.city",
                    "logic": "New York"
                },
                {
                    "path": "data.users.0.profile.name",
                    "logic": "John Doe"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "users": [
                {
                    "profile": {
                        "name": "John Doe",
                        "addresses": [
                            null,
                            {
                                "city": "New York"
                            }
                        ]
                    }
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let map_fn = MapFunction::new();

        // Test overwriting an existing value in an array
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "New Value"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {"name": "New Value"},
                {"name": "Another Item"}
            ]
        });
        assert_eq!(message.data, expected);

        // Check that changes are recorded
        let (_, changes) = result.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, json!("Old Value"));
        assert_eq!(changes[0].new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let map_fn = MapFunction::new();

        // Test array notation with JSONLogic expressions
        let mut message = Message::new(&json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": {"var": "temp_data.transactions.0.id"}
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "processed": [
                {
                    "transaction_id": "tx1",
                    "amount_cents": 10000
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        let map_fn = MapFunction::new();

        // Test converting an existing object to an array when numeric index is encountered
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": {
                "existing_key": "existing_value"
            }
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.new_value",
                    "logic": "array_item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        // The object should be converted to an array
        assert!(message.data["items"].is_array());
        let expected = json!({
            "items": [
                {
                    "new_value": "array_item"
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let map_fn = MapFunction::new();

        // Test that non-numeric strings are treated as object keys, not array indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.invalid_index.name",
                    "logic": "test"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        // This should succeed and create an object structure
        assert!(result.is_ok());
        let expected = json!({
            "items": {
                "invalid_index": {
                    "name": "test"
                }
            }
        });
        assert_eq!(message.data, expected);

        // Verify that "invalid_index" was treated as an object key, not array index
        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_metadata_target() {
        let map_fn = MapFunction::new();

        // A "metadata." prefix writes into message.metadata instead of data
        let mut message = Message::new(&json!({}));

        let input = json!({
            "mappings": [
                {
                    "path": "metadata.source",
                    "logic": "api"
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert_eq!(message.metadata["source"], json!("api"));
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "metadata.source");
        assert_eq!(changes[0].new_value, json!("api"));
    }

    #[tokio::test]
    async fn test_bare_section_path_replaces_whole_section() {
        let map_fn = MapFunction::new();

        // A bare section name replaces the whole section and records the old one
        let mut message = Message::new(&json!({}));
        message.temp_data = json!({"old": true});

        let input = json!({
            "mappings": [
                {
                    "path": "temp_data",
                    "logic": "replaced"
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert_eq!(message.temp_data, json!("replaced"));
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "temp_data");
        assert_eq!(changes[0].old_value, json!({"old": true}));
        assert_eq!(changes[0].new_value, json!("replaced"));
    }

    #[tokio::test]
    async fn test_unprefixed_path_defaults_to_data() {
        let map_fn = MapFunction::new();

        // A path without a known section prefix is written relative to data
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "plain",
                    "logic": "value"
                }
            ]
        });

        let (_, changes) = map_fn.execute(&mut message, &input).await.unwrap();

        assert_eq!(message.data, json!({"plain": "value"}));
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "plain");
    }

    #[tokio::test]
    async fn test_missing_mappings_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let result = map_fn.execute(&mut message, &json!({})).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[tokio::test]
    async fn test_non_array_mappings_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let input = json!({
            "mappings": {
                "path": "data.x",
                "logic": "x"
            }
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[tokio::test]
    async fn test_mapping_without_path_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let input = json!({
            "mappings": [
                {
                    "logic": "x"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[tokio::test]
    async fn test_mapping_without_logic_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let input = json!({
            "mappings": [
                {
                    "path": "data.x"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }
}