use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::common::message::Message;
use crate::error::{Error, Result};
use crate::transform::json_path::CompiledPath;
use crate::transform::step::Step;
use crate::transform::value::{CompiledMapping, FieldMapping};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemapStepConfig {
pub mappings: Vec<FieldMapping>,
#[serde(default)]
pub keep_unmapped: bool,
}
pub struct RemapStep {
mappings: Vec<CompiledMapping>,
keep_unmapped: bool,
}
impl RemapStep {
pub fn new(config: RemapStepConfig) -> Result<Self> {
if config.mappings.is_empty() {
return Err(Error::config("Remap step requires at least one mapping"));
}
let mut mappings = Vec::with_capacity(config.mappings.len());
for m in &config.mappings {
let compiled = m
.compile()
.map_err(|e| Error::config(format!("Remap mapping to '{}': {}", m.to, e)))?;
mappings.push(compiled);
}
Ok(Self {
mappings,
keep_unmapped: config.keep_unmapped,
})
}
}
impl Step for RemapStep {
fn step_type(&self) -> &'static str {
"remap"
}
fn process(&self, mut msg: Message) -> Result<Option<Message>> {
let mut updates: Vec<(&CompiledPath, Value)> = Vec::with_capacity(self.mappings.len());
for mapping in &self.mappings {
let value = mapping.source.resolve(&msg);
if value.is_null() && mapping.source.should_skip_null() {
tracing::debug!(to = %mapping.to, "Source resolved to null, skipping mapping");
continue;
}
updates.push((&mapping.to, value));
tracing::trace!(to = %mapping.to, "Remapped field");
}
let new_payload = if self.keep_unmapped {
std::mem::take(&mut msg.payload)
} else {
Value::Object(serde_json::Map::new())
};
msg.payload = new_payload;
for (to, value) in updates {
to.set(&mut msg.payload, value);
}
Ok(Some(msg))
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn make_msg(payload: Value) -> Message {
Message::new("test", payload)
}
#[test]
fn test_remap_simple_field() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$.old".into()),
value: None,
to: "$.new".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"old": "value", "other": "ignored"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload, json!({"new": "value"}));
}
#[test]
fn test_remap_nested_to_flat() {
let config = RemapStepConfig {
mappings: vec![
FieldMapping {
from: Some("$.data.user.id".into()),
value: None,
to: "$.user_id".into(),
},
FieldMapping {
from: Some("$.data.user.name".into()),
value: None,
to: "$.username".into(),
},
],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({
"data": {
"user": {
"id": 123,
"name": "Alice"
}
}
}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["user_id"], 123);
assert_eq!(result.payload["username"], "Alice");
}
#[test]
fn test_remap_flat_to_nested() {
let config = RemapStepConfig {
mappings: vec![
FieldMapping {
from: Some("$.id".into()),
value: None,
to: "$.user.id".into(),
},
FieldMapping {
from: Some("$.name".into()),
value: None,
to: "$.user.name".into(),
},
],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"id": 456, "name": "Bob"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["user"]["id"], 456);
assert_eq!(result.payload["user"]["name"], "Bob");
}
#[test]
fn test_remap_keep_unmapped() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$.old".into()),
value: None,
to: "$.new".into(),
}],
keep_unmapped: true,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"old": "value", "other": "kept"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["new"], "value");
assert_eq!(result.payload["other"], "kept");
assert_eq!(result.payload["old"], "value");
}
#[test]
fn test_remap_missing_source_field_skipped() {
let config = RemapStepConfig {
mappings: vec![
FieldMapping {
from: Some("$.exists".into()),
value: None,
to: "$.found".into(),
},
FieldMapping {
from: Some("$.missing".into()),
value: None,
to: "$.not_found".into(),
},
],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"exists": "here"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["found"], "here");
assert!(result.payload.get("not_found").is_none());
}
#[test]
fn test_remap_array_index() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$.items[0].id".into()),
value: None,
to: "$.first_id".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"items": [{"id": 1}, {"id": 2}]}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["first_id"], 1);
}
#[test]
fn test_remap_complex_object() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$.data".into()),
value: None,
to: "$.payload".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"data": {"nested": {"value": 42}}}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["payload"]["nested"]["value"], 42);
}
#[test]
fn test_remap_literal_value_from() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("static_value".into()),
value: None,
to: "$.injected".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"data": "ignored"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["injected"], "static_value");
}
#[test]
fn test_remap_template() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("Hello {{ $.name }}!".into()),
value: None,
to: "$.greeting".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"name": "World"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["greeting"], "Hello World!");
}
#[test]
fn test_remap_template_multiple_parts() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("{{ $.first }} and {{ $.second }}".into()),
value: None,
to: "$.combined".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"first": "A", "second": "B"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["combined"], "A and B");
}
#[test]
fn test_remap_template_mixed_types() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("Count: {{ $.count }}".into()),
value: None,
to: "$.info".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"count": 42}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["info"], "Count: 42");
}
#[test]
fn test_remap_template_missing_field() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("Hello {{ $.missing }}".into()),
value: None,
to: "$.greeting".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"name": "World"}));
let result = step.process(msg).unwrap().unwrap();
assert!(result.payload.get("greeting").is_none());
}
#[test]
fn test_remap_value_precedence() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$.ignore".into()),
value: Some(json!("priority")),
to: "$.result".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({"ignore": "ignored"}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["result"], "priority");
}
#[test]
fn test_remap_builtin_variable_uuid() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: None,
value: Some(json!("$UUID")),
to: "$.id".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
let uuid = result.payload["id"].as_str().unwrap();
assert_eq!(uuid.len(), 36); }
#[test]
fn test_remap_builtin_variable_now() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: None,
value: Some(json!("$NOW")),
to: "$.timestamp".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
let ts = result.payload["timestamp"].as_str().unwrap();
assert!(ts.contains("T")); }
#[test]
fn test_remap_builtin_variable_source_id() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: None,
value: Some(json!("$SOURCE_ID")),
to: "$.source".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
assert_eq!(result.payload["source"], "test");
}
#[test]
fn test_remap_template_with_variable() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("ID: {{ $UUID }}".into()),
value: None,
to: "$.generated".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
let generated = result.payload["generated"].as_str().unwrap();
assert!(generated.starts_with("ID: "));
assert!(generated.len() >= 40); }
#[test]
fn test_remap_variable_from_field() {
let config = RemapStepConfig {
mappings: vec![FieldMapping {
from: Some("$NOW".into()),
value: None,
to: "$.ts".into(),
}],
keep_unmapped: false,
};
let step = RemapStep::new(config).unwrap();
let msg = make_msg(json!({}));
let result = step.process(msg).unwrap().unwrap();
let ts = result.payload["ts"].as_str().unwrap();
assert!(ts.contains("T"));
}
}