dataflow_rs/engine/functions/
map.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
/// A mapping function that transforms data using JSONLogic expressions.
///
/// This function allows mapping data from one location to another within
/// a message, applying transformations using JSONLogic expressions.
///
/// The struct carries no state of its own: JSONLogic evaluation goes through
/// the thread-local `FUNCTION_DATA_LOGIC` instance.
#[derive(Debug, Clone)]
pub struct MapFunction {}
15
16impl Default for MapFunction {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MapFunction {
23    /// Create a new MapFunction
24    pub fn new() -> Self {
25        Self {}
26    }
27
28    /// Set a value at the specified path within the target object
29    fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30        let mut current = target;
31        let mut old_value = Value::Null;
32        let path_parts: Vec<&str> = path.split('.').collect();
33
34        // Helper function to check if a string is a valid array index
35        fn is_numeric_index(s: &str) -> bool {
36            s.parse::<usize>().is_ok()
37        }
38
39        // Navigate to the parent of the target location
40        for (i, part) in path_parts.iter().enumerate() {
41            let is_numeric = is_numeric_index(part);
42
43            if i == path_parts.len() - 1 {
44                // We're at the last part, so set the value
45                if is_numeric {
46                    // Handle numeric index - ensure current is an array
47                    if !current.is_array() {
48                        // Convert to array if it's not already
49                        *current = Value::Array(vec![]);
50                    }
51
52                    if let Ok(index) = part.parse::<usize>() {
53                        if let Value::Array(arr) = current {
54                            // Extend array if needed
55                            while arr.len() <= index {
56                                arr.push(Value::Null);
57                            }
58                            // Save old value
59                            old_value = arr[index].clone();
60                            arr[index] = value.clone();
61                        }
62                    } else {
63                        return Err(DataflowError::Validation(format!(
64                            "Invalid array index: {}",
65                            part
66                        )));
67                    }
68                } else {
69                    // Handle object property
70                    if !current.is_object() {
71                        // Convert to object if it's not already
72                        *current = Value::Object(serde_json::Map::new());
73                    }
74
75                    if let Value::Object(map) = current {
76                        // Save the old value before replacing
77                        old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78                        map.insert(part.to_string(), value.clone());
79                    }
80                }
81            } else {
82                // We need to navigate deeper
83                if is_numeric {
84                    // Handle numeric index - ensure current is an array
85                    if !current.is_array() {
86                        *current = Value::Array(vec![]);
87                    }
88
89                    if let Ok(index) = part.parse::<usize>() {
90                        if let Value::Array(arr) = current {
91                            // Extend array if needed
92                            while arr.len() <= index {
93                                arr.push(Value::Null);
94                            }
95                            // Ensure the indexed element exists and is ready for further navigation
96                            if arr[index].is_null() {
97                                // Look ahead to see if next part is numeric to decide what to create
98                                let next_part = path_parts.get(i + 1).unwrap_or(&"");
99                                if is_numeric_index(next_part) {
100                                    arr[index] = Value::Array(vec![]);
101                                } else {
102                                    arr[index] = json!({});
103                                }
104                            }
105                            current = &mut arr[index];
106                        }
107                    } else {
108                        return Err(DataflowError::Validation(format!(
109                            "Invalid array index: {}",
110                            part
111                        )));
112                    }
113                } else {
114                    // Handle object property
115                    if !current.is_object() {
116                        *current = Value::Object(serde_json::Map::new());
117                    }
118
119                    if let Value::Object(map) = current {
120                        if !map.contains_key(*part) {
121                            // Look ahead to see if next part is numeric to decide what to create
122                            let next_part = path_parts.get(i + 1).unwrap_or(&"");
123                            if is_numeric_index(next_part) {
124                                map.insert(part.to_string(), Value::Array(vec![]));
125                            } else {
126                                map.insert(part.to_string(), json!({}));
127                            }
128                        }
129                        current = map.get_mut(*part).unwrap();
130                    }
131                }
132            }
133        }
134
135        Ok(old_value)
136    }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142        // Extract mappings array from input
143        let mappings = input.get("mappings").ok_or_else(|| {
144            DataflowError::Validation("Missing 'mappings' array in input".to_string())
145        })?;
146
147        let mappings_arr = mappings
148            .as_array()
149            .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151        let mut changes = Vec::new();
152
153        // Process each mapping
154        for mapping in mappings_arr {
155            // Get path where to store the result
156            let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157                DataflowError::Validation("Missing 'path' in mapping".to_string())
158            })?;
159
160            // Get the logic to evaluate
161            let logic = mapping.get("logic").ok_or_else(|| {
162                DataflowError::Validation("Missing 'logic' in mapping".to_string())
163            })?;
164
165            // Clone message data for evaluation context
166            let data_clone = message.data.clone();
167            let metadata_clone = message.metadata.clone();
168            let temp_data_clone = message.temp_data.clone();
169
170            // Create a combined data object with message fields for evaluation
171            let data_for_eval = json!({
172                "data": data_clone,
173                "metadata": metadata_clone,
174                "temp_data": temp_data_clone,
175            });
176
177            // Determine target object based on path prefix
178            let (target_object, adjusted_path) =
179                if let Some(path) = target_path.strip_prefix("data.") {
180                    (&mut message.data, path)
181                } else if let Some(path) = target_path.strip_prefix("metadata.") {
182                    (&mut message.metadata, path)
183                } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184                    (&mut message.temp_data, path)
185                } else if target_path == "data" {
186                    (&mut message.data, "")
187                } else if target_path == "metadata" {
188                    (&mut message.metadata, "")
189                } else if target_path == "temp_data" {
190                    (&mut message.temp_data, "")
191                } else {
192                    // Default to data
193                    (&mut message.data, target_path)
194                };
195
196            // Evaluate the logic using thread-local DataLogic
197            let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198                let data_logic = data_logic_cell.borrow_mut();
199
200                data_logic
201                    .evaluate_json(logic, &data_for_eval, None)
202                    .map_err(|e| {
203                        DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
204                    })
205            })?;
206
207            // Set the result at the target path
208            if adjusted_path.is_empty() {
209                // Replace the entire object
210                let old_value = std::mem::replace(target_object, result.clone());
211                changes.push(Change {
212                    path: target_path.to_string(),
213                    old_value,
214                    new_value: result,
215                });
216            } else {
217                // Set a specific path
218                let old_value =
219                    self.set_value_at_path(target_object, adjusted_path, result.clone())?;
220                changes.push(Change {
221                    path: target_path.to_string(),
222                    old_value,
223                    new_value: result,
224                });
225            }
226        }
227
228        Ok((200, changes))
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    #[tokio::test]
    async fn test_array_notation_simple() {
        let map_fn = MapFunction::new();

        // Test simple array notation: data.items.0.name
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "Test Item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {
                    "name": "Test Item"
                }
            ]
        });
        assert_eq!(message.data, expected);

        // A freshly created path reports Null as its previous value.
        let (status, changes) = result.unwrap();
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, Value::Null);
        assert_eq!(changes[0].new_value, json!("Test Item"));
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let map_fn = MapFunction::new();

        // Test complex path like the original example: data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                    "logic": "INSTR123"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "MX": {
                "FIToFICstmrCdtTrf": {
                    "CdtTrfTxInf": [
                        {
                            "PmtId": {
                                "InstrId": "INSTR123"
                            }
                        }
                    ]
                }
            }
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        let map_fn = MapFunction::new();

        // Test multiple array indices in the same path: data.matrix.0.1.value
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.matrix.0.1.value",
                    "logic": "cell_01"
                },
                {
                    "path": "data.matrix.1.0.value",
                    "logic": "cell_10"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "matrix": [
                [
                    null,
                    {
                        "value": "cell_01"
                    }
                ],
                [
                    {
                        "value": "cell_10"
                    }
                ]
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_array_extension() {
        let map_fn = MapFunction::new();

        // Test that arrays are extended when accessing high indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.5.name",
                    "logic": "Item at index 5"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());

        // Should create an array with 6 elements (indices 0-5)
        assert!(message.data["items"].is_array());
        let items_array = message.data["items"].as_array().unwrap();
        assert_eq!(items_array.len(), 6);

        // First 5 elements should be null padding
        assert!(items_array[..5].iter().all(Value::is_null));

        // Element at index 5 should have our value
        assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let map_fn = MapFunction::new();

        // Test mixing array and object notation: data.users.0.profile.addresses.1.city
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.users.0.profile.addresses.1.city",
                    "logic": "New York"
                },
                {
                    "path": "data.users.0.profile.name",
                    "logic": "John Doe"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "users": [
                {
                    "profile": {
                        "name": "John Doe",
                        "addresses": [
                            null,
                            {
                                "city": "New York"
                            }
                        ]
                    }
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let map_fn = MapFunction::new();

        // Test overwriting an existing value in an array
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.name",
                    "logic": "New Value"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "items": [
                {"name": "New Value"},
                {"name": "Another Item"}
            ]
        });
        assert_eq!(message.data, expected);

        // Check that changes are recorded
        let (_, changes) = result.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, json!("Old Value"));
        assert_eq!(changes[0].new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let map_fn = MapFunction::new();

        // Test array notation with JSONLogic expressions
        let mut message = Message::new(&json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": {"var": "temp_data.transactions.0.id"}
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        let expected = json!({
            "processed": [
                {
                    "transaction_id": "tx1",
                    "amount_cents": 10000
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        let map_fn = MapFunction::new();

        // Test converting an existing object to an array when numeric index is encountered
        let mut message = Message::new(&json!({}));
        message.data = json!({
            "items": {
                "existing_key": "existing_value"
            }
        });

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.0.new_value",
                    "logic": "array_item"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        // The object should be converted to an array
        assert!(message.data["items"].is_array());
        let expected = json!({
            "items": [
                {
                    "new_value": "array_item"
                }
            ]
        });
        assert_eq!(message.data, expected);
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let map_fn = MapFunction::new();

        // Test that non-numeric strings are treated as object keys, not array indices
        let mut message = Message::new(&json!({}));
        message.data = json!({});

        let input = json!({
            "mappings": [
                {
                    "path": "data.items.invalid_index.name",
                    "logic": "test"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        // This should succeed and create an object structure
        assert!(result.is_ok());
        let expected = json!({
            "items": {
                "invalid_index": {
                    "name": "test"
                }
            }
        });
        assert_eq!(message.data, expected);

        // Verify that "invalid_index" was treated as an object key, not array index
        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_replace_whole_data_object() {
        let map_fn = MapFunction::new();

        // A bare "data" path replaces the whole field and records the old value.
        let mut message = Message::new(&json!({}));
        message.data = json!({"old": 1});

        let input = json!({
            "mappings": [
                {
                    "path": "data",
                    "logic": "replaced"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(result.is_ok());
        assert_eq!(message.data, json!("replaced"));

        let (_, changes) = result.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data");
        assert_eq!(changes[0].old_value, json!({"old": 1}));
        assert_eq!(changes[0].new_value, json!("replaced"));
    }

    #[tokio::test]
    async fn test_missing_mappings_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));

        let result = map_fn.execute(&mut message, &json!({})).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
    }

    #[tokio::test]
    async fn test_mapping_without_path_is_validation_error() {
        let map_fn = MapFunction::new();
        let mut message = Message::new(&json!({}));
        message.data = json!({"keep": true});

        let input = json!({
            "mappings": [
                {
                    "logic": "value"
                }
            ]
        });

        let result = map_fn.execute(&mut message, &input).await;

        assert!(matches!(result, Err(DataflowError::Validation(_))));
        // Nothing was written before the invalid mapping was rejected.
        assert_eq!(message.data, json!({"keep": true}));
    }
}