dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use log::error;
7use serde_json::{json, Value};
8
/// Function handler that evaluates a list of mappings and writes each
/// result to its target path inside a message.
///
/// The type carries no state; all inputs arrive through the handler call.
pub struct MapFunction {}
16
17impl Default for MapFunction {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl MapFunction {
24 pub fn new() -> Self {
26 Self {}
27 }
28
29 fn set_value_at_path(&self, target: &mut Value, path: &str, value: &Value) -> Result<Value> {
31 let mut current = target;
32 let mut old_value = Value::Null;
33 let path_parts: Vec<&str> = path.split('.').collect();
34
35 fn is_numeric_index(s: &str) -> bool {
37 s.parse::<usize>().is_ok()
38 }
39
40 for (i, part) in path_parts.iter().enumerate() {
42 let is_numeric = is_numeric_index(part);
43
44 if i == path_parts.len() - 1 {
45 if is_numeric {
47 if !current.is_array() {
49 *current = Value::Array(vec![]);
51 }
52
53 if let Ok(index) = part.parse::<usize>() {
54 if let Value::Array(arr) = current {
55 while arr.len() <= index {
57 arr.push(Value::Null);
58 }
59 old_value = arr[index].clone();
61 arr[index] = value.clone();
62 }
63 } else {
64 error!("Invalid array index: {part}");
65 return Err(DataflowError::Validation(format!(
66 "Invalid array index: {part}"
67 )));
68 }
69 } else {
70 if !current.is_object() {
72 *current = Value::Object(serde_json::Map::new());
74 }
75
76 if let Value::Object(map) = current {
77 let mut key = part.to_string();
79 if key.starts_with("#") {
80 key = key.strip_prefix("#").unwrap_or(&key).to_string();
81 }
82 old_value = map.get(&key).cloned().unwrap_or(Value::Null);
83 let value_to_insert = if old_value.is_object() && value.is_object() {
85 let mut merged_map = old_value.as_object().unwrap().clone();
86 if let Some(new_map) = value.as_object() {
87 for (k, v) in new_map {
89 merged_map.insert(k.clone(), v.clone());
90 }
91 }
92 Value::Object(merged_map)
93 } else {
94 value.clone()
95 };
96 map.insert(key, value_to_insert);
97 }
98 }
99 } else {
100 if is_numeric {
102 if !current.is_array() {
104 *current = Value::Array(vec![]);
105 }
106
107 if let Ok(index) = part.parse::<usize>() {
108 if let Value::Array(arr) = current {
109 while arr.len() <= index {
111 arr.push(Value::Null);
112 }
113 if arr[index].is_null() {
115 let next_part = path_parts.get(i + 1).unwrap_or(&"");
117 if is_numeric_index(next_part) {
118 arr[index] = Value::Array(vec![]);
119 } else {
120 arr[index] = json!({});
121 }
122 }
123 current = &mut arr[index];
124 }
125 } else {
126 error!("Invalid array index: {part}");
127 return Err(DataflowError::Validation(format!(
128 "Invalid array index: {part}"
129 )));
130 }
131 } else {
132 if !current.is_object() {
134 *current = Value::Object(serde_json::Map::new());
135 }
136
137 if let Value::Object(map) = current {
138 let mut key = part.to_string();
139 if key.starts_with("#") {
140 key = key.strip_prefix("#").unwrap_or(&key).to_string();
141 }
142 if !map.contains_key(&key) {
143 let next_part = path_parts.get(i + 1).unwrap_or(&"");
145 if is_numeric_index(next_part) {
146 map.insert(part.to_string(), Value::Array(vec![]));
147 } else {
148 map.insert(key.clone(), json!({}));
149 }
150 }
151 current = map.get_mut(&key).unwrap();
152 }
153 }
154 }
155 }
156
157 Ok(old_value)
158 }
159}
160
161#[async_trait]
162impl AsyncFunctionHandler for MapFunction {
163 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
164 let mappings = input.get("mappings").ok_or_else(|| {
166 DataflowError::Validation("Missing 'mappings' array in input".to_string())
167 })?;
168
169 let mappings_arr = mappings
170 .as_array()
171 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
172
173 let mut changes = Vec::new();
174
175 for mapping in mappings_arr {
177 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
179 DataflowError::Validation("Missing 'path' in mapping".to_string())
180 })?;
181
182 let logic = mapping.get("logic").ok_or_else(|| {
184 DataflowError::Validation("Missing 'logic' in mapping".to_string())
185 })?;
186
187 let data_clone = message.data.clone();
189 let metadata_clone = message.metadata.clone();
190 let temp_data_clone = message.temp_data.clone();
191
192 let data_for_eval = json!({
194 "data": data_clone,
195 "metadata": metadata_clone,
196 "temp_data": temp_data_clone,
197 });
198
199 let (target_object, adjusted_path) =
201 if let Some(path) = target_path.strip_prefix("data.") {
202 (&mut message.data, path)
203 } else if let Some(path) = target_path.strip_prefix("metadata.") {
204 (&mut message.metadata, path)
205 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
206 (&mut message.temp_data, path)
207 } else if target_path == "data" {
208 (&mut message.data, "")
209 } else if target_path == "metadata" {
210 (&mut message.metadata, "")
211 } else if target_path == "temp_data" {
212 (&mut message.temp_data, "")
213 } else {
214 (&mut message.data, target_path)
216 };
217
218 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
220 let mut data_logic = data_logic_cell.borrow_mut();
221 data_logic.reset_arena();
222
223 data_logic
224 .evaluate_json(logic, &data_for_eval, None)
225 .map_err(|e| {
226 error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
227 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
228 })
229 })?;
230
231 if result.is_null() {
232 continue;
233 }
234
235 if adjusted_path.is_empty() {
237 let old_value = if target_object.is_object() && result.is_object() {
239 let mut merged_map = target_object.as_object().unwrap().clone();
240 if let Some(new_map) = result.as_object() {
241 for (k, v) in new_map {
243 merged_map.insert(k.clone(), v.clone());
244 }
245 }
246 std::mem::replace(target_object, Value::Object(merged_map))
247 } else {
248 std::mem::replace(target_object, result.clone())
249 };
250 changes.push(Change {
251 path: target_path.to_string(),
252 old_value,
253 new_value: result,
254 });
255 } else {
256 let old_value = self.set_value_at_path(target_object, adjusted_path, &result)?;
258 changes.push(Change {
259 path: target_path.to_string(),
260 old_value,
261 new_value: result,
262 });
263 }
264 }
265
266 Ok((200, changes))
267 }
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Builds a fresh message whose `data` field is set to `data`.
    fn message_with_data(data: Value) -> Message {
        let mut message = Message::new(&json!({}));
        message.data = data;
        message
    }

    /// Runs a new `MapFunction` over `message` using `mappings` as the
    /// `"mappings"` array of the handler input.
    async fn apply_mappings(
        message: &mut Message,
        mappings: Value,
    ) -> Result<(usize, Vec<Change>)> {
        let input = json!({ "mappings": mappings });
        MapFunction::new().execute(message, &input).await
    }

    #[tokio::test]
    async fn test_array_notation_simple() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "Test Item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(message.data, json!({ "items": [{ "name": "Test Item" }] }));
    }

    #[tokio::test]
    async fn test_array_notation_complex_path() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
                "logic": "INSTR123"
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "MX": {
                    "FIToFICstmrCdtTrf": {
                        "CdtTrfTxInf": [
                            { "PmtId": { "InstrId": "INSTR123" } }
                        ]
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_multiple_array_indices() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                { "path": "data.matrix.0.1.value", "logic": "cell_01" },
                { "path": "data.matrix.1.0.value", "logic": "cell_10" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "matrix": [
                    [null, { "value": "cell_01" }],
                    [{ "value": "cell_10" }]
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_array_extension() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.5.name", "logic": "Item at index 5" }]),
        )
        .await;

        assert!(outcome.is_ok());

        assert!(message.data["items"].is_array());
        let items = message.data["items"].as_array().unwrap();
        assert_eq!(items.len(), 6);

        // Everything below the written index is null padding.
        assert!(items[..5].iter().all(Value::is_null));

        assert_eq!(items[5], json!({ "name": "Item at index 5" }));
    }

    #[tokio::test]
    async fn test_mixed_array_and_object_notation() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([
                { "path": "data.users.0.profile.addresses.1.city", "logic": "New York" },
                { "path": "data.users.0.profile.name", "logic": "John Doe" }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "users": [
                    {
                        "profile": {
                            "name": "John Doe",
                            "addresses": [null, { "city": "New York" }]
                        }
                    }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_overwrite_existing_value() {
        let mut message = message_with_data(json!({
            "items": [
                { "name": "Old Value" },
                { "name": "Another Item" }
            ]
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.name", "logic": "New Value" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "items": [
                    { "name": "New Value" },
                    { "name": "Another Item" }
                ]
            })
        );

        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        let change = &changes[0];
        assert_eq!(change.path, "data.items.0.name");
        assert_eq!(change.old_value, json!("Old Value"));
        assert_eq!(change.new_value, json!("New Value"));
    }

    #[tokio::test]
    async fn test_array_notation_with_jsonlogic() {
        let mut message = message_with_data(json!({}));
        message.temp_data = json!({
            "transactions": [
                { "id": "tx1", "amount": 100 },
                { "id": "tx2", "amount": 200 }
            ]
        });

        let outcome = apply_mappings(
            &mut message,
            json!([
                {
                    "path": "data.processed.0.transaction_id",
                    "logic": { "var": "temp_data.transactions.0.id" }
                },
                {
                    "path": "data.processed.0.amount_cents",
                    "logic": { "*": [{ "var": "temp_data.transactions.0.amount" }, 100] }
                }
            ]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "processed": [
                    { "transaction_id": "tx1", "amount_cents": 10000 }
                ]
            })
        );
    }

    #[tokio::test]
    async fn test_convert_object_to_array() {
        let mut message = message_with_data(json!({
            "items": { "existing_key": "existing_value" }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.0.new_value", "logic": "array_item" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert!(message.data["items"].is_array());
        assert_eq!(
            message.data,
            json!({ "items": [{ "new_value": "array_item" }] })
        );
    }

    #[tokio::test]
    async fn test_non_numeric_index_handling() {
        let mut message = message_with_data(json!({}));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.items.invalid_index.name", "logic": "test" }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({ "items": { "invalid_index": { "name": "test" } } })
        );

        assert!(message.data["items"].is_object());
        assert!(!message.data["items"].is_array());
    }

    #[tokio::test]
    async fn test_object_merge_on_mapping() {
        let mut message = message_with_data(json!({
            "config": {
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "username": "admin"
                }
            }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.config.database",
                "logic": {
                    "password": "secret123",
                    "ssl": true,
                    "port": 5433
                }
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "config": {
                    "database": {
                        "host": "localhost",
                        "port": 5433,
                        "username": "admin",
                        "password": "secret123",
                        "ssl": true
                    }
                }
            })
        );

        let (_, changes) = outcome.unwrap();
        assert_eq!(changes.len(), 1);
        let change = &changes[0];
        assert_eq!(change.path, "data.config.database");
        assert_eq!(
            change.old_value,
            json!({
                "host": "localhost",
                "port": 5432,
                "username": "admin"
            })
        );
        assert_eq!(
            change.new_value,
            json!({
                "password": "secret123",
                "ssl": true,
                "port": 5433
            })
        );
    }

    #[tokio::test]
    async fn test_object_merge_with_nested_path() {
        let mut message = message_with_data(json!({
            "user": {
                "profile": {
                    "name": "John Doe",
                    "age": 30
                }
            }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{
                "path": "data.user.profile",
                "logic": {
                    "email": "john@example.com",
                    "age": 31,
                    "city": "New York"
                }
            }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({
                "user": {
                    "profile": {
                        "name": "John Doe",
                        "age": 31,
                        "email": "john@example.com",
                        "city": "New York"
                    }
                }
            })
        );
    }

    #[tokio::test]
    async fn test_non_object_replacement() {
        let mut message = message_with_data(json!({
            "settings": { "value": "old_string" }
        }));

        let outcome = apply_mappings(
            &mut message,
            json!([{ "path": "data.settings.value", "logic": { "new": "object" } }]),
        )
        .await;

        assert!(outcome.is_ok());
        assert_eq!(
            message.data,
            json!({ "settings": { "value": { "new": "object" } } })
        );
    }
}