dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use log::error;
7use serde_json::{json, Value};
8
9pub struct MapFunction {
14 }
16
17impl Default for MapFunction {
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl MapFunction {
24 pub fn new() -> Self {
26 Self {}
27 }
28
29 fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
31 let mut current = target;
32 let mut old_value = Value::Null;
33 let path_parts: Vec<&str> = path.split('.').collect();
34
35 fn is_numeric_index(s: &str) -> bool {
37 s.parse::<usize>().is_ok()
38 }
39
40 for (i, part) in path_parts.iter().enumerate() {
42 let is_numeric = is_numeric_index(part);
43
44 if i == path_parts.len() - 1 {
45 if is_numeric {
47 if !current.is_array() {
49 *current = Value::Array(vec![]);
51 }
52
53 if let Ok(index) = part.parse::<usize>() {
54 if let Value::Array(arr) = current {
55 while arr.len() <= index {
57 arr.push(Value::Null);
58 }
59 old_value = arr[index].clone();
61 arr[index] = value.clone();
62 }
63 } else {
64 error!("Invalid array index: {part}");
65 return Err(DataflowError::Validation(format!(
66 "Invalid array index: {part}"
67 )));
68 }
69 } else {
70 if !current.is_object() {
72 *current = Value::Object(serde_json::Map::new());
74 }
75
76 if let Value::Object(map) = current {
77 let mut key = part.to_string();
79 if key.starts_with("#") {
80 key = key.strip_prefix("#").unwrap_or(&key).to_string();
81 }
82 old_value = map.get(&key).cloned().unwrap_or(Value::Null);
83 map.insert(key, value.clone());
84 }
85 }
86 } else {
87 if is_numeric {
89 if !current.is_array() {
91 *current = Value::Array(vec![]);
92 }
93
94 if let Ok(index) = part.parse::<usize>() {
95 if let Value::Array(arr) = current {
96 while arr.len() <= index {
98 arr.push(Value::Null);
99 }
100 if arr[index].is_null() {
102 let next_part = path_parts.get(i + 1).unwrap_or(&"");
104 if is_numeric_index(next_part) {
105 arr[index] = Value::Array(vec![]);
106 } else {
107 arr[index] = json!({});
108 }
109 }
110 current = &mut arr[index];
111 }
112 } else {
113 error!("Invalid array index: {part}");
114 return Err(DataflowError::Validation(format!(
115 "Invalid array index: {part}"
116 )));
117 }
118 } else {
119 if !current.is_object() {
121 *current = Value::Object(serde_json::Map::new());
122 }
123
124 if let Value::Object(map) = current {
125 let mut key = part.to_string();
126 if key.starts_with("#") {
127 key = key.strip_prefix("#").unwrap_or(&key).to_string();
128 }
129 if !map.contains_key(&key) {
130 let next_part = path_parts.get(i + 1).unwrap_or(&"");
132 if is_numeric_index(next_part) {
133 map.insert(part.to_string(), Value::Array(vec![]));
134 } else {
135 map.insert(key.clone(), json!({}));
136 }
137 }
138 current = map.get_mut(&key).unwrap();
139 }
140 }
141 }
142 }
143
144 Ok(old_value)
145 }
146}
147
148#[async_trait]
149impl AsyncFunctionHandler for MapFunction {
150 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
151 let mappings = input.get("mappings").ok_or_else(|| {
153 DataflowError::Validation("Missing 'mappings' array in input".to_string())
154 })?;
155
156 let mappings_arr = mappings
157 .as_array()
158 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
159
160 let mut changes = Vec::new();
161
162 for mapping in mappings_arr {
164 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
166 DataflowError::Validation("Missing 'path' in mapping".to_string())
167 })?;
168
169 let logic = mapping.get("logic").ok_or_else(|| {
171 DataflowError::Validation("Missing 'logic' in mapping".to_string())
172 })?;
173
174 let data_clone = message.data.clone();
176 let metadata_clone = message.metadata.clone();
177 let temp_data_clone = message.temp_data.clone();
178
179 let data_for_eval = json!({
181 "data": data_clone,
182 "metadata": metadata_clone,
183 "temp_data": temp_data_clone,
184 });
185
186 let (target_object, adjusted_path) =
188 if let Some(path) = target_path.strip_prefix("data.") {
189 (&mut message.data, path)
190 } else if let Some(path) = target_path.strip_prefix("metadata.") {
191 (&mut message.metadata, path)
192 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
193 (&mut message.temp_data, path)
194 } else if target_path == "data" {
195 (&mut message.data, "")
196 } else if target_path == "metadata" {
197 (&mut message.metadata, "")
198 } else if target_path == "temp_data" {
199 (&mut message.temp_data, "")
200 } else {
201 (&mut message.data, target_path)
203 };
204
205 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
207 let mut data_logic = data_logic_cell.borrow_mut();
208 data_logic.reset_arena();
209
210 data_logic
211 .evaluate_json(logic, &data_for_eval, None)
212 .map_err(|e| {
213 error!("Failed to evaluate logic: {e} for {logic}, {data_for_eval}");
214 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {e}"))
215 })
216 })?;
217
218 if result.is_null() {
219 continue;
220 }
221
222 if adjusted_path.is_empty() {
224 let old_value = std::mem::replace(target_object, result.clone());
226 changes.push(Change {
227 path: target_path.to_string(),
228 old_value,
229 new_value: result,
230 });
231 } else {
232 let old_value =
234 self.set_value_at_path(target_object, adjusted_path, result.clone())?;
235 changes.push(Change {
236 path: target_path.to_string(),
237 old_value,
238 new_value: result,
239 });
240 }
241 }
242
243 Ok((200, changes))
244 }
245}
246
247#[cfg(test)]
248mod tests {
249 use super::*;
250 use crate::engine::message::Message;
251 use serde_json::json;
252
253 #[tokio::test]
254 async fn test_array_notation_simple() {
255 let map_fn = MapFunction::new();
256
257 let mut message = Message::new(&json!({}));
259 message.data = json!({});
260
261 let input = json!({
262 "mappings": [
263 {
264 "path": "data.items.0.name",
265 "logic": "Test Item"
266 }
267 ]
268 });
269
270 let result = map_fn.execute(&mut message, &input).await;
271
272 assert!(result.is_ok());
273 let expected = json!({
274 "items": [
275 {
276 "name": "Test Item"
277 }
278 ]
279 });
280 assert_eq!(message.data, expected);
281 }
282
283 #[tokio::test]
284 async fn test_array_notation_complex_path() {
285 let map_fn = MapFunction::new();
286
287 let mut message = Message::new(&json!({}));
289 message.data = json!({});
290
291 let input = json!({
292 "mappings": [
293 {
294 "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
295 "logic": "INSTR123"
296 }
297 ]
298 });
299
300 let result = map_fn.execute(&mut message, &input).await;
301
302 assert!(result.is_ok());
303 let expected = json!({
304 "MX": {
305 "FIToFICstmrCdtTrf": {
306 "CdtTrfTxInf": [
307 {
308 "PmtId": {
309 "InstrId": "INSTR123"
310 }
311 }
312 ]
313 }
314 }
315 });
316 assert_eq!(message.data, expected);
317 }
318
319 #[tokio::test]
320 async fn test_multiple_array_indices() {
321 let map_fn = MapFunction::new();
322
323 let mut message = Message::new(&json!({}));
325 message.data = json!({});
326
327 let input = json!({
328 "mappings": [
329 {
330 "path": "data.matrix.0.1.value",
331 "logic": "cell_01"
332 },
333 {
334 "path": "data.matrix.1.0.value",
335 "logic": "cell_10"
336 }
337 ]
338 });
339
340 let result = map_fn.execute(&mut message, &input).await;
341
342 assert!(result.is_ok());
343 let expected = json!({
344 "matrix": [
345 [
346 null,
347 {
348 "value": "cell_01"
349 }
350 ],
351 [
352 {
353 "value": "cell_10"
354 }
355 ]
356 ]
357 });
358 assert_eq!(message.data, expected);
359 }
360
361 #[tokio::test]
362 async fn test_array_extension() {
363 let map_fn = MapFunction::new();
364
365 let mut message = Message::new(&json!({}));
367 message.data = json!({});
368
369 let input = json!({
370 "mappings": [
371 {
372 "path": "data.items.5.name",
373 "logic": "Item at index 5"
374 }
375 ]
376 });
377
378 let result = map_fn.execute(&mut message, &input).await;
379
380 assert!(result.is_ok());
381
382 assert!(message.data["items"].is_array());
384 let items_array = message.data["items"].as_array().unwrap();
385 assert_eq!(items_array.len(), 6);
386
387 for i in 0..5 {
389 assert_eq!(items_array[i], json!(null));
390 }
391
392 assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
394 }
395
396 #[tokio::test]
397 async fn test_mixed_array_and_object_notation() {
398 let map_fn = MapFunction::new();
399
400 let mut message = Message::new(&json!({}));
402 message.data = json!({});
403
404 let input = json!({
405 "mappings": [
406 {
407 "path": "data.users.0.profile.addresses.1.city",
408 "logic": "New York"
409 },
410 {
411 "path": "data.users.0.profile.name",
412 "logic": "John Doe"
413 }
414 ]
415 });
416
417 let result = map_fn.execute(&mut message, &input).await;
418
419 assert!(result.is_ok());
420 let expected = json!({
421 "users": [
422 {
423 "profile": {
424 "name": "John Doe",
425 "addresses": [
426 null,
427 {
428 "city": "New York"
429 }
430 ]
431 }
432 }
433 ]
434 });
435 assert_eq!(message.data, expected);
436 }
437
438 #[tokio::test]
439 async fn test_overwrite_existing_value() {
440 let map_fn = MapFunction::new();
441
442 let mut message = Message::new(&json!({}));
444 message.data = json!({
445 "items": [
446 {"name": "Old Value"},
447 {"name": "Another Item"}
448 ]
449 });
450
451 let input = json!({
452 "mappings": [
453 {
454 "path": "data.items.0.name",
455 "logic": "New Value"
456 }
457 ]
458 });
459
460 let result = map_fn.execute(&mut message, &input).await;
461
462 assert!(result.is_ok());
463 let expected = json!({
464 "items": [
465 {"name": "New Value"},
466 {"name": "Another Item"}
467 ]
468 });
469 assert_eq!(message.data, expected);
470
471 let (_, changes) = result.unwrap();
473 assert_eq!(changes.len(), 1);
474 assert_eq!(changes[0].path, "data.items.0.name");
475 assert_eq!(changes[0].old_value, json!("Old Value"));
476 assert_eq!(changes[0].new_value, json!("New Value"));
477 }
478
479 #[tokio::test]
480 async fn test_array_notation_with_jsonlogic() {
481 let map_fn = MapFunction::new();
482
483 let mut message = Message::new(&json!({}));
485 message.temp_data = json!({
486 "transactions": [
487 {"id": "tx1", "amount": 100},
488 {"id": "tx2", "amount": 200}
489 ]
490 });
491 message.data = json!({});
492
493 let input = json!({
494 "mappings": [
495 {
496 "path": "data.processed.0.transaction_id",
497 "logic": {"var": "temp_data.transactions.0.id"}
498 },
499 {
500 "path": "data.processed.0.amount_cents",
501 "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
502 }
503 ]
504 });
505
506 let result = map_fn.execute(&mut message, &input).await;
507
508 assert!(result.is_ok());
509 let expected = json!({
510 "processed": [
511 {
512 "transaction_id": "tx1",
513 "amount_cents": 10000
514 }
515 ]
516 });
517 assert_eq!(message.data, expected);
518 }
519
520 #[tokio::test]
521 async fn test_convert_object_to_array() {
522 let map_fn = MapFunction::new();
523
524 let mut message = Message::new(&json!({}));
526 message.data = json!({
527 "items": {
528 "existing_key": "existing_value"
529 }
530 });
531
532 let input = json!({
533 "mappings": [
534 {
535 "path": "data.items.0.new_value",
536 "logic": "array_item"
537 }
538 ]
539 });
540
541 let result = map_fn.execute(&mut message, &input).await;
542
543 assert!(result.is_ok());
544 assert!(message.data["items"].is_array());
546 let expected = json!({
547 "items": [
548 {
549 "new_value": "array_item"
550 }
551 ]
552 });
553 assert_eq!(message.data, expected);
554 }
555
556 #[tokio::test]
557 async fn test_non_numeric_index_handling() {
558 let map_fn = MapFunction::new();
559
560 let mut message = Message::new(&json!({}));
562 message.data = json!({});
563
564 let input = json!({
565 "mappings": [
566 {
567 "path": "data.items.invalid_index.name",
568 "logic": "test"
569 }
570 ]
571 });
572
573 let result = map_fn.execute(&mut message, &input).await;
574
575 assert!(result.is_ok());
577 let expected = json!({
578 "items": {
579 "invalid_index": {
580 "name": "test"
581 }
582 }
583 });
584 assert_eq!(message.data, expected);
585
586 assert!(message.data["items"].is_object());
588 assert!(!message.data["items"].is_array());
589 }
590}