dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::FUNCTION_DATA_LOGIC;
3use crate::engine::message::{Change, Message};
4use crate::engine::AsyncFunctionHandler;
5use async_trait::async_trait;
6use serde_json::{json, Value};
7
8pub struct MapFunction {
13 }
15
16impl Default for MapFunction {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl MapFunction {
23 pub fn new() -> Self {
25 Self {}
26 }
27
28 fn set_value_at_path(&self, target: &mut Value, path: &str, value: Value) -> Result<Value> {
30 let mut current = target;
31 let mut old_value = Value::Null;
32 let path_parts: Vec<&str> = path.split('.').collect();
33
34 fn is_numeric_index(s: &str) -> bool {
36 s.parse::<usize>().is_ok()
37 }
38
39 for (i, part) in path_parts.iter().enumerate() {
41 let is_numeric = is_numeric_index(part);
42
43 if i == path_parts.len() - 1 {
44 if is_numeric {
46 if !current.is_array() {
48 *current = Value::Array(vec![]);
50 }
51
52 if let Ok(index) = part.parse::<usize>() {
53 if let Value::Array(arr) = current {
54 while arr.len() <= index {
56 arr.push(Value::Null);
57 }
58 old_value = arr[index].clone();
60 arr[index] = value.clone();
61 }
62 } else {
63 return Err(DataflowError::Validation(format!(
64 "Invalid array index: {}",
65 part
66 )));
67 }
68 } else {
69 if !current.is_object() {
71 *current = Value::Object(serde_json::Map::new());
73 }
74
75 if let Value::Object(map) = current {
76 old_value = map.get(*part).cloned().unwrap_or(Value::Null);
78 map.insert(part.to_string(), value.clone());
79 }
80 }
81 } else {
82 if is_numeric {
84 if !current.is_array() {
86 *current = Value::Array(vec![]);
87 }
88
89 if let Ok(index) = part.parse::<usize>() {
90 if let Value::Array(arr) = current {
91 while arr.len() <= index {
93 arr.push(Value::Null);
94 }
95 if arr[index].is_null() {
97 let next_part = path_parts.get(i + 1).unwrap_or(&"");
99 if is_numeric_index(next_part) {
100 arr[index] = Value::Array(vec![]);
101 } else {
102 arr[index] = json!({});
103 }
104 }
105 current = &mut arr[index];
106 }
107 } else {
108 return Err(DataflowError::Validation(format!(
109 "Invalid array index: {}",
110 part
111 )));
112 }
113 } else {
114 if !current.is_object() {
116 *current = Value::Object(serde_json::Map::new());
117 }
118
119 if let Value::Object(map) = current {
120 if !map.contains_key(*part) {
121 let next_part = path_parts.get(i + 1).unwrap_or(&"");
123 if is_numeric_index(next_part) {
124 map.insert(part.to_string(), Value::Array(vec![]));
125 } else {
126 map.insert(part.to_string(), json!({}));
127 }
128 }
129 current = map.get_mut(*part).unwrap();
130 }
131 }
132 }
133 }
134
135 Ok(old_value)
136 }
137}
138
139#[async_trait]
140impl AsyncFunctionHandler for MapFunction {
141 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
142 let mappings = input.get("mappings").ok_or_else(|| {
144 DataflowError::Validation("Missing 'mappings' array in input".to_string())
145 })?;
146
147 let mappings_arr = mappings
148 .as_array()
149 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
150
151 let mut changes = Vec::new();
152
153 for mapping in mappings_arr {
155 let target_path = mapping.get("path").and_then(Value::as_str).ok_or_else(|| {
157 DataflowError::Validation("Missing 'path' in mapping".to_string())
158 })?;
159
160 let logic = mapping.get("logic").ok_or_else(|| {
162 DataflowError::Validation("Missing 'logic' in mapping".to_string())
163 })?;
164
165 let data_clone = message.data.clone();
167 let metadata_clone = message.metadata.clone();
168 let temp_data_clone = message.temp_data.clone();
169
170 let data_for_eval = json!({
172 "data": data_clone,
173 "metadata": metadata_clone,
174 "temp_data": temp_data_clone,
175 });
176
177 let (target_object, adjusted_path) =
179 if let Some(path) = target_path.strip_prefix("data.") {
180 (&mut message.data, path)
181 } else if let Some(path) = target_path.strip_prefix("metadata.") {
182 (&mut message.metadata, path)
183 } else if let Some(path) = target_path.strip_prefix("temp_data.") {
184 (&mut message.temp_data, path)
185 } else if target_path == "data" {
186 (&mut message.data, "")
187 } else if target_path == "metadata" {
188 (&mut message.metadata, "")
189 } else if target_path == "temp_data" {
190 (&mut message.temp_data, "")
191 } else {
192 (&mut message.data, target_path)
194 };
195
196 let result = FUNCTION_DATA_LOGIC.with(|data_logic_cell| {
198 let data_logic = data_logic_cell.borrow_mut();
199
200 data_logic
201 .evaluate_json(logic, &data_for_eval, None)
202 .map_err(|e| {
203 DataflowError::LogicEvaluation(format!("Failed to evaluate logic: {}", e))
204 })
205 })?;
206
207 if adjusted_path.is_empty() {
209 let old_value = std::mem::replace(target_object, result.clone());
211 changes.push(Change {
212 path: target_path.to_string(),
213 old_value,
214 new_value: result,
215 });
216 } else {
217 let old_value =
219 self.set_value_at_path(target_object, adjusted_path, result.clone())?;
220 changes.push(Change {
221 path: target_path.to_string(),
222 old_value,
223 new_value: result,
224 });
225 }
226 }
227
228 Ok((200, changes))
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::engine::message::Message;
236 use serde_json::json;
237
238 #[tokio::test]
239 async fn test_array_notation_simple() {
240 let map_fn = MapFunction::new();
241
242 let mut message = Message::new(&json!({}));
244 message.data = json!({});
245
246 let input = json!({
247 "mappings": [
248 {
249 "path": "data.items.0.name",
250 "logic": "Test Item"
251 }
252 ]
253 });
254
255 let result = map_fn.execute(&mut message, &input).await;
256
257 assert!(result.is_ok());
258 let expected = json!({
259 "items": [
260 {
261 "name": "Test Item"
262 }
263 ]
264 });
265 assert_eq!(message.data, expected);
266 }
267
268 #[tokio::test]
269 async fn test_array_notation_complex_path() {
270 let map_fn = MapFunction::new();
271
272 let mut message = Message::new(&json!({}));
274 message.data = json!({});
275
276 let input = json!({
277 "mappings": [
278 {
279 "path": "data.MX.FIToFICstmrCdtTrf.CdtTrfTxInf.0.PmtId.InstrId",
280 "logic": "INSTR123"
281 }
282 ]
283 });
284
285 let result = map_fn.execute(&mut message, &input).await;
286
287 assert!(result.is_ok());
288 let expected = json!({
289 "MX": {
290 "FIToFICstmrCdtTrf": {
291 "CdtTrfTxInf": [
292 {
293 "PmtId": {
294 "InstrId": "INSTR123"
295 }
296 }
297 ]
298 }
299 }
300 });
301 assert_eq!(message.data, expected);
302 }
303
304 #[tokio::test]
305 async fn test_multiple_array_indices() {
306 let map_fn = MapFunction::new();
307
308 let mut message = Message::new(&json!({}));
310 message.data = json!({});
311
312 let input = json!({
313 "mappings": [
314 {
315 "path": "data.matrix.0.1.value",
316 "logic": "cell_01"
317 },
318 {
319 "path": "data.matrix.1.0.value",
320 "logic": "cell_10"
321 }
322 ]
323 });
324
325 let result = map_fn.execute(&mut message, &input).await;
326
327 assert!(result.is_ok());
328 let expected = json!({
329 "matrix": [
330 [
331 null,
332 {
333 "value": "cell_01"
334 }
335 ],
336 [
337 {
338 "value": "cell_10"
339 }
340 ]
341 ]
342 });
343 assert_eq!(message.data, expected);
344 }
345
346 #[tokio::test]
347 async fn test_array_extension() {
348 let map_fn = MapFunction::new();
349
350 let mut message = Message::new(&json!({}));
352 message.data = json!({});
353
354 let input = json!({
355 "mappings": [
356 {
357 "path": "data.items.5.name",
358 "logic": "Item at index 5"
359 }
360 ]
361 });
362
363 let result = map_fn.execute(&mut message, &input).await;
364
365 assert!(result.is_ok());
366
367 assert!(message.data["items"].is_array());
369 let items_array = message.data["items"].as_array().unwrap();
370 assert_eq!(items_array.len(), 6);
371
372 for i in 0..5 {
374 assert_eq!(items_array[i], json!(null));
375 }
376
377 assert_eq!(items_array[5], json!({"name": "Item at index 5"}));
379 }
380
381 #[tokio::test]
382 async fn test_mixed_array_and_object_notation() {
383 let map_fn = MapFunction::new();
384
385 let mut message = Message::new(&json!({}));
387 message.data = json!({});
388
389 let input = json!({
390 "mappings": [
391 {
392 "path": "data.users.0.profile.addresses.1.city",
393 "logic": "New York"
394 },
395 {
396 "path": "data.users.0.profile.name",
397 "logic": "John Doe"
398 }
399 ]
400 });
401
402 let result = map_fn.execute(&mut message, &input).await;
403
404 assert!(result.is_ok());
405 let expected = json!({
406 "users": [
407 {
408 "profile": {
409 "name": "John Doe",
410 "addresses": [
411 null,
412 {
413 "city": "New York"
414 }
415 ]
416 }
417 }
418 ]
419 });
420 assert_eq!(message.data, expected);
421 }
422
423 #[tokio::test]
424 async fn test_overwrite_existing_value() {
425 let map_fn = MapFunction::new();
426
427 let mut message = Message::new(&json!({}));
429 message.data = json!({
430 "items": [
431 {"name": "Old Value"},
432 {"name": "Another Item"}
433 ]
434 });
435
436 let input = json!({
437 "mappings": [
438 {
439 "path": "data.items.0.name",
440 "logic": "New Value"
441 }
442 ]
443 });
444
445 let result = map_fn.execute(&mut message, &input).await;
446
447 assert!(result.is_ok());
448 let expected = json!({
449 "items": [
450 {"name": "New Value"},
451 {"name": "Another Item"}
452 ]
453 });
454 assert_eq!(message.data, expected);
455
456 let (_, changes) = result.unwrap();
458 assert_eq!(changes.len(), 1);
459 assert_eq!(changes[0].path, "data.items.0.name");
460 assert_eq!(changes[0].old_value, json!("Old Value"));
461 assert_eq!(changes[0].new_value, json!("New Value"));
462 }
463
464 #[tokio::test]
465 async fn test_array_notation_with_jsonlogic() {
466 let map_fn = MapFunction::new();
467
468 let mut message = Message::new(&json!({}));
470 message.temp_data = json!({
471 "transactions": [
472 {"id": "tx1", "amount": 100},
473 {"id": "tx2", "amount": 200}
474 ]
475 });
476 message.data = json!({});
477
478 let input = json!({
479 "mappings": [
480 {
481 "path": "data.processed.0.transaction_id",
482 "logic": {"var": "temp_data.transactions.0.id"}
483 },
484 {
485 "path": "data.processed.0.amount_cents",
486 "logic": {"*": [{"var": "temp_data.transactions.0.amount"}, 100]}
487 }
488 ]
489 });
490
491 let result = map_fn.execute(&mut message, &input).await;
492
493 assert!(result.is_ok());
494 let expected = json!({
495 "processed": [
496 {
497 "transaction_id": "tx1",
498 "amount_cents": 10000
499 }
500 ]
501 });
502 assert_eq!(message.data, expected);
503 }
504
505 #[tokio::test]
506 async fn test_convert_object_to_array() {
507 let map_fn = MapFunction::new();
508
509 let mut message = Message::new(&json!({}));
511 message.data = json!({
512 "items": {
513 "existing_key": "existing_value"
514 }
515 });
516
517 let input = json!({
518 "mappings": [
519 {
520 "path": "data.items.0.new_value",
521 "logic": "array_item"
522 }
523 ]
524 });
525
526 let result = map_fn.execute(&mut message, &input).await;
527
528 assert!(result.is_ok());
529 assert!(message.data["items"].is_array());
531 let expected = json!({
532 "items": [
533 {
534 "new_value": "array_item"
535 }
536 ]
537 });
538 assert_eq!(message.data, expected);
539 }
540
541 #[tokio::test]
542 async fn test_non_numeric_index_handling() {
543 let map_fn = MapFunction::new();
544
545 let mut message = Message::new(&json!({}));
547 message.data = json!({});
548
549 let input = json!({
550 "mappings": [
551 {
552 "path": "data.items.invalid_index.name",
553 "logic": "test"
554 }
555 ]
556 });
557
558 let result = map_fn.execute(&mut message, &input).await;
559
560 assert!(result.is_ok());
562 let expected = json!({
563 "items": {
564 "invalid_index": {
565 "name": "test"
566 }
567 }
568 });
569 assert_eq!(message.data, expected);
570
571 assert!(message.data["items"].is_object());
573 assert!(!message.data["items"].is_array());
574 }
575}