dataflow_rs/engine/functions/
map.rs1use crate::engine::error::{DataflowError, Result};
36use crate::engine::message::{Change, Message};
37use crate::engine::utils::{get_nested_value, set_nested_value};
38use datalogic_rs::{CompiledLogic, DataLogic};
39use log::{debug, error};
40use serde::Deserialize;
41use serde_json::Value;
42use std::sync::Arc;
43
44#[derive(Debug, Clone, Deserialize)]
50pub struct MapConfig {
51 pub mappings: Vec<MapMapping>,
53}
54
55#[derive(Debug, Clone, Deserialize)]
60pub struct MapMapping {
61 pub path: String,
64
65 pub logic: Value,
68
69 #[serde(skip)]
71 pub logic_index: Option<usize>,
72}
73
74impl MapConfig {
75 pub fn from_json(input: &Value) -> Result<Self> {
86 let mappings = input.get("mappings").ok_or_else(|| {
87 DataflowError::Validation("Missing 'mappings' array in input".to_string())
88 })?;
89
90 let mappings_arr = mappings
91 .as_array()
92 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
93
94 let mut parsed_mappings = Vec::new();
95
96 for mapping in mappings_arr {
97 let path = mapping
98 .get("path")
99 .and_then(Value::as_str)
100 .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
101 .to_string();
102
103 let logic = mapping
104 .get("logic")
105 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
106 .clone();
107
108 parsed_mappings.push(MapMapping {
109 path,
110 logic,
111 logic_index: None,
112 });
113 }
114
115 Ok(MapConfig {
116 mappings: parsed_mappings,
117 })
118 }
119
120 pub fn execute(
140 &self,
141 message: &mut Message,
142 datalogic: &Arc<DataLogic>,
143 logic_cache: &[Arc<CompiledLogic>],
144 ) -> Result<(usize, Vec<Change>)> {
145 let mut changes = Vec::new();
146 let mut errors_encountered = false;
147
148 debug!("Map: Executing {} mappings", self.mappings.len());
149
150 for mapping in &self.mappings {
152 let context_arc = message.get_context_arc();
155 debug!("Processing mapping to path: {}", mapping.path);
156
157 let compiled_logic = match mapping.logic_index {
159 Some(index) => {
160 if index >= logic_cache.len() {
162 error!(
163 "Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
164 index,
165 logic_cache.len(),
166 mapping.path
167 );
168 errors_encountered = true;
169 continue;
170 }
171 &logic_cache[index]
172 }
173 None => {
174 error!(
175 "Map: Logic not compiled (no index) for mapping to {}",
176 mapping.path
177 );
178 errors_encountered = true;
179 continue;
180 }
181 };
182
183 let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
186
187 match result {
188 Ok(transformed_value) => {
189 debug!(
190 "Map: Evaluated logic for path {} resulted in: {:?}",
191 mapping.path, transformed_value
192 );
193
194 if transformed_value.is_null() {
196 debug!(
197 "Map: Skipping mapping for path {} as result is null",
198 mapping.path
199 );
200 continue;
201 }
202
203 let old_value = get_nested_value(&message.context, &mapping.path);
205
206 let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
207 let new_value_arc = Arc::new(transformed_value.clone());
208
209 debug!(
210 "Recording change for path '{}': old={:?}, new={:?}",
211 mapping.path, old_value_arc, new_value_arc
212 );
213 changes.push(Change {
214 path: Arc::from(mapping.path.as_str()),
215 old_value: old_value_arc,
216 new_value: Arc::clone(&new_value_arc),
217 });
218
219 if mapping.path == "data"
222 || mapping.path == "metadata"
223 || mapping.path == "temp_data"
224 {
225 if let Value::Object(new_map) = transformed_value {
227 if let Value::Object(existing_map) = &mut message.context[&mapping.path]
229 {
230 for (key, value) in new_map {
232 existing_map.insert(key, value);
233 }
234 } else {
235 message.context[&mapping.path] = Value::Object(new_map);
237 }
238 } else {
239 message.context[&mapping.path] = transformed_value;
241 }
242 } else {
243 set_nested_value(&mut message.context, &mapping.path, transformed_value);
245 }
246 message.invalidate_context_cache();
249 debug!("Successfully mapped to path: {}", mapping.path);
250 }
251 Err(e) => {
252 error!(
253 "Map: Error evaluating logic for path {}: {:?}",
254 mapping.path, e
255 );
256 errors_encountered = true;
257 }
258 }
259 }
260
261 let status = if errors_encountered { 500 } else { 200 };
263 Ok((status, changes))
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270 use crate::engine::message::Message;
271 use serde_json::json;
272
273 #[test]
274 fn test_map_config_from_json() {
275 let input = json!({
276 "mappings": [
277 {
278 "path": "data.field1",
279 "logic": {"var": "data.source"}
280 },
281 {
282 "path": "data.field2",
283 "logic": "static_value"
284 }
285 ]
286 });
287
288 let config = MapConfig::from_json(&input).unwrap();
289 assert_eq!(config.mappings.len(), 2);
290 assert_eq!(config.mappings[0].path, "data.field1");
291 assert_eq!(config.mappings[1].path, "data.field2");
292 }
293
294 #[test]
295 fn test_map_config_missing_mappings() {
296 let input = json!({});
297 let result = MapConfig::from_json(&input);
298 assert!(result.is_err());
299 }
300
301 #[test]
302 fn test_map_config_invalid_mappings() {
303 let input = json!({
304 "mappings": "not_an_array"
305 });
306 let result = MapConfig::from_json(&input);
307 assert!(result.is_err());
308 }
309
310 #[test]
311 fn test_map_config_missing_path() {
312 let input = json!({
313 "mappings": [
314 {
315 "logic": {"var": "data.source"}
316 }
317 ]
318 });
319 let result = MapConfig::from_json(&input);
320 assert!(result.is_err());
321 }
322
323 #[test]
324 fn test_map_config_missing_logic() {
325 let input = json!({
326 "mappings": [
327 {
328 "path": "data.field1"
329 }
330 ]
331 });
332 let result = MapConfig::from_json(&input);
333 assert!(result.is_err());
334 }
335
336 #[test]
337 fn test_map_metadata_assignment() {
338 let datalogic = Arc::new(DataLogic::with_preserve_structure());
340
341 let mut message = Message::new(Arc::new(json!({})));
343 message.context["data"] = json!({
344 "SwiftMT": {
345 "message_type": "103"
346 }
347 });
348
349 let config = MapConfig {
351 mappings: vec![MapMapping {
352 path: "metadata.SwiftMT.message_type".to_string(),
353 logic: json!({"var": "data.SwiftMT.message_type"}),
354 logic_index: Some(0),
355 }],
356 };
357
358 let logic_cache = vec![datalogic.compile(&config.mappings[0].logic).unwrap()];
360
361 let result = config.execute(&mut message, &datalogic, &logic_cache);
363 assert!(result.is_ok());
364
365 let (status, changes) = result.unwrap();
366 assert_eq!(status, 200);
367 assert_eq!(changes.len(), 1);
368
369 assert_eq!(
371 message.context["metadata"]
372 .get("SwiftMT")
373 .and_then(|v| v.get("message_type")),
374 Some(&json!("103"))
375 );
376 }
377
378 #[test]
379 fn test_map_null_values_skip_assignment() {
380 let datalogic = Arc::new(DataLogic::with_preserve_structure());
382
383 let mut message = Message::new(Arc::new(json!({})));
385 message.context["data"] = json!({
386 "existing_field": "should_remain"
387 });
388 message.context["metadata"] = json!({
389 "existing_meta": "should_remain"
390 });
391
392 let config = MapConfig {
394 mappings: vec![
395 MapMapping {
396 path: "data.new_field".to_string(),
397 logic: json!({"var": "data.non_existent_field"}), logic_index: Some(0),
399 },
400 MapMapping {
401 path: "metadata.new_meta".to_string(),
402 logic: json!({"var": "data.another_non_existent"}), logic_index: Some(1),
404 },
405 MapMapping {
406 path: "data.actual_field".to_string(),
407 logic: json!("actual_value"), logic_index: Some(2),
409 },
410 ],
411 };
412
413 let logic_cache = vec![
415 datalogic.compile(&config.mappings[0].logic).unwrap(),
416 datalogic.compile(&config.mappings[1].logic).unwrap(),
417 datalogic.compile(&config.mappings[2].logic).unwrap(),
418 ];
419
420 let result = config.execute(&mut message, &datalogic, &logic_cache);
422 assert!(result.is_ok());
423
424 let (status, changes) = result.unwrap();
425 assert_eq!(status, 200);
426 assert_eq!(changes.len(), 1);
428 assert_eq!(changes[0].path.as_ref(), "data.actual_field");
429
430 assert_eq!(message.context["data"].get("new_field"), None);
432 assert_eq!(message.context["metadata"].get("new_meta"), None);
433
434 assert_eq!(
436 message.context["data"].get("existing_field"),
437 Some(&json!("should_remain"))
438 );
439 assert_eq!(
440 message.context["metadata"].get("existing_meta"),
441 Some(&json!("should_remain"))
442 );
443
444 assert_eq!(
446 message.context["data"].get("actual_field"),
447 Some(&json!("actual_value"))
448 );
449 }
450
451 #[test]
452 fn test_map_multiple_fields_including_metadata() {
453 let datalogic = Arc::new(DataLogic::with_preserve_structure());
455
456 let mut message = Message::new(Arc::new(json!({})));
458 message.context["data"] = json!({
459 "ISO20022_MX": {
460 "document": {
461 "TxInf": {
462 "OrgnlGrpInf": {
463 "OrgnlMsgNmId": "pacs.008.001.08"
464 }
465 }
466 }
467 },
468 "SwiftMT": {
469 "message_type": "103"
470 }
471 });
472
473 let mut config = MapConfig {
475 mappings: vec![
476 MapMapping {
477 path: "data.SwiftMT.message_type".to_string(),
478 logic: json!("103"),
479 logic_index: None,
480 },
481 MapMapping {
482 path: "metadata.SwiftMT.message_type".to_string(),
483 logic: json!({"var": "data.SwiftMT.message_type"}),
484 logic_index: None,
485 },
486 MapMapping {
487 path: "temp_data.original_msg_type".to_string(),
488 logic: json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
489 logic_index: None,
490 },
491 ],
492 };
493
494 let mut logic_cache = Vec::new();
496 for (i, mapping) in config.mappings.iter_mut().enumerate() {
497 logic_cache.push(datalogic.compile(&mapping.logic).unwrap());
498 mapping.logic_index = Some(i);
499 }
500
501 let result = config.execute(&mut message, &datalogic, &logic_cache);
503 assert!(result.is_ok());
504
505 let (status, changes) = result.unwrap();
506 assert_eq!(status, 200);
507 assert_eq!(changes.len(), 3);
508
509 assert_eq!(
511 message.context["data"]
512 .get("SwiftMT")
513 .and_then(|v| v.get("message_type")),
514 Some(&json!("103"))
515 );
516 assert_eq!(
517 message.context["metadata"]
518 .get("SwiftMT")
519 .and_then(|v| v.get("message_type")),
520 Some(&json!("103"))
521 );
522 assert_eq!(
523 message.context["temp_data"].get("original_msg_type"),
524 Some(&json!("pacs.008.001.08"))
525 );
526 }
527}