1use crate::engine::error::{DataflowError, Result};
17use crate::engine::executor::{ArenaContext, with_arena};
18use crate::engine::message::{Change, Message};
19use crate::engine::task_outcome::TaskOutcome;
20use crate::engine::utils::{get_nested_value_parts, set_nested_value_parts};
21use datalogic_rs::{Engine, Logic};
22use datavalue::OwnedDataValue;
23use log::{debug, error};
24use serde::Deserialize;
25use serde_json::Value;
26use std::sync::Arc;
27
28#[derive(Debug, Clone, Deserialize)]
30pub struct MapConfig {
31 pub mappings: Vec<MapMapping>,
33}
34
35#[derive(Debug, Clone, Deserialize, Default)]
37pub struct MapMapping {
38 pub path: String,
41
42 pub logic: Value,
45
46 #[doc(hidden)]
50 #[serde(skip)]
51 pub compiled_logic: Option<Arc<Logic>>,
52
53 #[doc(hidden)]
58 #[serde(skip)]
59 pub path_arc: Arc<str>,
60
61 #[doc(hidden)]
67 #[serde(skip)]
68 pub path_parts: Arc<[Arc<str>]>,
69}
70
71impl MapConfig {
72 pub fn from_json(input: &Value) -> Result<Self> {
74 let mappings = input.get("mappings").ok_or_else(|| {
75 DataflowError::Validation("Missing 'mappings' array in input".to_string())
76 })?;
77
78 let mappings_arr = mappings
79 .as_array()
80 .ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
81
82 let mut parsed_mappings = Vec::new();
83
84 for mapping in mappings_arr {
85 let path = mapping
86 .get("path")
87 .and_then(Value::as_str)
88 .ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
89 .to_string();
90
91 let logic = mapping
92 .get("logic")
93 .ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
94 .clone();
95
96 parsed_mappings.push(MapMapping {
97 path_arc: Arc::from(path.as_str()),
98 path_parts: Arc::from(Vec::<Arc<str>>::new().into_boxed_slice()),
99 path,
100 logic,
101 compiled_logic: None,
102 });
103 }
104
105 Ok(MapConfig {
106 mappings: parsed_mappings,
107 })
108 }
109
110 pub fn execute(
116 &self,
117 message: &mut Message,
118 engine: &Arc<Engine>,
119 ) -> Result<(TaskOutcome, Vec<Change>)> {
120 with_arena(|arena| {
124 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
125 self.execute_in_arena(message, &mut arena_ctx, engine, None)
126 })
127 }
128
129 pub(crate) fn execute_in_arena(
139 &self,
140 message: &mut Message,
141 arena_ctx: &mut ArenaContext<'_>,
142 engine: &Arc<Engine>,
143 mut trace_snapshots: Option<&mut Vec<Value>>,
144 ) -> Result<(TaskOutcome, Vec<Change>)> {
145 let mut changes = Vec::new();
146 let mut errors_encountered = false;
147
148 debug!("Map: Executing {} mappings", self.mappings.len());
149
150 let arena = arena_ctx.arena();
151 for mapping in &self.mappings {
152 debug!("Processing mapping to path: {}", mapping.path);
153
154 if let Some(buf) = trace_snapshots.as_deref_mut() {
158 buf.push(Value::from(&message.context));
159 }
160
161 let compiled_logic = match &mapping.compiled_logic {
166 Some(logic) => logic,
167 None => {
168 error!("Map: Logic not compiled for mapping to {}", mapping.path);
169 errors_encountered = true;
170 continue;
171 }
172 };
173
174 let ctx_av = arena_ctx.as_data_value();
175 let result_av = match engine.evaluate(compiled_logic, ctx_av, arena) {
176 Ok(av) => av,
177 Err(e) => {
178 error!(
179 "Map: Error evaluating logic for path {}: {:?}",
180 mapping.path, e
181 );
182 errors_encountered = true;
183 continue;
184 }
185 };
186
187 let transformed_value = result_av.to_owned();
188 debug!(
189 "Map: Evaluated logic for path {} resulted in: {:?}",
190 mapping.path, transformed_value
191 );
192
193 if matches!(transformed_value, OwnedDataValue::Null) {
194 debug!(
195 "Map: Skipping mapping for path {} as result is null",
196 mapping.path
197 );
198 continue;
199 }
200
201 let fallback_parts: Vec<Arc<str>>;
206 let parts: &[Arc<str>] = if mapping.path_parts.is_empty() && !mapping.path.is_empty() {
207 fallback_parts = mapping.path.split('.').map(Arc::from).collect();
208 &fallback_parts
209 } else {
210 &mapping.path_parts
211 };
212 let path_arc: Arc<str> = if mapping.path_arc.is_empty() && !mapping.path.is_empty() {
213 Arc::from(mapping.path.as_str())
214 } else {
215 Arc::clone(&mapping.path_arc)
216 };
217
218 if message.capture_changes {
219 let old_value = get_nested_value_parts(&message.context, parts)
223 .cloned()
224 .unwrap_or(OwnedDataValue::Null);
225 let new_value = transformed_value.clone();
226
227 changes.push(Change {
228 path: path_arc,
229 old_value,
230 new_value,
231 });
232 }
233 arena_ctx.apply_mutation_parts(&mut message.context, parts, |ctx| {
234 apply_mapping_parts(ctx, parts, &mapping.path, transformed_value);
235 });
236 debug!("Successfully mapped to path: {}", mapping.path);
237 }
238
239 let outcome = if errors_encountered {
240 TaskOutcome::Status(500)
241 } else {
242 TaskOutcome::Success
243 };
244 Ok((outcome, changes))
245 }
246}
247
248fn apply_mapping_parts(
252 context: &mut OwnedDataValue,
253 parts: &[Arc<str>],
254 full_path: &str,
255 new_value: OwnedDataValue,
256) {
257 if parts.len() == 1 && matches!(full_path, "data" | "metadata" | "temp_data") {
258 merge_root_field(context, full_path, new_value);
259 } else {
260 set_nested_value_parts(context, parts, new_value);
261 }
262}
263
264fn merge_root_field(context: &mut OwnedDataValue, path: &str, new_value: OwnedDataValue) {
268 let OwnedDataValue::Object(ctx_pairs) = context else {
269 *context = wrap_root(path, new_value);
271 return;
272 };
273
274 let slot_idx = ctx_pairs.iter().position(|(k, _)| k == path);
275 match slot_idx {
276 Some(idx) => {
277 let slot = &mut ctx_pairs[idx].1;
278 match (slot, new_value) {
279 (OwnedDataValue::Object(existing), OwnedDataValue::Object(new_pairs)) => {
280 for (k, v) in new_pairs {
281 if let Some(s) = existing.iter_mut().find(|(ek, _)| ek == &k) {
282 s.1 = v;
283 } else {
284 existing.push((k, v));
285 }
286 }
287 }
288 (slot, new) => *slot = new,
289 }
290 }
291 None => {
292 ctx_pairs.push((path.to_string(), new_value));
293 }
294 }
295}
296
297fn wrap_root(path: &str, value: OwnedDataValue) -> OwnedDataValue {
300 OwnedDataValue::Object(vec![(path.to_string(), value)])
301}
302
303#[cfg(test)]
304mod tests {
305 use super::*;
306 use crate::engine::message::Message;
307 use crate::engine::utils::set_nested_value;
308 use serde_json::json;
309
310 fn dv(v: serde_json::Value) -> OwnedDataValue {
311 OwnedDataValue::from(&v)
312 }
313
314 fn fresh_message(initial: serde_json::Value) -> Message {
315 let mut m = Message::new(Arc::new(dv(json!({}))));
317 set_nested_value(&mut m.context, "data", dv(initial));
318 m
319 }
320
321 #[test]
322 fn test_map_config_from_json() {
323 let input = json!({
324 "mappings": [
325 { "path": "data.field1", "logic": {"var": "data.source"} },
326 { "path": "data.field2", "logic": "static_value" }
327 ]
328 });
329
330 let config = MapConfig::from_json(&input).unwrap();
331 assert_eq!(config.mappings.len(), 2);
332 assert_eq!(config.mappings[0].path, "data.field1");
333 assert_eq!(config.mappings[1].path, "data.field2");
334 }
335
336 #[test]
337 fn test_map_config_missing_mappings() {
338 assert!(MapConfig::from_json(&json!({})).is_err());
339 }
340
341 #[test]
342 fn test_map_config_invalid_mappings() {
343 assert!(MapConfig::from_json(&json!({"mappings": "not_an_array"})).is_err());
344 }
345
346 #[test]
347 fn test_map_config_missing_path() {
348 let input = json!({"mappings": [{"logic": {"var": "data.source"}}]});
349 assert!(MapConfig::from_json(&input).is_err());
350 }
351
352 #[test]
353 fn test_map_config_missing_logic() {
354 let input = json!({"mappings": [{"path": "data.field1"}]});
355 assert!(MapConfig::from_json(&input).is_err());
356 }
357
358 fn compile_mappings(engine: &Arc<Engine>, config: &mut MapConfig) {
362 for mapping in &mut config.mappings {
363 mapping.compiled_logic = Some(engine.compile_arc(&mapping.logic).unwrap());
364 }
365 }
366
367 #[test]
368 fn test_map_metadata_assignment() {
369 let engine = Arc::new(Engine::builder().with_templating(true).build());
370
371 let mut message = fresh_message(json!({
372 "SwiftMT": { "message_type": "103" }
373 }));
374
375 let mut config = MapConfig {
376 mappings: vec![MapMapping {
377 path: "metadata.SwiftMT.message_type".to_string(),
378 logic: json!({"var": "data.SwiftMT.message_type"}),
379 ..Default::default()
380 }],
381 };
382 compile_mappings(&engine, &mut config);
383
384 let result = config.execute(&mut message, &engine);
385 assert!(result.is_ok());
386
387 let (outcome, changes) = result.unwrap();
388 assert_eq!(outcome, TaskOutcome::Success);
389 assert_eq!(changes.len(), 1);
390
391 assert_eq!(
392 message.context["metadata"]
393 .get("SwiftMT")
394 .and_then(|v| v.get("message_type")),
395 Some(&dv(json!("103")))
396 );
397 }
398
399 #[test]
400 fn test_map_null_values_skip_assignment() {
401 let engine = Arc::new(Engine::builder().with_templating(true).build());
402
403 let mut message = fresh_message(json!({ "existing_field": "should_remain" }));
404 set_nested_value(
405 &mut message.context,
406 "metadata",
407 dv(json!({"existing_meta": "should_remain"})),
408 );
409
410 let mut config = MapConfig {
411 mappings: vec![
412 MapMapping {
413 path: "data.new_field".to_string(),
414 logic: json!({"var": "data.non_existent_field"}),
415 ..Default::default()
416 },
417 MapMapping {
418 path: "metadata.new_meta".to_string(),
419 logic: json!({"var": "data.another_non_existent"}),
420 ..Default::default()
421 },
422 MapMapping {
423 path: "data.actual_field".to_string(),
424 logic: json!("actual_value"),
425 ..Default::default()
426 },
427 ],
428 };
429 compile_mappings(&engine, &mut config);
430
431 let result = config.execute(&mut message, &engine);
432 assert!(result.is_ok());
433
434 let (outcome, changes) = result.unwrap();
435 assert_eq!(outcome, TaskOutcome::Success);
436 assert_eq!(changes.len(), 1);
437 assert_eq!(changes[0].path.as_ref(), "data.actual_field");
438
439 assert_eq!(message.context["data"].get("new_field"), None);
440 assert_eq!(message.context["metadata"].get("new_meta"), None);
441
442 assert_eq!(
443 message.context["data"].get("existing_field"),
444 Some(&dv(json!("should_remain")))
445 );
446 assert_eq!(
447 message.context["metadata"].get("existing_meta"),
448 Some(&dv(json!("should_remain")))
449 );
450
451 assert_eq!(
452 message.context["data"].get("actual_field"),
453 Some(&dv(json!("actual_value")))
454 );
455 }
456
457 #[test]
458 fn test_map_execute_with_trace_captures_context_snapshots() {
459 let engine = Arc::new(Engine::builder().with_templating(true).build());
460
461 let mut message = fresh_message(json!({ "first": "Alice", "last": "Smith" }));
462
463 let mut config = MapConfig {
464 mappings: vec![
465 MapMapping {
466 path: "data.full_name".to_string(),
467 logic: json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
468 ..Default::default()
469 },
470 MapMapping {
471 path: "data.greeting".to_string(),
472 logic: json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
473 ..Default::default()
474 },
475 ],
476 };
477 compile_mappings(&engine, &mut config);
478
479 let mut context_snapshots: Vec<Value> = Vec::new();
480 let result = with_arena(|arena| {
481 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
482 config.execute_in_arena(
483 &mut message,
484 &mut arena_ctx,
485 &engine,
486 Some(&mut context_snapshots),
487 )
488 });
489 assert!(result.is_ok());
490
491 let (outcome, changes) = result.unwrap();
492 assert_eq!(outcome, TaskOutcome::Success);
493 assert_eq!(changes.len(), 2);
494 assert_eq!(context_snapshots.len(), 2);
495
496 assert!(context_snapshots[0]["data"].get("full_name").is_none());
498 assert_eq!(
499 context_snapshots[1]["data"].get("full_name"),
500 Some(&json!("Alice Smith"))
501 );
502 }
503
504 #[test]
505 fn test_map_multiple_fields_including_metadata() {
506 let engine = Arc::new(Engine::builder().with_templating(true).build());
507
508 let mut message = fresh_message(json!({
509 "ISO20022_MX": {
510 "document": {
511 "TxInf": {
512 "OrgnlGrpInf": { "OrgnlMsgNmId": "pacs.008.001.08" }
513 }
514 }
515 },
516 "SwiftMT": { "message_type": "103" }
517 }));
518
519 let mut config = MapConfig {
520 mappings: vec![
521 MapMapping {
522 path: "data.SwiftMT.message_type".to_string(),
523 logic: json!("103"),
524 ..Default::default()
525 },
526 MapMapping {
527 path: "metadata.SwiftMT.message_type".to_string(),
528 logic: json!({"var": "data.SwiftMT.message_type"}),
529 ..Default::default()
530 },
531 MapMapping {
532 path: "temp_data.original_msg_type".to_string(),
533 logic: json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
534 ..Default::default()
535 },
536 ],
537 };
538 compile_mappings(&engine, &mut config);
539
540 let result = config.execute(&mut message, &engine);
541 assert!(result.is_ok());
542
543 let (outcome, changes) = result.unwrap();
544 assert_eq!(outcome, TaskOutcome::Success);
545 assert_eq!(changes.len(), 3);
546
547 assert_eq!(
548 message.context["data"]
549 .get("SwiftMT")
550 .and_then(|v| v.get("message_type")),
551 Some(&dv(json!("103")))
552 );
553 assert_eq!(
554 message.context["metadata"]
555 .get("SwiftMT")
556 .and_then(|v| v.get("message_type")),
557 Some(&dv(json!("103")))
558 );
559 assert_eq!(
560 message.context["temp_data"].get("original_msg_type"),
561 Some(&dv(json!("pacs.008.001.08")))
562 );
563 }
564}