dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
/// Dataflow function that writes JSONLogic results into a message.
///
/// Every configured mapping pairs a JSONLogic expression with a
/// dot-separated destination path; the evaluated result is stored at that
/// path inside the message being processed.
pub struct MapFunction {
    // Deliberately field-less: no `data_logic` instance is stored on the struct.
}
15
16impl Default for MapFunction {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MapFunction {
23    /// Create a new MapFunction
24    pub fn new() -> Self {
25        Self {}
26    }
27
28    /// Set a value at the specified path within the target object
29    fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30        let mut current = target;
31        let mut old_value = Value::Null;
32        let path_parts: Vec<&str> = path.split('.').collect();
33
34        // Helper function to check if a string is a valid array index
35        fn is_numeric_index(s: &str) -> bool {
36            s.parse::<usize>().is_ok()
37        }
38
39        // Navigate to the parent of the target location
40        for (i, part) in path_parts.iter().enumerate() {
41            let is_numeric = is_numeric_index(part);
42
43            if i == path_parts.len() - 1 {
44                // We're at the last part, so set the value
45                if is_numeric {
46                    // Handle numeric index - ensure current is an array
47                    if !current.is_array() {
48                        // Convert to array if it's not already
49                        *current = Value::Array(vec![]);
50                    }
51
52                    if let Ok(index) = part.parse::<usize>() {
53                        if let Value::Array(arr) = current {
54                            // Extend array if needed
55                            while arr.len() <= index {
56                                arr.push(Value::Null);
57                            }
58                            // Save old value
59                            old_value = arr[index].clone();
60                            arr[index] = value.clone();
61                        }
62                    } else {
63                        return Err(DataflowError::Validation(format!(
64                            "Invalid array index: {}",
65                            part
66                        )));
67                    }
68                } else {
69                    // Handle object property
70                    if !current.is_object() {
71                        // Convert to object if it's not already
72                        *current = Value::Object(serde_json::Map::new());
73                    }
74
75                    if let Value::Object(map) = current {
76                        // Save the old value before replacing
77                        old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78                        map.insert(part.to_string(), value.clone());
79                    }
80                }
81            } else {
82                // We need to navigate deeper
83                if is_numeric {
84                    // Handle numeric index - ensure current is an array
85                    if !current.is_array() {
86                        *current = Value::Array(vec![]);
87                    }
88
89                    if let Ok(index) = part.parse::<usize>() {
90                        if let Value::Array(arr) = current {
91                            // Extend array if needed
92                            while arr.len() <= index {
93                                arr.push(Value::Null);
94                            }
95                            // Ensure the indexed element exists and is ready for further navigation
96                            if arr[index].is_null() {
97                                // Look ahead to see if next part is numeric to decide what to create
98                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
99                                if is_numeric_index(next_part) {
100                                    arr[index] = Value::Array(vec![]);
101                                } else {
102                                    arr[index] = json!({});
103                                }
104                            }
105                            current = &mut arr[index];
106                        }
107                    } else {
108                        return Err(DataflowError::Validation(format!(
109                            "Invalid array index: {}",
110                            part
111                        )));
112                    }
113                } else {
114                    // Handle object property
115                    if !current.is_object() {
116                        *current = Value::Object(serde_json::Map::new());
117                    }
118
119                    if let Value::Object(map) = current {
120                        if !map.contains_key(*part) {
121                            // Look ahead to see if next part is numeric to decide what to create
122                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
123                            if is_numeric_index(next_part) {
124                                map.insert(part.to_string(), Value::Array(vec![]));
125                            } else {
126                                map.insert(part.to_string(), json!({}));
127                            }
128                        }
129                        current = map.get_mut(*part).unwrap();
130                    }
131                }
132            }
133        }
134
135        Ok(old_value)
136    }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142        // Extract mappings array from input
143        let mappings = input.get("mappings").ok_or_else(|| {
144            DataflowError::Validation("Missing 'mappings' array in input".to_string())
145        })?;
146
147        let mappings_arr = mappings
148            .as_array()
149            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151        let mut changes = Vec::new();
152
153        // Process each mapping
154        for mapping in mappings_arr {
155            // Get path where to store the result
156            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157                DataflowError::Validation("Missing 'path' in mapping".to_string())
158            })?;
159
160            // Get the logic to evaluate
161            let logic = mapping.get("logic").ok_or_else(|| {
162                DataflowError::Validation("Missing 'logic' in mapping".to_string())
163            })?;
164
165            // Clone message data for evaluation context
166            let data_clone = message.data.clone();
167            let metadata_clone = message.metadata.clone();
168            let temp_data_clone = message.temp_data.clone();
169
170            // Create a combined data object with message fields for evaluation
171            let data_for_eval = json!({
172                "data": data_clone,
173                "metadata": metadata_clone,
174                "temp_data": temp_data_clone,
175            });
176
177            // Determine target object based on path prefix
178            let (target_object, adjusted_path) =
179                if let Some(path) = target_path.strip_prefix("data.") {
180                    (&mut message.data, path)
181                } else if let Some(path) = target_path.strip_prefix("metadata.") {
182                    (&mut message.metadata, path)
183                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184                    (&mut message.temp_data, path)
185                } else if target_path == "data" {
186                    (&mut message.data, "")
187                } else if target_path == "metadata" {
188                    (&mut message.metadata, "")
189                } else if target_path == "temp_data" {
190                    (&mut message.temp_data, "")
191                } else {
192                    // Default to data
193                    (&mut message.data, target_path)
194                };
195
196            // Evaluate the logic using thread-local DataLogic
197            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198                let mut data_logic = data_logic_cell.borrow_mut();
199                data_logic.reset_arena();
200
201                data_logic
202                    .evaluate_json(logic, &data_for_eval, None)
203                    .map_err(|e| {
204                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
205                    })
206            })?;
207
208            // Set the result at the target path
209            if adjusted_path.is_empty() {
210                // Replace the entire object
211                let old_value = std::mem::replace(target_object, result.clone());
212                changes.push(Change {
213                    path: target_path.to_string(),
214                    old_value,
215                    new_value: result,
216                });
217            } else {
218                // Set a specific path
219                let old_value =
220                    self.set_value_at_path(target_object, adjusted_path, result.clone())?;
221                changes.push(Change {
222                    path: target_path.to_string(),
223                    old_value,
224                    new_value: result,
225                });
226            }
227        }
228
229        Ok((200, changes))
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Build a message whose `data` section is `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = Message::new(&json!({}));
        message.data = data;
        message
    }

    /// Run a fresh `MapFunction` over `message` using the given `mappings` array.
    async fn run_mappings(message: &mut Message, mappings: Value) -> Result<(usize, Vec<Change>)> {
        let input = json!({ "mappings": mappings });
        MapFunction::new().execute(message, &input).await
    }

    /// A single numeric segment creates an array holding an object.
    #[tokio::test]
    async fn test_array_notation_simple() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "Test Item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({ "items": [{ "name": "Test Item" }] })
        );
    }

    /// A deep path mixing object keys with one array index is fully materialized.
    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([{
                "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                "logic": "INSTR123"
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "MX": {
                    "FIToFICstmrCdtTrf": {
                        "CdtTrfTxInf": [
                            { "PmtId": { "InstrId": "INSTR123" } }
                        ]
                    }
                }
            })
        );
    }

    /// Consecutive numeric segments produce nested arrays.
    #[tokio::test]
    async fn test_multiple_array_indices() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([
                { "path": "data.matrix.0.1.value", "logic": "cell_01" },
                { "path": "data.matrix.1.0.value", "logic": "cell_10" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "matrix": [
                    [null, { "value": "cell_01" }],
                    [{ "value": "cell_10" }]
                ]
            })
        );
    }

    /// Writing at a high index pads the array with nulls.
    #[tokio::test]
    async fn test_array_extension() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([{ "path": "data.items.5.name", "logic": "Item at index 5" }]),
        )
        .await;

        assert!(outcome.is_ok());

        // Indices 0 through 5 must exist, so the array has six elements.
        assert!(message.data["items"].is_array());
        let items = message.data["items"].as_array().unwrap();
        assert_eq!(items.len(), 6);

        // Everything before the written slot is padding.
        for padding in &items[..5] {
            assert_eq!(*padding, json!(null));
        }

        // The written slot carries the mapped value.
        assert_eq!(items[5], json!({"name": "Item at index 5"}));
    }

    /// Array and object segments can be interleaved across several mappings.
    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([
                { "path": "data.users.0.profile.addresses.1.city", "logic": "New York" },
                { "path": "data.users.0.profile.name", "logic": "John Doe" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "users": [
                    {
                        "profile": {
                            "name": "John Doe",
                            "addresses": [null, { "city": "New York" }]
                        }
                    }
                ]
            })
        );
    }

    /// Overwriting an existing array element's field records the old value.
    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let mut message = message_with_data(json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        }));

        let outcome = run_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "New Value" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    {"name": "New Value"},
                    {"name": "Another Item"}
                ]
            })
        );

        // Exactly one change, describing the overwrite.
        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        let change = &changes[0];
        assert_eq!(change.path, "data.items.0.name");
        assert_eq!(change.old_value, json!("Old Value"));
        assert_eq!(change.new_value, json!("New Value"));
    }

    /// JSONLogic expressions can read array elements and feed array paths.
    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let mut message = message_with_data(json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });

        let outcome = run_mappings(
            &mut message,
            json!([
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": {"var": "temp_data.transactions.0.id"}
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "processed": [
                    {
                        "transaction_id": "tx1",
                        "amount_cents": 10000
                    }
                ]
            })
        );
    }

    /// A numeric segment applied to an existing object replaces it with an array.
    #[tokio::test]
    async fn test_convert_object_to_array() {
        let mut message = message_with_data(json!({
            "items": {
                "existing_key": "existing_value"
            }
        }));

        let outcome = run_mappings(
            &mut message,
            json!([{ "path": "data.items.0.new_value", "logic": "array_item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        // The former object is now an array.
        assert!(message.data["items"].is_array());
        assert_eq!(
            message.data,
            json!({ "items": [{ "new_value": "array_item" }] })
        );
    }

    /// Non-numeric segments are object keys, never array indices.
    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let mut message = message_with_data(json!({}));

        let outcome = run_mappings(
            &mut message,
            json!([{ "path": "data.items.invalid_index.name", "logic": "test" }]),
        )
        .await;

        // The mapping succeeds and builds nested objects.
        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": {
                    "invalid_index": {
                        "name": "test"
                    }
                }
            })
        );

        // "invalid_index" became a key, so `items` is an object rather than an array.
        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }
}