use crate::engine::error::{DataflowError, Result};
use crate::engine::executor::{ArenaContext, with_arena};
use crate::engine::message::{Change, Message};
use crate::engine::task_outcome::TaskOutcome;
use crate::engine::utils::{get_nested_value_parts, set_nested_value_parts};
use datalogic_rs::{Engine, Logic};
use datavalue::OwnedDataValue;
use log::{debug, error};
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
/// Configuration of a map task: a list of path/logic mappings.
///
/// The mappings are processed in list order when the task is executed.
#[derive(Debug, Clone, Deserialize)]
pub struct MapConfig {
/// Mappings to evaluate, in execution order.
pub mappings: Vec<MapMapping>,
}
/// A single mapping: evaluate `logic` and store the result at `path`.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct MapMapping {
/// Dot-separated destination path inside the message context
/// (for example `data.field1`).
pub path: String,
/// Raw JSON logic expression. Execution does not read it directly; it uses
/// the compiled form stored in `compiled_logic`.
pub logic: Value,
// Compiled form of `logic`. Never deserialized; while it is `None`,
// executing this mapping logs an error and skips it.
#[doc(hidden)]
#[serde(skip)]
pub compiled_logic: Option<Arc<Logic>>,
// Shared copy of `path`, used as the path of recorded changes. Never
// deserialized; when it is empty, execution rebuilds it from `path`.
#[doc(hidden)]
#[serde(skip)]
pub path_arc: Arc<str>,
// `path` pre-split into segments. Never deserialized; when it is empty,
// execution splits `path` on `.` itself.
#[doc(hidden)]
#[serde(skip)]
pub path_parts: Arc<[Arc<str>]>,
}
impl MapConfig {
pub fn from_json(input: &Value) -> Result<Self> {
let mappings = input.get("mappings").ok_or_else(|| {
DataflowError::Validation("Missing 'mappings' array in input".to_string())
})?;
let mappings_arr = mappings
.as_array()
.ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
let mut parsed_mappings = Vec::new();
for mapping in mappings_arr {
let path = mapping
.get("path")
.and_then(Value::as_str)
.ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
.to_string();
let logic = mapping
.get("logic")
.ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
.clone();
parsed_mappings.push(MapMapping {
path_arc: Arc::from(path.as_str()),
path_parts: Arc::from(Vec::<Arc<str>>::new().into_boxed_slice()),
path,
logic,
compiled_logic: None,
});
}
Ok(MapConfig {
mappings: parsed_mappings,
})
}
/// Applies all mappings to `message` without collecting trace snapshots.
///
/// Builds an [`ArenaContext`] from the message's current context inside
/// `with_arena` and hands the work to `execute_in_arena`, whose result is
/// returned unchanged.
pub fn execute(
    &self,
    message: &mut Message,
    engine: &Arc<Engine>,
) -> Result<(TaskOutcome, Vec<Change>)> {
    with_arena(|bump| {
        let mut ctx = ArenaContext::from_owned(&message.context, bump);
        // `None`: no per-mapping context snapshots are recorded.
        self.execute_in_arena(message, &mut ctx, engine, None)
    })
}
/// Evaluates every mapping in order and writes non-null results into
/// `message.context`.
///
/// For each mapping:
/// 1. When `trace_snapshots` is provided, a JSON snapshot of the context as
///    it stands *before* the mapping runs is appended (one per mapping, even
///    if the mapping is subsequently skipped).
/// 2. The compiled logic is evaluated against the arena view of the context.
/// 3. A `Null` result is skipped and the target path is left untouched.
/// 4. Otherwise the value is written at `mapping.path`; when
///    `message.capture_changes` is set, a [`Change`] holding the previous
///    value (or `Null` when absent) and the new value is recorded.
///
/// A mapping without compiled logic, or whose evaluation fails, is logged
/// and skipped; the remaining mappings are still processed.
///
/// # Returns
///
/// `TaskOutcome::Status(500)` if at least one mapping was skipped because of
/// an error, `TaskOutcome::Success` otherwise, paired with the recorded
/// changes. As written, this function never returns `Err`.
pub(crate) fn execute_in_arena(
    &self,
    message: &mut Message,
    arena_ctx: &mut ArenaContext<'_>,
    engine: &Arc<Engine>,
    mut trace_snapshots: Option<&mut Vec<Value>>,
) -> Result<(TaskOutcome, Vec<Change>)> {
    let mut changes = Vec::new();
    let mut errors_encountered = false;
    debug!("Map: Executing {} mappings", self.mappings.len());
    let arena = arena_ctx.arena();

    for mapping in &self.mappings {
        debug!("Processing mapping to path: {}", mapping.path);

        // Snapshot the context as it stands before this mapping runs.
        if let Some(buf) = trace_snapshots.as_deref_mut() {
            buf.push(Value::from(&message.context));
        }

        let Some(compiled_logic) = &mapping.compiled_logic else {
            error!("Map: Logic not compiled for mapping to {}", mapping.path);
            errors_encountered = true;
            continue;
        };

        let ctx_av = arena_ctx.as_data_value();
        let result_av = match engine.evaluate(compiled_logic, ctx_av, arena) {
            Ok(av) => av,
            Err(e) => {
                error!(
                    "Map: Error evaluating logic for path {}: {:?}",
                    mapping.path, e
                );
                errors_encountered = true;
                continue;
            }
        };

        // Take an owned copy of the result before the context is mutated below.
        let transformed_value = result_av.to_owned();
        debug!(
            "Map: Evaluated logic for path {} resulted in: {:?}",
            mapping.path, transformed_value
        );

        // A null result means "nothing to write": keep whatever is at the target.
        if matches!(transformed_value, OwnedDataValue::Null) {
            debug!(
                "Map: Skipping mapping for path {} as result is null",
                mapping.path
            );
            continue;
        }

        // Use the pre-split segments when present; otherwise split the raw
        // path (e.g. for mappings built by hand without cached segments).
        let fallback_parts: Vec<Arc<str>>;
        let parts: &[Arc<str>] = if mapping.path_parts.is_empty() && !mapping.path.is_empty() {
            fallback_parts = mapping.path.split('.').map(Arc::from).collect();
            &fallback_parts
        } else {
            &mapping.path_parts
        };

        if message.capture_changes {
            // The shared path string is only needed for the change record, so
            // it is built here rather than for every mapping.
            let path_arc: Arc<str> = if mapping.path_arc.is_empty() && !mapping.path.is_empty() {
                Arc::from(mapping.path.as_str())
            } else {
                Arc::clone(&mapping.path_arc)
            };
            let old_value = get_nested_value_parts(&message.context, parts)
                .cloned()
                .unwrap_or(OwnedDataValue::Null);
            changes.push(Change {
                path: path_arc,
                old_value,
                new_value: transformed_value.clone(),
            });
        }

        arena_ctx.apply_mutation_parts(&mut message.context, parts, |ctx| {
            apply_mapping_parts(ctx, parts, &mapping.path, transformed_value);
        });
        debug!("Successfully mapped to path: {}", mapping.path);
    }

    let outcome = if errors_encountered {
        TaskOutcome::Status(500)
    } else {
        TaskOutcome::Success
    };
    Ok((outcome, changes))
}
}
/// Writes `new_value` into `context` at the location described by `parts`.
///
/// A single-segment path naming one of the root fields `data`, `metadata` or
/// `temp_data` is merged into the existing field via `merge_root_field`;
/// every other path is written with `set_nested_value_parts`.
fn apply_mapping_parts(
    context: &mut OwnedDataValue,
    parts: &[Arc<str>],
    full_path: &str,
    new_value: OwnedDataValue,
) {
    let targets_root_field = match full_path {
        "data" | "metadata" | "temp_data" => parts.len() == 1,
        _ => false,
    };

    if !targets_root_field {
        set_nested_value_parts(context, parts, new_value);
        return;
    }
    merge_root_field(context, full_path, new_value);
}
/// Merges `new_value` into the top-level entry `path` of `context`.
///
/// * If `context` is not an object, it is replaced by an object holding only
///   `path` → `new_value`.
/// * If `path` is not yet present, the entry is appended.
/// * If both the existing entry and `new_value` are objects, the merge is
///   shallow: each incoming key overwrites the matching existing key or is
///   appended when there is none.
/// * In every other case the existing entry is replaced by `new_value`.
fn merge_root_field(context: &mut OwnedDataValue, path: &str, new_value: OwnedDataValue) {
    let OwnedDataValue::Object(root_entries) = context else {
        *context = wrap_root(path, new_value);
        return;
    };

    let Some(idx) = root_entries.iter().position(|(k, _)| k == path) else {
        root_entries.push((path.to_string(), new_value));
        return;
    };

    match (&mut root_entries[idx].1, new_value) {
        (OwnedDataValue::Object(current), OwnedDataValue::Object(incoming)) => {
            for (key, value) in incoming {
                match current.iter_mut().find(|(ek, _)| ek == &key) {
                    Some(entry) => entry.1 = value,
                    None => current.push((key, value)),
                }
            }
        }
        (target, replacement) => *target = replacement,
    }
}
/// Builds an object value holding exactly one entry, `path` → `value`.
fn wrap_root(path: &str, value: OwnedDataValue) -> OwnedDataValue {
    let sole_entry = (path.to_owned(), value);
    OwnedDataValue::Object(vec![sole_entry])
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::message::Message;
use crate::engine::utils::set_nested_value;
use serde_json::json;
/// Test helper: converts a `serde_json` value into an [`OwnedDataValue`].
fn dv(v: serde_json::Value) -> OwnedDataValue {
    (&v).into()
}
/// Test helper: creates a message from an empty payload and stores `initial`
/// under the `data` key of its context.
fn fresh_message(initial: serde_json::Value) -> Message {
    let empty_payload = Arc::new(dv(json!({})));
    let mut message = Message::new(empty_payload);
    set_nested_value(&mut message.context, "data", dv(initial));
    message
}
/// A well-formed config yields one mapping per entry, with paths preserved in order.
#[test]
fn test_map_config_from_json() {
    let raw = json!({
        "mappings": [
            { "path": "data.field1", "logic": {"var": "data.source"} },
            { "path": "data.field2", "logic": "static_value" }
        ]
    });

    let parsed = MapConfig::from_json(&raw).unwrap();

    // Comparing against a two-element array checks both the count and the order.
    let paths: Vec<&str> = parsed.mappings.iter().map(|m| m.path.as_str()).collect();
    assert_eq!(paths, ["data.field1", "data.field2"]);
}
/// An input without a `mappings` key is rejected.
#[test]
fn test_map_config_missing_mappings() {
    let empty = json!({});
    let parsed = MapConfig::from_json(&empty);
    assert!(parsed.is_err());
}
/// A `mappings` value that is not an array is rejected.
#[test]
fn test_map_config_invalid_mappings() {
    let wrong_type = json!({"mappings": "not_an_array"});
    let parsed = MapConfig::from_json(&wrong_type);
    assert!(parsed.is_err());
}
/// A mapping entry without `path` is rejected.
#[test]
fn test_map_config_missing_path() {
    let no_path = json!({"mappings": [{"logic": {"var": "data.source"}}]});
    let parsed = MapConfig::from_json(&no_path);
    assert!(parsed.is_err());
}
/// A mapping entry without `logic` is rejected.
#[test]
fn test_map_config_missing_logic() {
    let no_logic = json!({"mappings": [{"path": "data.field1"}]});
    let parsed = MapConfig::from_json(&no_logic);
    assert!(parsed.is_err());
}
/// Test helper: compiles the `logic` of every mapping in `config` with
/// `engine`, panicking if any expression fails to compile.
fn compile_mappings(engine: &Arc<Engine>, config: &mut MapConfig) {
    config.mappings.iter_mut().for_each(|mapping| {
        let compiled = engine.compile_arc(&mapping.logic).unwrap();
        mapping.compiled_logic = Some(compiled);
    });
}
/// A value read from `data` can be written to a nested `metadata` path.
#[test]
fn test_map_metadata_assignment() {
    let engine = Arc::new(Engine::builder().with_templating(true).build());
    let mut message = fresh_message(json!({
        "SwiftMT": { "message_type": "103" }
    }));

    let copy_message_type = MapMapping {
        path: "metadata.SwiftMT.message_type".to_string(),
        logic: json!({"var": "data.SwiftMT.message_type"}),
        ..Default::default()
    };
    let mut config = MapConfig {
        mappings: vec![copy_message_type],
    };
    compile_mappings(&engine, &mut config);

    // `unwrap` fails the test if execution returned an error.
    let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
    assert_eq!(outcome, TaskOutcome::Success);
    assert_eq!(changes.len(), 1);

    let expected = dv(json!("103"));
    let stored = message.context["metadata"]
        .get("SwiftMT")
        .and_then(|swift| swift.get("message_type"));
    assert_eq!(stored, Some(&expected));
}
/// Mappings whose logic evaluates to null neither write to the context nor
/// record a change; existing fields and non-null mappings are unaffected.
#[test]
fn test_map_null_values_skip_assignment() {
    let engine = Arc::new(Engine::builder().with_templating(true).build());
    let mut message = fresh_message(json!({ "existing_field": "should_remain" }));
    let seeded_metadata = dv(json!({"existing_meta": "should_remain"}));
    set_nested_value(&mut message.context, "metadata", seeded_metadata);

    // Small builder so the three mappings below stay readable.
    let mapping = |path: &str, logic: serde_json::Value| MapMapping {
        path: path.to_string(),
        logic,
        ..Default::default()
    };
    let mut config = MapConfig {
        mappings: vec![
            mapping("data.new_field", json!({"var": "data.non_existent_field"})),
            mapping("metadata.new_meta", json!({"var": "data.another_non_existent"})),
            mapping("data.actual_field", json!("actual_value")),
        ],
    };
    compile_mappings(&engine, &mut config);

    let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
    assert_eq!(outcome, TaskOutcome::Success);

    // Only the non-null mapping is recorded.
    assert_eq!(changes.len(), 1);
    assert_eq!(changes[0].path.as_ref(), "data.actual_field");

    let data = &message.context["data"];
    let metadata = &message.context["metadata"];
    let untouched = dv(json!("should_remain"));

    assert_eq!(data.get("new_field"), None);
    assert_eq!(metadata.get("new_meta"), None);
    assert_eq!(data.get("existing_field"), Some(&untouched));
    assert_eq!(metadata.get("existing_meta"), Some(&untouched));
    assert_eq!(data.get("actual_field"), Some(&dv(json!("actual_value"))));
}
/// With tracing enabled, one context snapshot is captured per mapping, each
/// taken before that mapping is applied.
#[test]
fn test_map_execute_with_trace_captures_context_snapshots() {
    let engine = Arc::new(Engine::builder().with_templating(true).build());
    let mut message = fresh_message(json!({ "first": "Alice", "last": "Smith" }));

    let full_name = MapMapping {
        path: "data.full_name".to_string(),
        logic: json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
        ..Default::default()
    };
    let greeting = MapMapping {
        path: "data.greeting".to_string(),
        logic: json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
        ..Default::default()
    };
    let mut config = MapConfig {
        mappings: vec![full_name, greeting],
    };
    compile_mappings(&engine, &mut config);

    let mut snapshots: Vec<Value> = Vec::new();
    let (outcome, changes) = with_arena(|bump| {
        let mut ctx = ArenaContext::from_owned(&message.context, bump);
        config.execute_in_arena(&mut message, &mut ctx, &engine, Some(&mut snapshots))
    })
    .unwrap();

    assert_eq!(outcome, TaskOutcome::Success);
    assert_eq!(changes.len(), 2);
    assert_eq!(snapshots.len(), 2);

    // The first snapshot predates `full_name`; the second already contains it.
    let before_first = &snapshots[0]["data"];
    let before_second = &snapshots[1]["data"];
    assert!(before_first.get("full_name").is_none());
    assert_eq!(
        before_second.get("full_name"),
        Some(&json!("Alice Smith"))
    );
}
/// Mappings targeting `data`, `metadata` and `temp_data` in one run all land
/// in their respective sections of the context.
#[test]
fn test_map_multiple_fields_including_metadata() {
    let engine = Arc::new(Engine::builder().with_templating(true).build());
    let mut message = fresh_message(json!({
        "ISO20022_MX": {
            "document": {
                "TxInf": {
                    "OrgnlGrpInf": { "OrgnlMsgNmId": "pacs.008.001.08" }
                }
            }
        },
        "SwiftMT": { "message_type": "103" }
    }));

    let mapping = |path: &str, logic: serde_json::Value| MapMapping {
        path: path.to_string(),
        logic,
        ..Default::default()
    };
    let mut config = MapConfig {
        mappings: vec![
            mapping("data.SwiftMT.message_type", json!("103")),
            mapping(
                "metadata.SwiftMT.message_type",
                json!({"var": "data.SwiftMT.message_type"}),
            ),
            mapping(
                "temp_data.original_msg_type",
                json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
            ),
        ],
    };
    compile_mappings(&engine, &mut config);

    let (outcome, changes) = config.execute(&mut message, &engine).unwrap();
    assert_eq!(outcome, TaskOutcome::Success);
    assert_eq!(changes.len(), 3);

    let message_type = dv(json!("103"));
    let data_type = message.context["data"]
        .get("SwiftMT")
        .and_then(|swift| swift.get("message_type"));
    assert_eq!(data_type, Some(&message_type));

    let metadata_type = message.context["metadata"]
        .get("SwiftMT")
        .and_then(|swift| swift.get("message_type"));
    assert_eq!(metadata_type, Some(&message_type));

    let original = message.context["temp_data"].get("original_msg_type");
    assert_eq!(original, Some(&dv(json!("pacs.008.001.08"))));
}
}