use crate::engine::error::{DataflowError, Result};
use crate::engine::message::{Change, Message};
use crate::engine::utils::{get_nested_value, set_nested_value};
use datalogic_rs::{CompiledLogic, DataLogic};
use log::{debug, error};
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
/// Configuration for a map task: an ordered list of mappings.
///
/// `execute` and `execute_with_trace` walk `mappings` front to back, so a
/// later mapping can read values written by an earlier one.
#[derive(Debug, Clone, Deserialize)]
pub struct MapConfig {
/// Mappings applied one after another, in the order listed.
pub mappings: Vec<MapMapping>,
}
/// A single mapping: a logic expression plus the context path that receives
/// its result.
#[derive(Debug, Clone, Deserialize)]
pub struct MapMapping {
/// Path inside the message context that is written, e.g. `data.field1`.
pub path: String,
/// Logic expression as raw JSON. It is not evaluated directly; the compiled
/// form is looked up through `logic_index`.
pub logic: Value,
/// Position of this mapping's compiled logic in the `logic_cache` slice
/// passed to `MapConfig::execute`. Skipped by serde and set to `None` by
/// `MapConfig::from_json`; while it is `None`, executing the mapping logs an
/// error and the run reports status 500.
#[serde(skip)]
pub logic_index: Option<usize>,
}
impl MapConfig {
pub fn from_json(input: &Value) -> Result<Self> {
let mappings = input.get("mappings").ok_or_else(|| {
DataflowError::Validation("Missing 'mappings' array in input".to_string())
})?;
let mappings_arr = mappings
.as_array()
.ok_or_else(|| DataflowError::Validation("'mappings' must be an array".to_string()))?;
let mut parsed_mappings = Vec::new();
for mapping in mappings_arr {
let path = mapping
.get("path")
.and_then(Value::as_str)
.ok_or_else(|| DataflowError::Validation("Missing 'path' in mapping".to_string()))?
.to_string();
let logic = mapping
.get("logic")
.ok_or_else(|| DataflowError::Validation("Missing 'logic' in mapping".to_string()))?
.clone();
parsed_mappings.push(MapMapping {
path,
logic,
logic_index: None,
});
}
Ok(MapConfig {
mappings: parsed_mappings,
})
}
pub fn execute(
&self,
message: &mut Message,
datalogic: &Arc<DataLogic>,
logic_cache: &[Arc<CompiledLogic>],
) -> Result<(usize, Vec<Change>)> {
let mut changes = Vec::new();
let mut errors_encountered = false;
debug!("Map: Executing {} mappings", self.mappings.len());
for mapping in &self.mappings {
let context_arc = message.get_context_arc();
debug!("Processing mapping to path: {}", mapping.path);
let compiled_logic = match mapping.logic_index {
Some(index) => {
if index >= logic_cache.len() {
error!(
"Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
index,
logic_cache.len(),
mapping.path
);
errors_encountered = true;
continue;
}
&logic_cache[index]
}
None => {
error!(
"Map: Logic not compiled (no index) for mapping to {}",
mapping.path
);
errors_encountered = true;
continue;
}
};
let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
match result {
Ok(transformed_value) => {
debug!(
"Map: Evaluated logic for path {} resulted in: {:?}",
mapping.path, transformed_value
);
if transformed_value.is_null() {
debug!(
"Map: Skipping mapping for path {} as result is null",
mapping.path
);
continue;
}
let old_value = get_nested_value(&message.context, &mapping.path);
let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
let new_value_arc = Arc::new(transformed_value.clone());
debug!(
"Recording change for path '{}': old={:?}, new={:?}",
mapping.path, old_value_arc, new_value_arc
);
changes.push(Change {
path: Arc::from(mapping.path.as_str()),
old_value: old_value_arc,
new_value: Arc::clone(&new_value_arc),
});
if mapping.path == "data"
|| mapping.path == "metadata"
|| mapping.path == "temp_data"
{
if let Value::Object(new_map) = transformed_value {
if let Value::Object(existing_map) = &mut message.context[&mapping.path]
{
for (key, value) in new_map {
existing_map.insert(key, value);
}
} else {
message.context[&mapping.path] = Value::Object(new_map);
}
} else {
message.context[&mapping.path] = transformed_value;
}
} else {
set_nested_value(&mut message.context, &mapping.path, transformed_value);
}
message.invalidate_context_cache();
debug!("Successfully mapped to path: {}", mapping.path);
}
Err(e) => {
error!(
"Map: Error evaluating logic for path {}: {:?}",
mapping.path, e
);
errors_encountered = true;
}
}
}
let status = if errors_encountered { 500 } else { 200 };
Ok((status, changes))
}
pub fn execute_with_trace(
&self,
message: &mut Message,
datalogic: &Arc<DataLogic>,
logic_cache: &[Arc<CompiledLogic>],
) -> Result<(usize, Vec<Change>, Vec<Value>)> {
let mut changes = Vec::new();
let mut errors_encountered = false;
let mut context_snapshots = Vec::with_capacity(self.mappings.len());
debug!("Map (trace): Executing {} mappings", self.mappings.len());
for mapping in &self.mappings {
context_snapshots.push(message.context.clone());
let context_arc = message.get_context_arc();
debug!("Processing mapping to path: {}", mapping.path);
let compiled_logic = match mapping.logic_index {
Some(index) => {
if index >= logic_cache.len() {
error!(
"Map: Logic index {} out of bounds (cache size: {}) for mapping to {}",
index,
logic_cache.len(),
mapping.path
);
errors_encountered = true;
continue;
}
&logic_cache[index]
}
None => {
error!(
"Map: Logic not compiled (no index) for mapping to {}",
mapping.path
);
errors_encountered = true;
continue;
}
};
let result = datalogic.evaluate(compiled_logic, Arc::clone(&context_arc));
match result {
Ok(transformed_value) => {
debug!(
"Map: Evaluated logic for path {} resulted in: {:?}",
mapping.path, transformed_value
);
if transformed_value.is_null() {
debug!(
"Map: Skipping mapping for path {} as result is null",
mapping.path
);
continue;
}
let old_value = get_nested_value(&message.context, &mapping.path);
let old_value_arc = Arc::new(old_value.cloned().unwrap_or(Value::Null));
let new_value_arc = Arc::new(transformed_value.clone());
changes.push(Change {
path: Arc::from(mapping.path.as_str()),
old_value: old_value_arc,
new_value: Arc::clone(&new_value_arc),
});
if mapping.path == "data"
|| mapping.path == "metadata"
|| mapping.path == "temp_data"
{
if let Value::Object(new_map) = transformed_value {
if let Value::Object(existing_map) = &mut message.context[&mapping.path]
{
for (key, value) in new_map {
existing_map.insert(key, value);
}
} else {
message.context[&mapping.path] = Value::Object(new_map);
}
} else {
message.context[&mapping.path] = transformed_value;
}
} else {
set_nested_value(&mut message.context, &mapping.path, transformed_value);
}
message.invalidate_context_cache();
}
Err(e) => {
error!(
"Map: Error evaluating logic for path {}: {:?}",
mapping.path, e
);
errors_encountered = true;
}
}
}
let status = if errors_encountered { 500 } else { 200 };
Ok((status, changes, context_snapshots))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::message::Message;
    use serde_json::json;

    /// Creates the evaluation engine used by the execution tests.
    fn new_datalogic() -> Arc<DataLogic> {
        Arc::new(DataLogic::with_preserve_structure())
    }

    /// Builds a mapping whose logic has not been compiled yet.
    fn mapping(path: &str, logic: Value) -> MapMapping {
        MapMapping {
            path: path.to_string(),
            logic,
            logic_index: None,
        }
    }

    /// Compiles every mapping's logic and points each mapping at its slot in
    /// the returned cache.
    fn compile_all(
        datalogic: &Arc<DataLogic>,
        config: &mut MapConfig,
    ) -> Vec<Arc<CompiledLogic>> {
        config
            .mappings
            .iter_mut()
            .enumerate()
            .map(|(index, mapping)| {
                mapping.logic_index = Some(index);
                datalogic.compile(&mapping.logic).unwrap()
            })
            .collect()
    }

    /// Asserts that parsing `input` is rejected as a validation error.
    fn assert_validation_error(input: Value) {
        assert!(matches!(
            MapConfig::from_json(&input),
            Err(DataflowError::Validation(_))
        ));
    }

    #[test]
    fn test_map_config_from_json() {
        let input = json!({
            "mappings": [
                {
                    "path": "data.field1",
                    "logic": {"var": "data.source"}
                },
                {
                    "path": "data.field2",
                    "logic": "static_value"
                }
            ]
        });

        let config = MapConfig::from_json(&input).unwrap();

        assert_eq!(config.mappings.len(), 2);
        assert_eq!(config.mappings[0].path, "data.field1");
        assert_eq!(config.mappings[0].logic, json!({"var": "data.source"}));
        assert_eq!(config.mappings[1].path, "data.field2");
        assert_eq!(config.mappings[1].logic, json!("static_value"));
        // Parsing never assigns a compiled-logic slot.
        assert!(config.mappings.iter().all(|m| m.logic_index.is_none()));
    }

    #[test]
    fn test_map_config_missing_mappings() {
        assert_validation_error(json!({}));
    }

    #[test]
    fn test_map_config_invalid_mappings() {
        assert_validation_error(json!({
            "mappings": "not_an_array"
        }));
    }

    #[test]
    fn test_map_config_missing_path() {
        assert_validation_error(json!({
            "mappings": [
                {
                    "logic": {"var": "data.source"}
                }
            ]
        }));
    }

    #[test]
    fn test_map_config_non_string_path() {
        assert_validation_error(json!({
            "mappings": [
                {
                    "path": 42,
                    "logic": {"var": "data.source"}
                }
            ]
        }));
    }

    #[test]
    fn test_map_config_missing_logic() {
        assert_validation_error(json!({
            "mappings": [
                {
                    "path": "data.field1"
                }
            ]
        }));
    }

    #[test]
    fn test_map_metadata_assignment() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "SwiftMT": {
                "message_type": "103"
            }
        });
        let mut config = MapConfig {
            mappings: vec![mapping(
                "metadata.SwiftMT.message_type",
                json!({"var": "data.SwiftMT.message_type"}),
            )],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .unwrap();

        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(
            message.context["metadata"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&json!("103"))
        );
    }

    #[test]
    fn test_map_null_values_skip_assignment() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "existing_field": "should_remain"
        });
        message.context["metadata"] = json!({
            "existing_meta": "should_remain"
        });
        let mut config = MapConfig {
            mappings: vec![
                mapping("data.new_field", json!({"var": "data.non_existent_field"})),
                mapping(
                    "metadata.new_meta",
                    json!({"var": "data.another_non_existent"}),
                ),
                mapping("data.actual_field", json!("actual_value")),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .unwrap();

        // Null results are skipped silently: no error status, no change entry.
        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.actual_field");
        assert_eq!(message.context["data"].get("new_field"), None);
        assert_eq!(message.context["metadata"].get("new_meta"), None);
        assert_eq!(
            message.context["data"].get("existing_field"),
            Some(&json!("should_remain"))
        );
        assert_eq!(
            message.context["metadata"].get("existing_meta"),
            Some(&json!("should_remain"))
        );
        assert_eq!(
            message.context["data"].get("actual_field"),
            Some(&json!("actual_value"))
        );
    }

    #[test]
    fn test_map_execute_with_trace_captures_context_snapshots() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "first": "Alice",
            "last": "Smith"
        });
        let mut config = MapConfig {
            mappings: vec![
                mapping(
                    "data.full_name",
                    json!({"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}),
                ),
                mapping(
                    "data.greeting",
                    json!({"cat": ["Hello, ", {"var": "data.full_name"}]}),
                ),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes, context_snapshots) = config
            .execute_with_trace(&mut message, &datalogic, &logic_cache)
            .unwrap();

        assert_eq!(status, 200);
        assert_eq!(changes.len(), 2);
        assert_eq!(context_snapshots.len(), 2);
        // Each snapshot reflects the context before its mapping ran.
        assert!(context_snapshots[0]["data"].get("full_name").is_none());
        assert_eq!(
            context_snapshots[1]["data"].get("full_name"),
            Some(&json!("Alice Smith"))
        );
    }

    #[test]
    fn test_map_multiple_fields_including_metadata() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({
            "ISO20022_MX": {
                "document": {
                    "TxInf": {
                        "OrgnlGrpInf": {
                            "OrgnlMsgNmId": "pacs.008.001.08"
                        }
                    }
                }
            },
            "SwiftMT": {
                "message_type": "103"
            }
        });
        let mut config = MapConfig {
            mappings: vec![
                mapping("data.SwiftMT.message_type", json!("103")),
                mapping(
                    "metadata.SwiftMT.message_type",
                    json!({"var": "data.SwiftMT.message_type"}),
                ),
                mapping(
                    "temp_data.original_msg_type",
                    json!({"var": "data.ISO20022_MX.document.TxInf.OrgnlGrpInf.OrgnlMsgNmId"}),
                ),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .unwrap();

        assert_eq!(status, 200);
        assert_eq!(changes.len(), 3);
        assert_eq!(
            message.context["data"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&json!("103"))
        );
        assert_eq!(
            message.context["metadata"]
                .get("SwiftMT")
                .and_then(|v| v.get("message_type")),
            Some(&json!("103"))
        );
        assert_eq!(
            message.context["temp_data"].get("original_msg_type"),
            Some(&json!("pacs.008.001.08"))
        );
    }

    #[test]
    fn test_map_missing_logic_index_reports_error() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"existing_field": "should_remain"});
        // The mapping is deliberately left uncompiled (`logic_index` is None).
        let config = MapConfig {
            mappings: vec![mapping("data.field", json!("value"))],
        };

        let (status, changes) = config.execute(&mut message, &datalogic, &[]).unwrap();

        assert_eq!(status, 500);
        assert!(changes.is_empty());
        assert_eq!(message.context["data"].get("field"), None);
        assert_eq!(
            message.context["data"].get("existing_field"),
            Some(&json!("should_remain"))
        );
    }

    #[test]
    fn test_map_logic_index_out_of_bounds_reports_error() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({});
        let mut config = MapConfig {
            mappings: vec![
                mapping("data.first", json!("one")),
                mapping("data.second", json!("two")),
            ],
        };
        let logic_cache = compile_all(&datalogic, &mut config);
        // Point the first mapping past the end of the cache.
        config.mappings[0].logic_index = Some(logic_cache.len());

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .unwrap();

        // The broken mapping is reported, the remaining one still runs.
        assert_eq!(status, 500);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "data.second");
        assert_eq!(message.context["data"].get("first"), None);
        assert_eq!(message.context["data"].get("second"), Some(&json!("two")));
    }

    #[test]
    fn test_map_execute_with_trace_snapshots_failed_mapping() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"existing_field": "should_remain"});
        let config = MapConfig {
            mappings: vec![mapping("data.field", json!("value"))],
        };

        let (status, changes, context_snapshots) = config
            .execute_with_trace(&mut message, &datalogic, &[])
            .unwrap();

        assert_eq!(status, 500);
        assert!(changes.is_empty());
        // The snapshot is taken before the mapping is attempted.
        assert_eq!(context_snapshots.len(), 1);
        assert_eq!(
            context_snapshots[0]["data"].get("existing_field"),
            Some(&json!("should_remain"))
        );
    }

    #[test]
    fn test_map_root_path_merges_object() {
        let datalogic = new_datalogic();
        let mut message = Message::new(Arc::new(json!({})));
        message.context["data"] = json!({"incoming": 1});
        message.context["metadata"] = json!({"existing_meta": "should_remain"});
        let mut config = MapConfig {
            mappings: vec![mapping("metadata", json!({"var": "data"}))],
        };
        let logic_cache = compile_all(&datalogic, &mut config);

        let (status, changes) = config
            .execute(&mut message, &datalogic, &logic_cache)
            .unwrap();

        assert_eq!(status, 200);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].path.as_ref(), "metadata");
        // Object results are merged into the root field, not swapped in.
        assert_eq!(
            message.context["metadata"].get("existing_meta"),
            Some(&json!("should_remain"))
        );
        assert_eq!(message.context["metadata"].get("incoming"), Some(&json!(1)));
    }
}