use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PayloadTransformError {
PathNotFound(String),
InvalidPath(String),
TransformFailed(String),
TypeConversionError(String),
InvalidOperation(String),
}
impl std::fmt::Display for PayloadTransformError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PayloadTransformError::PathNotFound(path) => {
write!(f, "Path not found in payload: {}", path)
}
PayloadTransformError::InvalidPath(path) => {
write!(f, "Invalid JSONPath expression: {}", path)
}
PayloadTransformError::TransformFailed(msg) => {
write!(f, "Transform failed: {}", msg)
}
PayloadTransformError::TypeConversionError(msg) => {
write!(f, "Type conversion error: {}", msg)
}
PayloadTransformError::InvalidOperation(msg) => {
write!(f, "Invalid operation: {}", msg)
}
}
}
}
impl std::error::Error for PayloadTransformError {}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TransformOperation {
Extract {
source_path: String,
target_field: String,
optional: bool,
default: Option<Value>,
},
Rename {
from: String,
to: String,
},
AddConstant {
field: String,
value: Value,
},
RemoveFields {
fields: Vec<String>,
},
FilterFields {
fields: Vec<String>,
},
StringTransform {
source_path: String,
target_field: String,
transform: StringTransformType,
},
MapValue {
source_path: String,
target_field: String,
mappings: HashMap<String, Value>,
default: Option<Value>,
},
Template {
target_field: String,
template: String,
},
Flatten {
source_path: String,
prefix: Option<String>,
separator: String,
},
Wrap {
wrapper_field: String,
},
Custom {
target_field: String,
expression: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum StringTransformType {
Uppercase,
Lowercase,
Trim,
Replace { from: String, to: String },
Regex { pattern: String },
Split { delimiter: String, index: usize },
Prefix { prefix: String },
Suffix { suffix: String },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct FieldMapping {
pub source: String,
pub target: String,
pub optional: bool,
pub default: Option<Value>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct PayloadTransform {
pub operations: Vec<TransformOperation>,
pub start_empty: bool,
pub strict: bool,
}
impl PayloadTransform {
pub fn new() -> Self {
Self {
operations: Vec::new(),
start_empty: false,
strict: true,
}
}
pub fn empty() -> Self {
Self {
operations: Vec::new(),
start_empty: true,
strict: true,
}
}
pub fn strict(mut self, strict: bool) -> Self {
self.strict = strict;
self
}
pub fn extract_field(mut self, source_path: &str, target_field: &str) -> Self {
self.operations.push(TransformOperation::Extract {
source_path: source_path.to_string(),
target_field: target_field.to_string(),
optional: false,
default: None,
});
self
}
pub fn extract_field_or(
mut self,
source_path: &str,
target_field: &str,
default: Value,
) -> Self {
self.operations.push(TransformOperation::Extract {
source_path: source_path.to_string(),
target_field: target_field.to_string(),
optional: true,
default: Some(default),
});
self
}
pub fn rename_field(mut self, from: &str, to: &str) -> Self {
self.operations.push(TransformOperation::Rename {
from: from.to_string(),
to: to.to_string(),
});
self
}
pub fn add_default(mut self, field: &str, value: Value) -> Self {
self.operations.push(TransformOperation::AddConstant {
field: field.to_string(),
value,
});
self
}
pub fn remove_fields(mut self, fields: &[&str]) -> Self {
self.operations.push(TransformOperation::RemoveFields {
fields: fields.iter().map(|s| s.to_string()).collect(),
});
self
}
pub fn filter_fields(mut self, fields: &[&str]) -> Self {
self.operations.push(TransformOperation::FilterFields {
fields: fields.iter().map(|s| s.to_string()).collect(),
});
self
}
pub fn string_transform(
mut self,
source_path: &str,
target_field: &str,
transform: StringTransformType,
) -> Self {
self.operations.push(TransformOperation::StringTransform {
source_path: source_path.to_string(),
target_field: target_field.to_string(),
transform,
});
self
}
pub fn map_value(
mut self,
source_path: &str,
target_field: &str,
mappings: HashMap<String, Value>,
default: Option<Value>,
) -> Self {
self.operations.push(TransformOperation::MapValue {
source_path: source_path.to_string(),
target_field: target_field.to_string(),
mappings,
default,
});
self
}
pub fn template(mut self, target_field: &str, template: &str) -> Self {
self.operations.push(TransformOperation::Template {
target_field: target_field.to_string(),
template: template.to_string(),
});
self
}
pub fn wrap(mut self, wrapper_field: &str) -> Self {
self.operations.push(TransformOperation::Wrap {
wrapper_field: wrapper_field.to_string(),
});
self
}
pub fn flatten(mut self, source_path: &str, prefix: Option<&str>, separator: &str) -> Self {
self.operations.push(TransformOperation::Flatten {
source_path: source_path.to_string(),
prefix: prefix.map(|s| s.to_string()),
separator: separator.to_string(),
});
self
}
pub fn add_operation(mut self, operation: TransformOperation) -> Self {
self.operations.push(operation);
self
}
pub fn apply(&self, input: &Value) -> Result<Value, PayloadTransformError> {
let mut result = if self.start_empty {
Value::Object(serde_json::Map::new())
} else {
input.clone()
};
for operation in &self.operations {
match self.apply_operation(&mut result, input, operation) {
Ok(()) => {}
Err(e) if self.strict => return Err(e),
Err(_) => continue, }
}
Ok(result)
}
fn apply_operation(
&self,
result: &mut Value,
input: &Value,
operation: &TransformOperation,
) -> Result<(), PayloadTransformError> {
match operation {
TransformOperation::Extract {
source_path,
target_field,
optional,
default,
} => {
let value = get_value_by_path(input, source_path);
match value {
Some(v) => set_field(result, target_field, v.clone()),
None if *optional => {
if let Some(def) = default {
set_field(result, target_field, def.clone());
}
}
None => return Err(PayloadTransformError::PathNotFound(source_path.clone())),
}
}
TransformOperation::Rename { from, to } => {
if let Some(value) = get_value_by_path(result, from) {
let v = value.clone();
remove_field(result, from);
set_field(result, to, v);
}
}
TransformOperation::AddConstant { field, value } => {
set_field(result, field, value.clone());
}
TransformOperation::RemoveFields { fields } => {
for field in fields {
remove_field(result, field);
}
}
TransformOperation::FilterFields { fields } => {
if let Value::Object(map) = result {
let fields_set: std::collections::HashSet<_> = fields.iter().collect();
map.retain(|k, _| fields_set.contains(k));
}
}
TransformOperation::StringTransform {
source_path,
target_field,
transform,
} => {
if let Some(Value::String(s)) = get_value_by_path(input, source_path) {
let transformed = apply_string_transform(s, transform)?;
set_field(result, target_field, Value::String(transformed));
}
}
TransformOperation::MapValue {
source_path,
target_field,
mappings,
default,
} => {
if let Some(value) = get_value_by_path(input, source_path) {
let key = match value {
Value::String(s) => s.clone(),
v => v.to_string(),
};
if let Some(mapped) = mappings.get(&key) {
set_field(result, target_field, mapped.clone());
} else if let Some(def) = default {
set_field(result, target_field, def.clone());
}
}
}
TransformOperation::Template {
target_field,
template,
} => {
let rendered = render_template(template, input);
set_field(result, target_field, Value::String(rendered));
}
TransformOperation::Flatten {
source_path,
prefix,
separator,
} => {
if let Some(Value::Object(map)) = get_value_by_path(input, source_path) {
for (key, value) in map {
let new_key = match prefix {
Some(p) => format!("{}{}{}", p, separator, key),
None => key.clone(),
};
set_field(result, &new_key, value.clone());
}
}
}
TransformOperation::Wrap { wrapper_field } => {
let current = result.clone();
*result = serde_json::json!({
wrapper_field: current
});
}
TransformOperation::Custom {
target_field,
expression,
} => {
set_field(result, target_field, Value::String(expression.clone()));
}
}
Ok(())
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct TransformPipeline {
pub name: String,
pub description: Option<String>,
pub transforms: Vec<PayloadTransform>,
}
impl TransformPipeline {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
description: None,
transforms: Vec::new(),
}
}
pub fn with_description(mut self, description: &str) -> Self {
self.description = Some(description.to_string());
self
}
#[allow(clippy::should_implement_trait)]
pub fn add(mut self, transform: PayloadTransform) -> Self {
self.transforms.push(transform);
self
}
pub fn apply(&self, input: &Value) -> Result<Value, PayloadTransformError> {
let mut result = input.clone();
for transform in &self.transforms {
result = transform.apply(&result)?;
}
Ok(result)
}
}
fn get_value_by_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
let path = path.strip_prefix("$.").unwrap_or(path);
let parts: Vec<&str> = path.split('.').collect();
let mut current = value;
for part in parts {
if part.is_empty() {
continue;
}
if let Some(idx_start) = part.find('[') {
let field = &part[..idx_start];
let idx_end = part.find(']')?;
let idx: usize = part[idx_start + 1..idx_end].parse().ok()?;
current = current.get(field)?;
current = current.get(idx)?;
} else {
current = current.get(part)?;
}
}
Some(current)
}
fn set_field(value: &mut Value, field: &str, new_value: Value) {
if let Value::Object(map) = value {
map.insert(field.to_string(), new_value);
}
}
fn remove_field(value: &mut Value, path: &str) {
let path = path.strip_prefix("$.").unwrap_or(path);
if let Value::Object(map) = value {
map.remove(path);
}
}
fn apply_string_transform(
s: &str,
transform: &StringTransformType,
) -> Result<String, PayloadTransformError> {
match transform {
StringTransformType::Uppercase => Ok(s.to_uppercase()),
StringTransformType::Lowercase => Ok(s.to_lowercase()),
StringTransformType::Trim => Ok(s.trim().to_string()),
StringTransformType::Replace { from, to } => Ok(s.replace(from, to)),
StringTransformType::Regex { pattern } => {
let re = regex::Regex::new(pattern)
.map_err(|e| PayloadTransformError::InvalidOperation(e.to_string()))?;
if let Some(caps) = re.captures(s) {
if let Some(m) = caps.get(1) {
return Ok(m.as_str().to_string());
}
}
Ok(String::new())
}
StringTransformType::Split { delimiter, index } => {
let parts: Vec<&str> = s.split(delimiter).collect();
Ok(parts.get(*index).unwrap_or(&"").to_string())
}
StringTransformType::Prefix { prefix } => Ok(format!("{}{}", prefix, s)),
StringTransformType::Suffix { suffix } => Ok(format!("{}{}", s, suffix)),
}
}
fn render_template(template: &str, input: &Value) -> String {
let re = regex::Regex::new(r"\{\{([^}]+)\}\}").unwrap();
re.replace_all(template, |caps: ®ex::Captures| {
let path = &caps[1];
get_value_by_path(input, path)
.map(|v| match v {
Value::String(s) => s.clone(),
other => other.to_string(),
})
.unwrap_or_default()
})
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_extract_field() {
let transform = PayloadTransform::empty().extract_field("$.data.user", "user");
let input = json!({
"data": {
"user": { "id": 123, "name": "Alice" }
}
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["user"]["id"], 123);
assert_eq!(result["user"]["name"], "Alice");
}
#[test]
fn test_extract_optional_with_default() {
let transform = PayloadTransform::empty().extract_field_or(
"$.missing.field",
"value",
json!("default"),
);
let input = json!({ "other": "data" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["value"], "default");
}
#[test]
fn test_rename_field() {
let transform = PayloadTransform::new().rename_field("$.old_name", "new_name");
let input = json!({
"old_name": "value",
"other": "data"
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["new_name"], "value");
assert!(result.get("old_name").is_none());
}
#[test]
fn test_add_constant() {
let transform = PayloadTransform::new()
.add_default("source", json!("webhook"))
.add_default("version", json!(1));
let input = json!({ "data": "test" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["source"], "webhook");
assert_eq!(result["version"], 1);
assert_eq!(result["data"], "test");
}
#[test]
fn test_remove_fields() {
let transform = PayloadTransform::new().remove_fields(&["secret", "internal"]);
let input = json!({
"data": "keep",
"secret": "remove",
"internal": "remove"
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["data"], "keep");
assert!(result.get("secret").is_none());
assert!(result.get("internal").is_none());
}
#[test]
fn test_filter_fields() {
let transform = PayloadTransform::new().filter_fields(&["id", "name"]);
let input = json!({
"id": 123,
"name": "test",
"secret": "hidden",
"internal": "hidden"
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["id"], 123);
assert_eq!(result["name"], "test");
assert!(result.get("secret").is_none());
assert!(result.get("internal").is_none());
}
#[test]
fn test_string_transform_uppercase() {
let transform = PayloadTransform::new().string_transform(
"$.action",
"action_upper",
StringTransformType::Uppercase,
);
let input = json!({ "action": "user.created" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["action_upper"], "USER.CREATED");
}
#[test]
fn test_string_transform_split() {
let transform = PayloadTransform::new().string_transform(
"$.action",
"entity",
StringTransformType::Split {
delimiter: ".".to_string(),
index: 0,
},
);
let input = json!({ "action": "user.created" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["entity"], "user");
}
#[test]
fn test_map_value() {
let mut mappings = HashMap::new();
mappings.insert("created".to_string(), json!("new"));
mappings.insert("updated".to_string(), json!("modified"));
mappings.insert("deleted".to_string(), json!("removed"));
let transform = PayloadTransform::new().map_value(
"$.action",
"status",
mappings,
Some(json!("unknown")),
);
let input = json!({ "action": "created" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["status"], "new");
let input2 = json!({ "action": "other" });
let result2 = transform.apply(&input2).unwrap();
assert_eq!(result2["status"], "unknown");
}
#[test]
fn test_template() {
let transform = PayloadTransform::new()
.template("message", "User {{$.user.name}} performed {{$.action}}");
let input = json!({
"user": { "name": "Alice" },
"action": "login"
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["message"], "User Alice performed login");
}
#[test]
fn test_wrap() {
let transform = PayloadTransform::new().wrap("payload");
let input = json!({ "data": "test" });
let result = transform.apply(&input).unwrap();
assert_eq!(result["payload"]["data"], "test");
}
#[test]
fn test_flatten() {
let transform = PayloadTransform::new().flatten("$.metadata", Some("meta"), "_");
let input = json!({
"id": 1,
"metadata": {
"created": "2026-01-01",
"version": "1.0"
}
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["meta_created"], "2026-01-01");
assert_eq!(result["meta_version"], "1.0");
}
#[test]
fn test_complex_pipeline() {
let transform = PayloadTransform::empty()
.extract_field("$.repository.full_name", "repo")
.extract_field("$.sender.login", "actor")
.extract_field("$.action", "event")
.add_default("source", json!("github"))
.string_transform("$.action", "event_type", StringTransformType::Uppercase);
let input = json!({
"action": "opened",
"repository": {
"full_name": "org/repo"
},
"sender": {
"login": "user123"
}
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["repo"], "org/repo");
assert_eq!(result["actor"], "user123");
assert_eq!(result["event"], "opened");
assert_eq!(result["source"], "github");
assert_eq!(result["event_type"], "OPENED");
}
#[test]
fn test_non_strict_mode() {
let transform = PayloadTransform::new()
.strict(false)
.extract_field("$.missing", "value") .add_default("added", json!("success"));
let input = json!({ "other": "data" });
let result = transform.apply(&input).unwrap(); assert_eq!(result["added"], "success");
assert_eq!(result["other"], "data");
}
#[test]
fn test_pipeline() {
let pipeline = TransformPipeline::new("github_webhook")
.with_description("Transform GitHub webhook payloads")
.add(
PayloadTransform::empty()
.extract_field("$.repository.name", "repo")
.extract_field("$.action", "event"),
)
.add(PayloadTransform::new().add_default("processed", json!(true)));
let input = json!({
"action": "push",
"repository": { "name": "test-repo" }
});
let result = pipeline.apply(&input).unwrap();
assert_eq!(result["repo"], "test-repo");
assert_eq!(result["event"], "push");
assert_eq!(result["processed"], true);
}
#[test]
fn test_array_index_access() {
let transform = PayloadTransform::empty().extract_field("$.items[0].name", "first_item");
let input = json!({
"items": [
{ "name": "first" },
{ "name": "second" }
]
});
let result = transform.apply(&input).unwrap();
assert_eq!(result["first_item"], "first");
}
}