dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
8pub struct MapFunction {
13 }
15
16impl Default for MapFunction {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl MapFunction {
23 pub fn new() -> Self {
25 Self {}
26 }
27
28 fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30 let mut current = target;
31 let mut old_value = Value::Null;
32 let path_parts: Vec<&str> = path.split('.').collect();
33
34 fn is_numeric_index(s: &str) -> bool {
36 s.parse::<usize>().is_ok()
37 }
38
39 for (i, part) in path_parts.iter().enumerate() {
41 let is_numeric = is_numeric_index(part);
42
43 if i == path_parts.len() - 1 {
44 if is_numeric {
46 if !current.is_array() {
48 *current = Value::Array(vec![]);
50 }
51
52 if let Ok(index) = part.parse::<usize>() {
53 if let Value::Array(arr) = current {
54 while arr.len() <= index {
56 arr.push(Value::Null);
57 }
58 old_value = arr[index].clone();
60 arr[index] = value.clone();
61 }
62 } else {
63 return Err(DataflowError::Validation(format!(
64 "Invalid array index: {}",
65 part
66 )));
67 }
68 } else {
69 if !current.is_object() {
71 *current = Value::Object(serde_json::Map::new());
73 }
74
75 if let Value::Object(map) = current {
76 old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78 map.insert(part.to_string(), value.clone());
79 }
80 }
81 } else {
82 if is_numeric {
84 if !current.is_array() {
86 *current = Value::Array(vec![]);
87 }
88
89 if let Ok(index) = part.parse::<usize>() {
90 if let Value::Array(arr) = current {
91 while arr.len() <= index {
93 arr.push(Value::Null);
94 }
95 if arr[index].is_null() {
97 let next_part = path_parts.get(i + 1).unwrap_or(&"");
99 if is_numeric_index(next_part) {
100 arr[index] = Value::Array(vec![]);
101 } else {
102 arr[index] = json!({});
103 }
104 }
105 current = &mut arr[index];
106 }
107 } else {
108 return Err(DataflowError::Validation(format!(
109 "Invalid array index: {}",
110 part
111 )));
112 }
113 } else {
114 if !current.is_object() {
116 *current = Value::Object(serde_json::Map::new());
117 }
118
119 if let Value::Object(map) = current {
120 if !map.contains_key(*part) {
121 let next_part = path_parts.get(i + 1).unwrap_or(&"");
123 if is_numeric_index(next_part) {
124 map.insert(part.to_string(), Value::Array(vec![]));
125 } else {
126 map.insert(part.to_string(), json!({}));
127 }
128 }
129 current = map.get_mut(*part).unwrap();
130 }
131 }
132 }
133 }
134
135 Ok(old_value)
136 }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142 let mappings = input.get("mappings").ok_or_else(|| {
144 DataflowError::Validation("Missing 'mappings' array in input".to_string())
145 })?;
146
147 let mappings_arr = mappings
148 .as_array()
149 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151 let mut changes = Vec::new();
152
153 for mapping in mappings_arr {
155 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157 DataflowError::Validation("Missing 'path' in mapping".to_string())
158 })?;
159
160 let logic = mapping.get("logic").ok_or_else(|| {
162 DataflowError::Validation("Missing 'logic' in mapping".to_string())
163 })?;
164
165 let data_clone = message.data.clone();
167 let metadata_clone = message.metadata.clone();
168 let temp_data_clone = message.temp_data.clone();
169
170 let data_for_eval = json!({
172 "data": data_clone,
173 "metadata": metadata_clone,
174 "temp_data": temp_data_clone,
175 });
176
177 let (target_object, adjusted_path) =
179 if let Some(path) = target_path.strip_prefix("data.") {
180 (&mut message.data, path)
181 } else if let Some(path) = target_path.strip_prefix("metadata.") {
182 (&mut message.metadata, path)
183 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184 (&mut message.temp_data, path)
185 } else if target_path == "data" {
186 (&mut message.data, "")
187 } else if target_path == "metadata" {
188 (&mut message.metadata, "")
189 } else if target_path == "temp_data" {
190 (&mut message.temp_data, "")
191 } else {
192 (&mut message.data, target_path)
194 };
195
196 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198 let mut data_logic = data_logic_cell.borrow_mut();
199 data_logic.reset_arena();
200
201 data_logic
202 .evaluate_json(logic, &data_for_eval, None)
203 .map_err(|e| {
204 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
205 })
206 })?;
207
208 if adjusted_path.is_empty() {
210 let old_value = std::mem::replace(target_object, result.clone());
212 changes.push(Change {
213 path: target_path.to_string(),
214 old_value,
215 new_value: result,
216 });
217 } else {
218 let old_value =
220 self.set_value_at_path(target_object, adjusted_path, result.clone())?;
221 changes.push(Change {
222 path: target_path.to_string(),
223 old_value,
224 new_value: result,
225 });
226 }
227 }
228
229 Ok((200, changes))
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236 use crate::engine::message::Message;
237 use serde_json::json;
238
239 #[tokio::test]
240 async fn test_array_notation_simple() {
241 let map_fn = MapFunction::new();
242
243 let mut message = Message::new(&json!({}));
245 message.data = json!({});
246
247 let input = json!({
248 "mappings": [
249 {
250 "path": "data.items.0.name",
251 "logic": "Test Item"
252 }
253 ]
254 });
255
256 let result = map_fn.execute(&mut message, &input).await;
257
258 assert!(result.is_ok());
259 let expected = json!({
260 "items": [
261 {
262 "name": "Test Item"
263 }
264 ]
265 });
266 assert_eq!(message.data, expected);
267 }
268
269 #[tokio::test]
270 async fn test_array_notation_complex_path() {
271 let map_fn = MapFunction::new();
272
273 let mut message = Message::new(&json!({}));
275 message.data = json!({});
276
277 let input = json!({
278 "mappings": [
279 {
280 "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
281 "logic": "INSTR123"
282 }
283 ]
284 });
285
286 let result = map_fn.execute(&mut message, &input).await;
287
288 assert!(result.is_ok());
289 let expected = json!({
290 "MX": {
291 "FIToFICstmrCdtTrf": {
292 "CdtTrfTxInf": [
293 {
294 "PmtId": {
295 "InstrId": "INSTR123"
296 }
297 }
298 ]
299 }
300 }
301 });
302 assert_eq!(message.data, expected);
303 }
304
305 #[tokio::test]
306 async fn test_multiple_array_indices() {
307 let map_fn = MapFunction::new();
308
309 let mut message = Message::new(&json!({}));
311 message.data = json!({});
312
313 let input = json!({
314 "mappings": [
315 {
316 "path": "data.matrix.0.1.value",
317 "logic": "cell_01"
318 },
319 {
320 "path": "data.matrix.1.0.value",
321 "logic": "cell_10"
322 }
323 ]
324 });
325
326 let result = map_fn.execute(&mut message, &input).await;
327
328 assert!(result.is_ok());
329 let expected = json!({
330 "matrix": [
331 [
332 null,
333 {
334 "value": "cell_01"
335 }
336 ],
337 [
338 {
339 "value": "cell_10"
340 }
341 ]
342 ]
343 });
344 assert_eq!(message.data, expected);
345 }
346
347 #[tokio::test]
348 async fn test_array_extension() {
349 let map_fn = MapFunction::new();
350
351 let mut message = Message::new(&json!({}));
353 message.data = json!({});
354
355 let input = json!({
356 "mappings": [
357 {
358 "path": "data.items.5.name",
359 "logic": "Item at index 5"
360 }
361 ]
362 });
363
364 let result = map_fn.execute(&mut message, &input).await;
365
366 assert!(result.is_ok());
367
368 assert!(message.data["items"].is_array());
370 let items_array = message.data["items"].as_array().unwrap();
371 assert_eq!(items_array.len(), 6);
372
373 for i in 0..5 {
375 assert_eq!(items_array[i], json!(null));
376 }
377
378 assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
380 }
381
382 #[tokio::test]
383 async fn test_mixed_array_and_object_notation() {
384 let map_fn = MapFunction::new();
385
386 let mut message = Message::new(&json!({}));
388 message.data = json!({});
389
390 let input = json!({
391 "mappings": [
392 {
393 "path": "data.users.0.profile.addresses.1.city",
394 "logic": "New York"
395 },
396 {
397 "path": "data.users.0.profile.name",
398 "logic": "John Doe"
399 }
400 ]
401 });
402
403 let result = map_fn.execute(&mut message, &input).await;
404
405 assert!(result.is_ok());
406 let expected = json!({
407 "users": [
408 {
409 "profile": {
410 "name": "John Doe",
411 "addresses": [
412 null,
413 {
414 "city": "New York"
415 }
416 ]
417 }
418 }
419 ]
420 });
421 assert_eq!(message.data, expected);
422 }
423
424 #[tokio::test]
425 async fn test_overwrite_existing_value() {
426 let map_fn = MapFunction::new();
427
428 let mut message = Message::new(&json!({}));
430 message.data = json!({
431 "items": [
432 {"name": "Old Value"},
433 {"name": "Another Item"}
434 ]
435 });
436
437 let input = json!({
438 "mappings": [
439 {
440 "path": "data.items.0.name",
441 "logic": "New Value"
442 }
443 ]
444 });
445
446 let result = map_fn.execute(&mut message, &input).await;
447
448 assert!(result.is_ok());
449 let expected = json!({
450 "items": [
451 {"name": "New Value"},
452 {"name": "Another Item"}
453 ]
454 });
455 assert_eq!(message.data, expected);
456
457 let (_, changes) = result.unwrap();
459 assert_eq!(changes.len(), 1);
460 assert_eq!(changes[0].path, "data.items.0.name");
461 assert_eq!(changes[0].old_value, json!("Old Value"));
462 assert_eq!(changes[0].new_value, json!("New Value"));
463 }
464
465 #[tokio::test]
466 async fn test_array_notation_with_jsonlogic() {
467 let map_fn = MapFunction::new();
468
469 let mut message = Message::new(&json!({}));
471 message.temp_data = json!({
472 "transactions": [
473 {"id": "tx1", "amount": 100},
474 {"id": "tx2", "amount": 200}
475 ]
476 });
477 message.data = json!({});
478
479 let input = json!({
480 "mappings": [
481 {
482 "path": "data.processed.0.transaction_id",
483 "logic": {"var": "temp_data.transactions.0.id"}
484 },
485 {
486 "path": "data.processed.0.amount_cents",
487 "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
488 }
489 ]
490 });
491
492 let result = map_fn.execute(&mut message, &input).await;
493
494 assert!(result.is_ok());
495 let expected = json!({
496 "processed": [
497 {
498 "transaction_id": "tx1",
499 "amount_cents": 10000
500 }
501 ]
502 });
503 assert_eq!(message.data, expected);
504 }
505
506 #[tokio::test]
507 async fn test_convert_object_to_array() {
508 let map_fn = MapFunction::new();
509
510 let mut message = Message::new(&json!({}));
512 message.data = json!({
513 "items": {
514 "existing_key": "existing_value"
515 }
516 });
517
518 let input = json!({
519 "mappings": [
520 {
521 "path": "data.items.0.new_value",
522 "logic": "array_item"
523 }
524 ]
525 });
526
527 let result = map_fn.execute(&mut message, &input).await;
528
529 assert!(result.is_ok());
530 assert!(message.data["items"].is_array());
532 let expected = json!({
533 "items": [
534 {
535 "new_value": "array_item"
536 }
537 ]
538 });
539 assert_eq!(message.data, expected);
540 }
541
542 #[tokio::test]
543 async fn test_non_numeric_index_handling() {
544 let map_fn = MapFunction::new();
545
546 let mut message = Message::new(&json!({}));
548 message.data = json!({});
549
550 let input = json!({
551 "mappings": [
552 {
553 "path": "data.items.invalid_index.name",
554 "logic": "test"
555 }
556 ]
557 });
558
559 let result = map_fn.execute(&mut message, &input).await;
560
561 assert!(result.is_ok());
563 let expected = json!({
564 "items": {
565 "invalid_index": {
566 "name": "test"
567 }
568 }
569 });
570 assert_eq!(message.data, expected);
571
572 assert!(message.data["items"].is_object());
574 assert!(!message.data["items"].is_array());
575 }
576}