1use crate::engine::AsyncFunctionHandler;
2use crate::engine::error::{DataflowError, Result};
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use datalogic_rs::DataLogic;
6use log::error;
7use serde_json::{Value, json};
8
/// Function handler that evaluates JSONLogic mappings and writes each result into a
/// message at a dot-separated target path.
///
/// The type is a stateless unit struct, so it is cheap to copy and to print for
/// diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct MapFunction;
14
15impl Default for MapFunction {
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl MapFunction {
22 pub fn new() -> Self {
24 Self
25 }
26
27 fn set_value_at_path(&self, target: &mut Value, path: &str, value: &Value) -> Result<Value> {
29 let mut current = target;
30 let mut old_value = Value::Null;
31 let path_parts: Vec<&str> = path.split('.').collect();
32
33 fn is_numeric_index(s: &str) -> bool {
35 s.parse::<usize>().is_ok()
36 }
37
38 for (i, part) in path_parts.iter().enumerate() {
40 let is_numeric = is_numeric_index(part);
41
42 if i == path_parts.len() - 1 {
43 if is_numeric {
45 if !current.is_array() {
47 *current = Value::Array(vec![]);
49 }
50
51 if let Ok(index) = part.parse::<usize>() {
52 if let Value::Array(arr) = current {
53 while arr.len() <= index {
55 arr.push(Value::Null);
56 }
57 old_value = arr[index].clone();
59 arr[index] = value.clone();
60 }
61 } else {
62 error!("Invalid array index: {part}");
63 return Err(DataflowError::Validation(format!(
64 "Invalid array index: {part}"
65 )));
66 }
67 } else {
68 if !current.is_object() {
70 *current = Value::Object(serde_json::Map::new());
72 }
73
74 if let Value::Object(map) = current {
75 let mut key = part.to_string();
77 if key.starts_with("#") {
78 key = key.strip_prefix("#").unwrap_or(&key).to_string();
79 }
80 old_value = map.get(&key).cloned().unwrap_or(Value::Null);
81 let value_to_insert = if old_value.is_object() && value.is_object() {
83 let mut merged_map = old_value.as_object().unwrap().clone();
84 if let Some(new_map) = value.as_object() {
85 for (k, v) in new_map {
87 merged_map.insert(k.clone(), v.clone());
88 }
89 }
90 Value::Object(merged_map)
91 } else {
92 value.clone()
93 };
94 map.insert(key, value_to_insert);
95 }
96 }
97 } else {
98 if is_numeric {
100 if !current.is_array() {
102 *current = Value::Array(vec![]);
103 }
104
105 if let Ok(index) = part.parse::<usize>() {
106 if let Value::Array(arr) = current {
107 while arr.len() <= index {
109 arr.push(Value::Null);
110 }
111 if arr[index].is_null() {
113 let next_part = path_parts.get(i + 1).unwrap_or(&"");
115 if is_numeric_index(next_part) {
116 arr[index] = Value::Array(vec![]);
117 } else {
118 arr[index] = json!({});
119 }
120 }
121 current = &mut arr[index];
122 }
123 } else {
124 error!("Invalid array index: {part}");
125 return Err(DataflowError::Validation(format!(
126 "Invalid array index: {part}"
127 )));
128 }
129 } else {
130 if !current.is_object() {
132 *current = Value::Object(serde_json::Map::new());
133 }
134
135 if let Value::Object(map) = current {
136 let mut key = part.to_string();
137 if key.starts_with("#") {
138 key = key.strip_prefix("#").unwrap_or(&key).to_string();
139 }
140 if !map.contains_key(&key) {
141 let next_part = path_parts.get(i + 1).unwrap_or(&"");
143 if is_numeric_index(next_part) {
144 map.insert(part.to_string(), Value::Array(vec![]));
145 } else {
146 map.insert(key.clone(), json!({}));
147 }
148 }
149 current = map.get_mut(&key).unwrap();
150 }
151 }
152 }
153 }
154
155 Ok(old_value)
156 }
157}
158
159#[async_trait]
160impl AsyncFunctionHandler for MapFunction {
161 async fn execute(
162 &self,
163 message: &mut Message,
164 input: &Value,
165 data_logic: &mut DataLogic,
166 ) -> Result<(usize, Vec<Change>)> {
167 let mappings = input.get("mappings").ok_or_else(|| {
169 DataflowError::Validation("Missing 'mappings' array in input".to_string())
170 })?;
171
172 let mappings_arr = mappings
173 .as_array()
174 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
175
176 let mut changes = Vec::new();
177
178 for mapping in mappings_arr {
180 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
182 DataflowError::Validation("Missing 'path' in mapping".to_string())
183 })?;
184
185 let logic = mapping.get("logic").ok_or_else(|| {
187 DataflowError::Validation("Missing 'logic' in mapping".to_string())
188 })?;
189
190 let data_clone = message.data.clone();
193 let metadata_clone = message.metadata.clone();
194 let temp_data_clone = message.temp_data.clone();
195
196 let data_for_eval = json!({
198 "data": data_clone,
199 "metadata": metadata_clone,
200 "temp_data": temp_data_clone,
201 });
202
203 let (target_object, adjusted_path) =
205 if let Some(path) = target_path.strip_prefix("data.") {
206 (&mut message.data, path)
207 } else if let Some(path) = target_path.strip_prefix("metadata.") {
208 (&mut message.metadata, path)
209 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
210 (&mut message.temp_data, path)
211 } else if target_path == "data" {
212 (&mut message.data, "")
213 } else if target_path == "metadata" {
214 (&mut message.metadata, "")
215 } else if target_path == "temp_data" {
216 (&mut message.temp_data, "")
217 } else {
218 (&mut message.data, target_path)
220 };
221
222 data_logic.reset_arena();
224 let result = data_logic
225 .evaluate_json(logic, &data_for_eval, None)
226 .map_err(|e| {
227 error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
228 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
229 })?;
230
231 if result.is_null() {
232 continue;
233 }
234
235 if adjusted_path.is_empty() {
237 let old_value = if target_object.is_object() && result.is_object() {
239 let mut merged_map = target_object.as_object().unwrap().clone();
240 if let Some(new_map) = result.as_object() {
241 for (k, v) in new_map {
243 merged_map.insert(k.clone(), v.clone());
244 }
245 }
246 std::mem::replace(target_object, Value::Object(merged_map))
247 } else {
248 std::mem::replace(target_object, result.clone())
249 };
250 changes.push(Change {
251 path: target_path.to_string(),
252 old_value,
253 new_value: result,
254 });
255 } else {
256 let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
258 changes.push(Change {
259 path: target_path.to_string(),
260 old_value,
261 new_value: result,
262 });
263 }
264 }
265
266 Ok((200, changes))
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use datalogic_rs::DataLogic;
    use serde_json::json;

    /// Builds a message whose `data` payload is exactly `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = Message::new(&json!({}));
        message.data = data;
        message
    }

    /// Runs a fresh `MapFunction` over `message` with a structure-preserving
    /// `DataLogic` instance and hands back the raw outcome.
    async fn apply(message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
        let mut data_logic = DataLogic::with_preserve_structure();
        MapFunction::new()
            .execute(message, input, &mut data_logic)
            .await
    }

    #[tokio::test]
    async fn test_array_notation_simple() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.items.0.name", "logic": "Test Item" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    { "name": "Test Item" }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                {
                    "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                    "logic": "INSTR123"
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "MX": {
                    "FIToFICstmrCdtTrf": {
                        "CdtTrfTxInf": [
                            {
                                "PmtId": {
                                    "InstrId": "INSTR123"
                                }
                            }
                        ]
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.matrix.0.1.value", "logic": "cell_01" },
                { "path": "data.matrix.1.0.value", "logic": "cell_10" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "matrix": [
                    [
                        null,
                        { "value": "cell_01" }
                    ],
                    [
                        { "value": "cell_10" }
                    ]
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_array_extension() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.items.5.name", "logic": "Item at index 5" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert!(message.data["items"].is_array());

        let items_array = message.data["items"].as_array().unwrap();
        assert_eq!(items_array.len(), 6);

        // Everything before the written index must be null padding.
        for padding in &items_array[..5] {
            assert_eq!(*padding, json!(null));
        }

        assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.users.0.profile.addresses.1.city", "logic": "New York" },
                { "path": "data.users.0.profile.name", "logic": "John Doe" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "users": [
                    {
                        "profile": {
                            "name": "John Doe",
                            "addresses": [
                                null,
                                { "city": "New York" }
                            ]
                        }
                    }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let mut message = message_with_data(json!({
            "items": [
                {"name": "Old Value"},
                {"name": "Another Item"}
            ]
        }));
        let input = json!({
            "mappings": [
                { "path": "data.items.0.name", "logic": "New Value" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    {"name": "New Value"},
                    {"name": "Another Item"}
                ]
            })
        );

        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.items.0.name");
        assert_eq!(changes[0].old_value, json!("Old Value"));
        assert_eq!(changes[0].new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let mut message = message_with_data(json!({}));
        message.temp_data = json!({
            "transactions": [
                {"id": "tx1", "amount": 100},
                {"id": "tx2", "amount": 200}
            ]
        });
        let input = json!({
            "mappings": [
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": {"var": "temp_data.transactions.0.id"}
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "processed": [
                    {
                        "transaction_id": "tx1",
                        "amount_cents": 10000
                    }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        let mut message = message_with_data(json!({
            "items": {
                "existing_key": "existing_value"
            }
        }));
        let input = json!({
            "mappings": [
                { "path": "data.items.0.new_value", "logic": "array_item" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert!(message.data["items"].is_array());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    { "new_value": "array_item" }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.items.invalid_index.name", "logic": "test" }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": {
                    "invalid_index": {
                        "name": "test"
                    }
                }
            })
        );

        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_object_merge_on_mapping() {
        let mut message = message_with_data(json!({
            "config": {
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "username": "admin"
                }
            }
        }));
        let input = json!({
            "mappings": [
                {
                    "path": "data.config.database",
                    "logic": {
                        "password": "secret123",
                        "ssl": true,
                        "port": 5433
                    }
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",
                        "port": 5433,
                        "username": "admin",
                        "password": "secret123",
                        "ssl": true
                    }
                }
            })
        );

        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path, "data.config.database");
        assert_eq!(
            changes[0].old_value,
            json!({
                "host": "localhost",
                "port": 5432,
                "username": "admin"
            })
        );
        assert_eq!(
            changes[0].new_value,
            json!({
                "password": "secret123",
                "ssl": true,
                "port": 5433
            })
        );
    }

    #[tokio::test]
    async fn test_object_merge_with_nested_path() {
        let mut message = message_with_data(json!({
            "user": {
                "profile": {
                    "name": "John Doe",
                    "age": 30
                }
            }
        }));
        let input = json!({
            "mappings": [
                {
                    "path": "data.user.profile",
                    "logic": {
                        "email": "john@example.com",
                        "age": 31,
                        "city": "New York"
                    }
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "user": {
                    "profile": {
                        "name": "John Doe",
                        "age": 31,
                        "email": "john@example.com",
                        "city": "New York"
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_non_object_replacement() {
        let mut message = message_with_data(json!({
            "settings": {
                "value": "old_string"
            }
        }));
        let input = json!({
            "mappings": [
                { "path": "data.settings.value", "logic": {"new": "object"} }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "settings": {
                    "value": {"new": "object"}
                }
            })
        );
    }

    #[tokio::test]
    async fn test_parent_child_mapping_issue_fix() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                {
                    "path": "data.Parent.Child",
                    "logic": {
                        "Field1": "Value1",
                        "Field2": "Value2"
                    }
                },
                {
                    "path": "data.Parent.Child.NestedObject",
                    "logic": {
                        "NestedField": "NestedValue"
                    }
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "Parent": {
                    "Child": {
                        "Field1": "Value1",
                        "Field2": "Value2",
                        "NestedObject": {
                            "NestedField": "NestedValue"
                        }
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_mappings_with_dependencies() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                { "path": "data.config.database.host", "logic": "localhost" },
                { "path": "data.config.database.port", "logic": 5432 },
                {
                    "path": "data.config.connectionString",
                    "logic": {
                        "cat": [
                            "postgresql://",
                            {"var": "data.config.database.host"},
                            ":",
                            {"var": "data.config.database.port"}
                        ]
                    }
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",
                        "port": 5432
                    },
                    "connectionString": "postgresql://localhost:5432"
                }
            })
        );
    }

    #[tokio::test]
    async fn test_nested_path_after_parent_mapping() {
        let mut message = message_with_data(json!({}));
        let input = json!({
            "mappings": [
                {
                    "path": "data.transaction",
                    "logic": {
                        "id": "TX-001",
                        "amount": 1000,
                        "currency": "USD"
                    }
                },
                {
                    "path": "data.transaction.metadata",
                    "logic": {
                        "timestamp": "2024-01-01T12:00:00Z",
                        "source": "API"
                    }
                },
                {
                    "path": "data.transaction.metadata.tags",
                    "logic": ["urgent", "international"]
                }
            ]
        });

        let outcome = apply(&mut message, &input).await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "transaction": {
                    "id": "TX-001",
                    "amount": 1000,
                    "currency": "USD",
                    "metadata": {
                        "timestamp": "2024-01-01T12:00:00Z",
                        "source": "API",
                        "tags": ["urgent", "international"]
                    }
                }
            })
        );
    }
}