dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
8pub struct MapFunction {
13 }
15
16impl Default for MapFunction {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl MapFunction {
23 pub fn new() -> Self {
25 Self {}
26 }
27
28 fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30 let mut current = target;
31 let mut old_value = Value::Null;
32 let path_parts: Vec<&str> = path.split('.').collect();
33
34 fn is_numeric_index(s: &str) -> bool {
36 s.parse::<usize>().is_ok()
37 }
38
39 for (i, part) in path_parts.iter().enumerate() {
41 let is_numeric = is_numeric_index(part);
42
43 if i == path_parts.len() - 1 {
44 if is_numeric {
46 if !current.is_array() {
48 *current = Value::Array(vec![]);
50 }
51
52 if let Ok(index) = part.parse::<usize>() {
53 if let Value::Array(arr) = current {
54 while arr.len() <= index {
56 arr.push(Value::Null);
57 }
58 old_value = arr[index].clone();
60 arr[index] = value.clone();
61 }
62 } else {
63 return Err(DataflowError::Validation(format!(
64 "Invalid array index: {}",
65 part
66 )));
67 }
68 } else {
69 if !current.is_object() {
71 *current = Value::Object(serde_json::Map::new());
73 }
74
75 if let Value::Object(map) = current {
76 old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78 map.insert(part.to_string(), value.clone());
79 }
80 }
81 } else {
82 if is_numeric {
84 if !current.is_array() {
86 *current = Value::Array(vec![]);
87 }
88
89 if let Ok(index) = part.parse::<usize>() {
90 if let Value::Array(arr) = current {
91 while arr.len() <= index {
93 arr.push(Value::Null);
94 }
95 if arr[index].is_null() {
97 let next_part = path_parts.get(i + 1).unwrap_or(&"");
99 if is_numeric_index(next_part) {
100 arr[index] = Value::Array(vec![]);
101 } else {
102 arr[index] = json!({});
103 }
104 }
105 current = &mut arr[index];
106 }
107 } else {
108 return Err(DataflowError::Validation(format!(
109 "Invalid array index: {}",
110 part
111 )));
112 }
113 } else {
114 if !current.is_object() {
116 *current = Value::Object(serde_json::Map::new());
117 }
118
119 if let Value::Object(map) = current {
120 if !map.contains_key(*part) {
121 let next_part = path_parts.get(i + 1).unwrap_or(&"");
123 if is_numeric_index(next_part) {
124 map.insert(part.to_string(), Value::Array(vec![]));
125 } else {
126 map.insert(part.to_string(), json!({}));
127 }
128 }
129 current = map.get_mut(*part).unwrap();
130 }
131 }
132 }
133 }
134
135 Ok(old_value)
136 }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142 let mappings = input.get("mappings").ok_or_else(|| {
144 DataflowError::Validation("Missing 'mappings' array in input".to_string())
145 })?;
146
147 let mappings_arr = mappings
148 .as_array()
149 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151 let mut changes = Vec::new();
152
153 for mapping in mappings_arr {
155 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157 DataflowError::Validation("Missing 'path' in mapping".to_string())
158 })?;
159
160 let logic = mapping.get("logic").ok_or_else(|| {
162 DataflowError::Validation("Missing 'logic' in mapping".to_string())
163 })?;
164
165 let data_clone = message.data.clone();
167 let metadata_clone = message.metadata.clone();
168 let temp_data_clone = message.temp_data.clone();
169
170 let data_for_eval = json!({
172 "data": data_clone,
173 "metadata": metadata_clone,
174 "temp_data": temp_data_clone,
175 });
176
177 let (target_object, adjusted_path) =
179 if let Some(path) = target_path.strip_prefix("data.") {
180 (&mut message.data, path)
181 } else if let Some(path) = target_path.strip_prefix("metadata.") {
182 (&mut message.metadata, path)
183 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184 (&mut message.temp_data, path)
185 } else if target_path == "data" {
186 (&mut message.data, "")
187 } else if target_path == "metadata" {
188 (&mut message.metadata, "")
189 } else if target_path == "temp_data" {
190 (&mut message.temp_data, "")
191 } else {
192 (&mut message.data, target_path)
194 };
195
196 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198 let mut data_logic = data_logic_cell.borrow_mut();
199 data_logic.reset_arena();
200
201 data_logic
202 .evaluate_json(logic, &data_for_eval, None)
203 .map_err(|e| {
204 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
205 })
206 })?;
207
208 if result.is_null() {
209 continue;
210 }
211
212 if adjusted_path.is_empty() {
214 let old_value = std::mem::replace(target_object, result.clone());
216 changes.push(Change {
217 path: target_path.to_string(),
218 old_value,
219 new_value: result,
220 });
221 } else {
222 let old_value =
224 self.set_value_at_path(target_object, adjusted_path, result.clone())?;
225 changes.push(Change {
226 path: target_path.to_string(),
227 old_value,
228 new_value: result,
229 });
230 }
231 }
232
233 Ok((200, changes))
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use crate::engine::message::Message;
241 use serde_json::json;
242
243 #[tokio::test]
244 async fn test_array_notation_simple() {
245 let map_fn = MapFunction::new();
246
247 let mut message = Message::new(&json!({}));
249 message.data = json!({});
250
251 let input = json!({
252 "mappings": [
253 {
254 "path": "data.items.0.name",
255 "logic": "Test Item"
256 }
257 ]
258 });
259
260 let result = map_fn.execute(&mut message, &input).await;
261
262 assert!(result.is_ok());
263 let expected = json!({
264 "items": [
265 {
266 "name": "Test Item"
267 }
268 ]
269 });
270 assert_eq!(message.data, expected);
271 }
272
273 #[tokio::test]
274 async fn test_array_notation_complex_path() {
275 let map_fn = MapFunction::new();
276
277 let mut message = Message::new(&json!({}));
279 message.data = json!({});
280
281 let input = json!({
282 "mappings": [
283 {
284 "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
285 "logic": "INSTR123"
286 }
287 ]
288 });
289
290 let result = map_fn.execute(&mut message, &input).await;
291
292 assert!(result.is_ok());
293 let expected = json!({
294 "MX": {
295 "FIToFICstmrCdtTrf": {
296 "CdtTrfTxInf": [
297 {
298 "PmtId": {
299 "InstrId": "INSTR123"
300 }
301 }
302 ]
303 }
304 }
305 });
306 assert_eq!(message.data, expected);
307 }
308
309 #[tokio::test]
310 async fn test_multiple_array_indices() {
311 let map_fn = MapFunction::new();
312
313 let mut message = Message::new(&json!({}));
315 message.data = json!({});
316
317 let input = json!({
318 "mappings": [
319 {
320 "path": "data.matrix.0.1.value",
321 "logic": "cell_01"
322 },
323 {
324 "path": "data.matrix.1.0.value",
325 "logic": "cell_10"
326 }
327 ]
328 });
329
330 let result = map_fn.execute(&mut message, &input).await;
331
332 assert!(result.is_ok());
333 let expected = json!({
334 "matrix": [
335 [
336 null,
337 {
338 "value": "cell_01"
339 }
340 ],
341 [
342 {
343 "value": "cell_10"
344 }
345 ]
346 ]
347 });
348 assert_eq!(message.data, expected);
349 }
350
351 #[tokio::test]
352 async fn test_array_extension() {
353 let map_fn = MapFunction::new();
354
355 let mut message = Message::new(&json!({}));
357 message.data = json!({});
358
359 let input = json!({
360 "mappings": [
361 {
362 "path": "data.items.5.name",
363 "logic": "Item at index 5"
364 }
365 ]
366 });
367
368 let result = map_fn.execute(&mut message, &input).await;
369
370 assert!(result.is_ok());
371
372 assert!(message.data["items"].is_array());
374 let items_array = message.data["items"].as_array().unwrap();
375 assert_eq!(items_array.len(), 6);
376
377 for i in 0..5 {
379 assert_eq!(items_array[i], json!(null));
380 }
381
382 assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
384 }
385
386 #[tokio::test]
387 async fn test_mixed_array_and_object_notation() {
388 let map_fn = MapFunction::new();
389
390 let mut message = Message::new(&json!({}));
392 message.data = json!({});
393
394 let input = json!({
395 "mappings": [
396 {
397 "path": "data.users.0.profile.addresses.1.city",
398 "logic": "New York"
399 },
400 {
401 "path": "data.users.0.profile.name",
402 "logic": "John Doe"
403 }
404 ]
405 });
406
407 let result = map_fn.execute(&mut message, &input).await;
408
409 assert!(result.is_ok());
410 let expected = json!({
411 "users": [
412 {
413 "profile": {
414 "name": "John Doe",
415 "addresses": [
416 null,
417 {
418 "city": "New York"
419 }
420 ]
421 }
422 }
423 ]
424 });
425 assert_eq!(message.data, expected);
426 }
427
428 #[tokio::test]
429 async fn test_overwrite_existing_value() {
430 let map_fn = MapFunction::new();
431
432 let mut message = Message::new(&json!({}));
434 message.data = json!({
435 "items": [
436 {"name": "Old Value"},
437 {"name": "Another Item"}
438 ]
439 });
440
441 let input = json!({
442 "mappings": [
443 {
444 "path": "data.items.0.name",
445 "logic": "New Value"
446 }
447 ]
448 });
449
450 let result = map_fn.execute(&mut message, &input).await;
451
452 assert!(result.is_ok());
453 let expected = json!({
454 "items": [
455 {"name": "New Value"},
456 {"name": "Another Item"}
457 ]
458 });
459 assert_eq!(message.data, expected);
460
461 let (_, changes) = result.unwrap();
463 assert_eq!(changes.len(), 1);
464 assert_eq!(changes[0].path, "data.items.0.name");
465 assert_eq!(changes[0].old_value, json!("Old Value"));
466 assert_eq!(changes[0].new_value, json!("New Value"));
467 }
468
469 #[tokio::test]
470 async fn test_array_notation_with_jsonlogic() {
471 let map_fn = MapFunction::new();
472
473 let mut message = Message::new(&json!({}));
475 message.temp_data = json!({
476 "transactions": [
477 {"id": "tx1", "amount": 100},
478 {"id": "tx2", "amount": 200}
479 ]
480 });
481 message.data = json!({});
482
483 let input = json!({
484 "mappings": [
485 {
486 "path": "data.processed.0.transaction_id",
487 "logic": {"var": "temp_data.transactions.0.id"}
488 },
489 {
490 "path": "data.processed.0.amount_cents",
491 "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
492 }
493 ]
494 });
495
496 let result = map_fn.execute(&mut message, &input).await;
497
498 assert!(result.is_ok());
499 let expected = json!({
500 "processed": [
501 {
502 "transaction_id": "tx1",
503 "amount_cents": 10000
504 }
505 ]
506 });
507 assert_eq!(message.data, expected);
508 }
509
510 #[tokio::test]
511 async fn test_convert_object_to_array() {
512 let map_fn = MapFunction::new();
513
514 let mut message = Message::new(&json!({}));
516 message.data = json!({
517 "items": {
518 "existing_key": "existing_value"
519 }
520 });
521
522 let input = json!({
523 "mappings": [
524 {
525 "path": "data.items.0.new_value",
526 "logic": "array_item"
527 }
528 ]
529 });
530
531 let result = map_fn.execute(&mut message, &input).await;
532
533 assert!(result.is_ok());
534 assert!(message.data["items"].is_array());
536 let expected = json!({
537 "items": [
538 {
539 "new_value": "array_item"
540 }
541 ]
542 });
543 assert_eq!(message.data, expected);
544 }
545
546 #[tokio::test]
547 async fn test_non_numeric_index_handling() {
548 let map_fn = MapFunction::new();
549
550 let mut message = Message::new(&json!({}));
552 message.data = json!({});
553
554 let input = json!({
555 "mappings": [
556 {
557 "path": "data.items.invalid_index.name",
558 "logic": "test"
559 }
560 ]
561 });
562
563 let result = map_fn.execute(&mut message, &input).await;
564
565 assert!(result.is_ok());
567 let expected = json!({
568 "items": {
569 "invalid_index": {
570 "name": "test"
571 }
572 }
573 });
574 assert_eq!(message.data, expected);
575
576 assert!(message.data["items"].is_object());
578 assert!(!message.data["items"].is_array());
579 }
580}