1use serde::{Deserialize, Serialize};
34use serde_json::Value;
35use std::collections::HashMap;
36
/// Errors that can be reported while transforming a JSON payload.
///
/// Every variant carries a single `String` with the detail that is appended
/// to the variant's message when the error is displayed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PayloadTransformError {
    /// A required source path did not resolve to a value; carries the path.
    PathNotFound(String),
    /// A path expression was rejected; carries the expression.
    // NOTE(review): not constructed by the code visible in this file.
    InvalidPath(String),
    /// A transform step failed; carries a description.
    // NOTE(review): not constructed by the code visible in this file.
    TransformFailed(String),
    /// A value could not be converted to the required type; carries a description.
    // NOTE(review): not constructed by the code visible in this file.
    TypeConversionError(String),
    /// An operation was configured with something unusable (for example a
    /// regex pattern that fails to compile); carries a description.
    InvalidOperation(String),
}
51
52impl std::fmt::Display for PayloadTransformError {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 PayloadTransformError::PathNotFound(path) => {
56 write!(f, "Path not found in payload: {}", path)
57 }
58 PayloadTransformError::InvalidPath(path) => {
59 write!(f, "Invalid JSONPath expression: {}", path)
60 }
61 PayloadTransformError::TransformFailed(msg) => {
62 write!(f, "Transform failed: {}", msg)
63 }
64 PayloadTransformError::TypeConversionError(msg) => {
65 write!(f, "Type conversion error: {}", msg)
66 }
67 PayloadTransformError::InvalidOperation(msg) => {
68 write!(f, "Invalid operation: {}", msg)
69 }
70 }
71 }
72}
73
// Marker impl so the type can be used as a standard error (e.g. boxed as
// `Box<dyn std::error::Error>`); no source error is wrapped, so the trait's
// default methods are used unchanged.
impl std::error::Error for PayloadTransformError {}
75
/// A single step of a [`PayloadTransform`].
///
/// Serialized as an internally tagged enum: the variant name, in
/// `snake_case`, is stored in a `"type"` field next to the variant's own
/// fields.
///
/// "Input" below means the payload passed to `PayloadTransform::apply`;
/// "result" means the value being built up by the operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TransformOperation {
    /// Copy the value found at `source_path` in the input into the top-level
    /// field `target_field` of the result.
    Extract {
        /// Dotted path (optionally prefixed with `$.`) looked up in the input.
        source_path: String,
        /// Top-level key written in the result.
        target_field: String,
        /// When `false`, a missing source path is a `PathNotFound` error.
        optional: bool,
        /// Value written when the path is missing and `optional` is `true`;
        /// when `None`, nothing is written in that case.
        default: Option<Value>,
    },

    /// Move the value found at `from` in the result to the top-level key
    /// `to`. Does nothing when `from` does not resolve.
    Rename {
        /// Path of the value to move, resolved against the result.
        from: String,
        /// Top-level key the value is stored under.
        to: String,
    },

    /// Set the top-level key `field` of the result to a fixed `value`.
    AddConstant {
        /// Top-level key written in the result.
        field: String,
        /// Value stored under `field`.
        value: Value,
    },

    /// Remove each listed field from the result.
    RemoveFields {
        /// Fields to remove.
        fields: Vec<String>,
    },

    /// Keep only the listed top-level keys of the result (no effect when the
    /// result is not a JSON object).
    FilterFields {
        /// Top-level keys to keep.
        fields: Vec<String>,
    },

    /// Apply a string transform to the string found at `source_path` in the
    /// input and store it under `target_field`. Skipped when the path is
    /// missing or the value is not a string.
    StringTransform {
        /// Path looked up in the input.
        source_path: String,
        /// Top-level key written in the result.
        target_field: String,
        /// The transform to apply.
        transform: StringTransformType,
    },

    /// Translate the value found at `source_path` in the input through a
    /// lookup table and store the mapped value under `target_field`.
    /// Skipped when the path is missing.
    MapValue {
        /// Path looked up in the input.
        source_path: String,
        /// Top-level key written in the result.
        target_field: String,
        /// Lookup table. The key is the string itself for string values and
        /// the JSON text of the value otherwise.
        mappings: HashMap<String, Value>,
        /// Value written when no mapping matches; when `None`, nothing is
        /// written in that case.
        default: Option<Value>,
    },

    /// Render `template` against the input and store the resulting string
    /// under `target_field`. Placeholders have the form `{{path}}`.
    Template {
        /// Top-level key written in the result.
        target_field: String,
        /// Template text containing `{{path}}` placeholders.
        template: String,
    },

    /// Copy each entry of the object found at `source_path` in the input to
    /// a top-level key of the result (one level only). Skipped when the path
    /// is missing or the value is not an object.
    Flatten {
        /// Path looked up in the input.
        source_path: String,
        /// When present, keys become `{prefix}{separator}{key}`; when absent,
        /// the original key is used unchanged.
        prefix: Option<String>,
        /// Inserted between `prefix` and the key; unused without a prefix.
        separator: String,
    },

    /// Replace the result with an object holding the previous result under
    /// the single key `wrapper_field`.
    Wrap {
        /// Key the previous result is nested under.
        wrapper_field: String,
    },

    /// Store `expression` under `target_field`.
    // NOTE(review): the visible implementation stores the expression text
    // verbatim as a string; no evaluation takes place.
    Custom {
        /// Top-level key written in the result.
        target_field: String,
        /// Expression text.
        expression: String,
    },
}
175
/// String manipulation applied by `TransformOperation::StringTransform`.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum StringTransformType {
    /// Convert to upper case.
    Uppercase,
    /// Convert to lower case.
    Lowercase,
    /// Strip leading and trailing whitespace.
    Trim,
    /// Replace every occurrence of `from` with `to`.
    Replace { from: String, to: String },
    /// Yield the text of the first capture group of `pattern`, or an empty
    /// string when the pattern does not match or has no such group.
    Regex { pattern: String },
    /// Split on `delimiter` and yield the part at position `index`
    /// (zero-based), or an empty string when there is no such part.
    Split { delimiter: String, index: usize },
    /// Prepend `prefix`.
    Prefix { prefix: String },
    /// Append `suffix`.
    Suffix { suffix: String },
}
198
/// Description of a single source-to-target field mapping.
// NOTE(review): not referenced by any code visible in this file; the field
// docs below are inferred from the parallel `TransformOperation::Extract`
// fields and should be confirmed against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct FieldMapping {
    /// Where the value comes from (presumably a path in the input payload).
    pub source: String,
    /// Where the value is written (presumably a field of the output).
    pub target: String,
    /// Presumably: whether a missing source is tolerated.
    pub optional: bool,
    /// Presumably: fallback value used when the source is missing.
    pub default: Option<Value>,
}
212
/// An ordered list of [`TransformOperation`]s applied to a JSON payload.
///
/// Note: the derived `Default` yields `strict = false`, whereas
/// `PayloadTransform::new()` and `PayloadTransform::empty()` set
/// `strict = true`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct PayloadTransform {
    /// Operations, applied in order.
    pub operations: Vec<TransformOperation>,
    /// When `true` the result starts as an empty JSON object; otherwise it
    /// starts as a copy of the input.
    pub start_empty: bool,
    /// When `true` the first failing operation aborts the transform with its
    /// error; otherwise failing operations are skipped.
    pub strict: bool,
}
224
225impl PayloadTransform {
226 pub fn new() -> Self {
228 Self {
229 operations: Vec::new(),
230 start_empty: false,
231 strict: true,
232 }
233 }
234
235 pub fn empty() -> Self {
237 Self {
238 operations: Vec::new(),
239 start_empty: true,
240 strict: true,
241 }
242 }
243
244 pub fn strict(mut self, strict: bool) -> Self {
246 self.strict = strict;
247 self
248 }
249
250 pub fn extract_field(mut self, source_path: &str, target_field: &str) -> Self {
252 self.operations.push(TransformOperation::Extract {
253 source_path: source_path.to_string(),
254 target_field: target_field.to_string(),
255 optional: false,
256 default: None,
257 });
258 self
259 }
260
261 pub fn extract_field_or(
263 mut self,
264 source_path: &str,
265 target_field: &str,
266 default: Value,
267 ) -> Self {
268 self.operations.push(TransformOperation::Extract {
269 source_path: source_path.to_string(),
270 target_field: target_field.to_string(),
271 optional: true,
272 default: Some(default),
273 });
274 self
275 }
276
277 pub fn rename_field(mut self, from: &str, to: &str) -> Self {
279 self.operations.push(TransformOperation::Rename {
280 from: from.to_string(),
281 to: to.to_string(),
282 });
283 self
284 }
285
286 pub fn add_default(mut self, field: &str, value: Value) -> Self {
288 self.operations.push(TransformOperation::AddConstant {
289 field: field.to_string(),
290 value,
291 });
292 self
293 }
294
295 pub fn remove_fields(mut self, fields: &[&str]) -> Self {
297 self.operations.push(TransformOperation::RemoveFields {
298 fields: fields.iter().map(|s| s.to_string()).collect(),
299 });
300 self
301 }
302
303 pub fn filter_fields(mut self, fields: &[&str]) -> Self {
305 self.operations.push(TransformOperation::FilterFields {
306 fields: fields.iter().map(|s| s.to_string()).collect(),
307 });
308 self
309 }
310
311 pub fn string_transform(
313 mut self,
314 source_path: &str,
315 target_field: &str,
316 transform: StringTransformType,
317 ) -> Self {
318 self.operations.push(TransformOperation::StringTransform {
319 source_path: source_path.to_string(),
320 target_field: target_field.to_string(),
321 transform,
322 });
323 self
324 }
325
326 pub fn map_value(
328 mut self,
329 source_path: &str,
330 target_field: &str,
331 mappings: HashMap<String, Value>,
332 default: Option<Value>,
333 ) -> Self {
334 self.operations.push(TransformOperation::MapValue {
335 source_path: source_path.to_string(),
336 target_field: target_field.to_string(),
337 mappings,
338 default,
339 });
340 self
341 }
342
343 pub fn template(mut self, target_field: &str, template: &str) -> Self {
345 self.operations.push(TransformOperation::Template {
346 target_field: target_field.to_string(),
347 template: template.to_string(),
348 });
349 self
350 }
351
352 pub fn wrap(mut self, wrapper_field: &str) -> Self {
354 self.operations.push(TransformOperation::Wrap {
355 wrapper_field: wrapper_field.to_string(),
356 });
357 self
358 }
359
360 pub fn flatten(mut self, source_path: &str, prefix: Option<&str>, separator: &str) -> Self {
362 self.operations.push(TransformOperation::Flatten {
363 source_path: source_path.to_string(),
364 prefix: prefix.map(|s| s.to_string()),
365 separator: separator.to_string(),
366 });
367 self
368 }
369
370 pub fn add_operation(mut self, operation: TransformOperation) -> Self {
372 self.operations.push(operation);
373 self
374 }
375
376 pub fn apply(&self, input: &Value) -> Result<Value, PayloadTransformError> {
378 let mut result = if self.start_empty {
379 Value::Object(serde_json::Map::new())
380 } else {
381 input.clone()
382 };
383
384 for operation in &self.operations {
385 match self.apply_operation(&mut result, input, operation) {
386 Ok(()) => {}
387 Err(e) if self.strict => return Err(e),
388 Err(_) => continue, }
390 }
391
392 Ok(result)
393 }
394
395 fn apply_operation(
396 &self,
397 result: &mut Value,
398 input: &Value,
399 operation: &TransformOperation,
400 ) -> Result<(), PayloadTransformError> {
401 match operation {
402 TransformOperation::Extract {
403 source_path,
404 target_field,
405 optional,
406 default,
407 } => {
408 let value = get_value_by_path(input, source_path);
409 match value {
410 Some(v) => set_field(result, target_field, v.clone()),
411 None if *optional => {
412 if let Some(def) = default {
413 set_field(result, target_field, def.clone());
414 }
415 }
416 None => return Err(PayloadTransformError::PathNotFound(source_path.clone())),
417 }
418 }
419
420 TransformOperation::Rename { from, to } => {
421 if let Some(value) = get_value_by_path(result, from) {
422 let v = value.clone();
423 remove_field(result, from);
424 set_field(result, to, v);
425 }
426 }
427
428 TransformOperation::AddConstant { field, value } => {
429 set_field(result, field, value.clone());
430 }
431
432 TransformOperation::RemoveFields { fields } => {
433 for field in fields {
434 remove_field(result, field);
435 }
436 }
437
438 TransformOperation::FilterFields { fields } => {
439 if let Value::Object(map) = result {
440 let fields_set: std::collections::HashSet<_> = fields.iter().collect();
441 map.retain(|k, _| fields_set.contains(k));
442 }
443 }
444
445 TransformOperation::StringTransform {
446 source_path,
447 target_field,
448 transform,
449 } => {
450 if let Some(Value::String(s)) = get_value_by_path(input, source_path) {
451 let transformed = apply_string_transform(s, transform)?;
452 set_field(result, target_field, Value::String(transformed));
453 }
454 }
455
456 TransformOperation::MapValue {
457 source_path,
458 target_field,
459 mappings,
460 default,
461 } => {
462 if let Some(value) = get_value_by_path(input, source_path) {
463 let key = match value {
464 Value::String(s) => s.clone(),
465 v => v.to_string(),
466 };
467 if let Some(mapped) = mappings.get(&key) {
468 set_field(result, target_field, mapped.clone());
469 } else if let Some(def) = default {
470 set_field(result, target_field, def.clone());
471 }
472 }
473 }
474
475 TransformOperation::Template {
476 target_field,
477 template,
478 } => {
479 let rendered = render_template(template, input);
480 set_field(result, target_field, Value::String(rendered));
481 }
482
483 TransformOperation::Flatten {
484 source_path,
485 prefix,
486 separator,
487 } => {
488 if let Some(Value::Object(map)) = get_value_by_path(input, source_path) {
489 for (key, value) in map {
490 let new_key = match prefix {
491 Some(p) => format!("{}{}{}", p, separator, key),
492 None => key.clone(),
493 };
494 set_field(result, &new_key, value.clone());
495 }
496 }
497 }
498
499 TransformOperation::Wrap { wrapper_field } => {
500 let current = result.clone();
501 *result = serde_json::json!({
502 wrapper_field: current
503 });
504 }
505
506 TransformOperation::Custom {
507 target_field,
508 expression,
509 } => {
510 set_field(result, target_field, Value::String(expression.clone()));
513 }
514 }
515
516 Ok(())
517 }
518}
519
/// A named sequence of [`PayloadTransform`]s applied one after another, each
/// receiving the output of the previous one.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct TransformPipeline {
    /// Name of the pipeline.
    pub name: String,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Stages, applied in order.
    pub transforms: Vec<PayloadTransform>,
}
531
532impl TransformPipeline {
533 pub fn new(name: &str) -> Self {
535 Self {
536 name: name.to_string(),
537 description: None,
538 transforms: Vec::new(),
539 }
540 }
541
542 pub fn with_description(mut self, description: &str) -> Self {
544 self.description = Some(description.to_string());
545 self
546 }
547
548 #[allow(clippy::should_implement_trait)]
550 pub fn add(mut self, transform: PayloadTransform) -> Self {
551 self.transforms.push(transform);
552 self
553 }
554
555 pub fn apply(&self, input: &Value) -> Result<Value, PayloadTransformError> {
557 let mut result = input.clone();
558 for transform in &self.transforms {
559 result = transform.apply(&result)?;
560 }
561 Ok(result)
562 }
563}
564
565fn get_value_by_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
569 let path = path.strip_prefix("$.").unwrap_or(path);
570 let parts: Vec<&str> = path.split('.').collect();
571
572 let mut current = value;
573 for part in parts {
574 if part.is_empty() {
575 continue;
576 }
577 if let Some(idx_start) = part.find('[') {
579 let field = &part[..idx_start];
580 let idx_end = part.find(']')?;
581 let idx: usize = part[idx_start + 1..idx_end].parse().ok()?;
582
583 current = current.get(field)?;
584 current = current.get(idx)?;
585 } else {
586 current = current.get(part)?;
587 }
588 }
589
590 Some(current)
591}
592
593fn set_field(value: &mut Value, field: &str, new_value: Value) {
595 if let Value::Object(map) = value {
596 map.insert(field.to_string(), new_value);
597 }
598}
599
600fn remove_field(value: &mut Value, path: &str) {
602 let path = path.strip_prefix("$.").unwrap_or(path);
603 if let Value::Object(map) = value {
604 map.remove(path);
605 }
606}
607
608fn apply_string_transform(
610 s: &str,
611 transform: &StringTransformType,
612) -> Result<String, PayloadTransformError> {
613 match transform {
614 StringTransformType::Uppercase => Ok(s.to_uppercase()),
615 StringTransformType::Lowercase => Ok(s.to_lowercase()),
616 StringTransformType::Trim => Ok(s.trim().to_string()),
617 StringTransformType::Replace { from, to } => Ok(s.replace(from, to)),
618 StringTransformType::Regex { pattern } => {
619 let re = regex::Regex::new(pattern)
620 .map_err(|e| PayloadTransformError::InvalidOperation(e.to_string()))?;
621 if let Some(caps) = re.captures(s) {
622 if let Some(m) = caps.get(1) {
623 return Ok(m.as_str().to_string());
624 }
625 }
626 Ok(String::new())
627 }
628 StringTransformType::Split { delimiter, index } => {
629 let parts: Vec<&str> = s.split(delimiter).collect();
630 Ok(parts.get(*index).unwrap_or(&"").to_string())
631 }
632 StringTransformType::Prefix { prefix } => Ok(format!("{}{}", prefix, s)),
633 StringTransformType::Suffix { suffix } => Ok(format!("{}{}", s, suffix)),
634 }
635}
636
637fn render_template(template: &str, input: &Value) -> String {
639 let re = regex::Regex::new(r"\{\{([^}]+)\}\}").unwrap();
640 re.replace_all(template, |caps: ®ex::Captures| {
641 let path = &caps[1];
642 get_value_by_path(input, path)
643 .map(|v| match v {
644 Value::String(s) => s.clone(),
645 other => other.to_string(),
646 })
647 .unwrap_or_default()
648 })
649 .to_string()
650}
651
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A nested object can be extracted into a top-level field.
    #[test]
    fn test_extract_field() {
        let payload = json!({
            "data": {
                "user": { "id": 123, "name": "Alice" }
            }
        });

        let out = PayloadTransform::empty()
            .extract_field("$.data.user", "user")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["user"]["id"], 123);
        assert_eq!(out["user"]["name"], "Alice");
    }

    /// An optional extract falls back to its default when the path is missing.
    #[test]
    fn test_extract_optional_with_default() {
        let payload = json!({ "other": "data" });

        let out = PayloadTransform::empty()
            .extract_field_or("$.missing.field", "value", json!("default"))
            .apply(&payload)
            .unwrap();

        assert_eq!(out["value"], "default");
    }

    /// Renaming moves the value and removes the old key.
    #[test]
    fn test_rename_field() {
        let payload = json!({
            "old_name": "value",
            "other": "data"
        });

        let out = PayloadTransform::new()
            .rename_field("$.old_name", "new_name")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["new_name"], "value");
        assert!(out.get("old_name").is_none());
    }

    /// Constants are added next to the existing fields.
    #[test]
    fn test_add_constant() {
        let payload = json!({ "data": "test" });

        let out = PayloadTransform::new()
            .add_default("source", json!("webhook"))
            .add_default("version", json!(1))
            .apply(&payload)
            .unwrap();

        assert_eq!(out["source"], "webhook");
        assert_eq!(out["version"], 1);
        assert_eq!(out["data"], "test");
    }

    /// Listed fields are removed; everything else is kept.
    #[test]
    fn test_remove_fields() {
        let payload = json!({
            "data": "keep",
            "secret": "remove",
            "internal": "remove"
        });

        let out = PayloadTransform::new()
            .remove_fields(&["secret", "internal"])
            .apply(&payload)
            .unwrap();

        assert_eq!(out["data"], "keep");
        assert!(out.get("secret").is_none());
        assert!(out.get("internal").is_none());
    }

    /// Only the listed fields survive a filter.
    #[test]
    fn test_filter_fields() {
        let payload = json!({
            "id": 123,
            "name": "test",
            "secret": "hidden",
            "internal": "hidden"
        });

        let out = PayloadTransform::new()
            .filter_fields(&["id", "name"])
            .apply(&payload)
            .unwrap();

        assert_eq!(out["id"], 123);
        assert_eq!(out["name"], "test");
        assert!(out.get("secret").is_none());
        assert!(out.get("internal").is_none());
    }

    /// Upper-casing a string field.
    #[test]
    fn test_string_transform_uppercase() {
        let payload = json!({ "action": "user.created" });

        let out = PayloadTransform::new()
            .string_transform("$.action", "action_upper", StringTransformType::Uppercase)
            .apply(&payload)
            .unwrap();

        assert_eq!(out["action_upper"], "USER.CREATED");
    }

    /// Splitting a string field and picking one part.
    #[test]
    fn test_string_transform_split() {
        let split_first = StringTransformType::Split {
            delimiter: ".".to_string(),
            index: 0,
        };
        let payload = json!({ "action": "user.created" });

        let out = PayloadTransform::new()
            .string_transform("$.action", "entity", split_first)
            .apply(&payload)
            .unwrap();

        assert_eq!(out["entity"], "user");
    }

    /// Known values are mapped; unknown values get the default.
    #[test]
    fn test_map_value() {
        let table: HashMap<String, Value> = [
            ("created", json!("new")),
            ("updated", json!("modified")),
            ("deleted", json!("removed")),
        ]
        .iter()
        .map(|(key, mapped)| (key.to_string(), mapped.clone()))
        .collect();

        let xf = PayloadTransform::new().map_value(
            "$.action",
            "status",
            table,
            Some(json!("unknown")),
        );

        let known = xf.apply(&json!({ "action": "created" })).unwrap();
        assert_eq!(known["status"], "new");

        let unknown = xf.apply(&json!({ "action": "other" })).unwrap();
        assert_eq!(unknown["status"], "unknown");
    }

    /// Placeholders are replaced with values from the input.
    #[test]
    fn test_template() {
        let payload = json!({
            "user": { "name": "Alice" },
            "action": "login"
        });

        let out = PayloadTransform::new()
            .template("message", "User {{$.user.name}} performed {{$.action}}")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["message"], "User Alice performed login");
    }

    /// The whole payload is nested under the wrapper key.
    #[test]
    fn test_wrap() {
        let payload = json!({ "data": "test" });

        let out = PayloadTransform::new()
            .wrap("payload")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["payload"]["data"], "test");
    }

    /// Entries of a nested object become prefixed top-level fields.
    #[test]
    fn test_flatten() {
        let payload = json!({
            "id": 1,
            "metadata": {
                "created": "2026-01-01",
                "version": "1.0"
            }
        });

        let out = PayloadTransform::new()
            .flatten("$.metadata", Some("meta"), "_")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["meta_created"], "2026-01-01");
        assert_eq!(out["meta_version"], "1.0");
    }

    /// Several operations combined in one transform.
    #[test]
    fn test_complex_pipeline() {
        let payload = json!({
            "action": "opened",
            "repository": {
                "full_name": "org/repo"
            },
            "sender": {
                "login": "user123"
            }
        });

        let out = PayloadTransform::empty()
            .extract_field("$.repository.full_name", "repo")
            .extract_field("$.sender.login", "actor")
            .extract_field("$.action", "event")
            .add_default("source", json!("github"))
            .string_transform("$.action", "event_type", StringTransformType::Uppercase)
            .apply(&payload)
            .unwrap();

        assert_eq!(out["repo"], "org/repo");
        assert_eq!(out["actor"], "user123");
        assert_eq!(out["event"], "opened");
        assert_eq!(out["source"], "github");
        assert_eq!(out["event_type"], "OPENED");
    }

    /// In non-strict mode a failing operation is skipped, not fatal.
    #[test]
    fn test_non_strict_mode() {
        let payload = json!({ "other": "data" });

        let out = PayloadTransform::new()
            .strict(false)
            // This extract fails (path is missing) and must be skipped.
            .extract_field("$.missing", "value")
            .add_default("added", json!("success"))
            .apply(&payload)
            .unwrap();

        assert_eq!(out["added"], "success");
        assert_eq!(out["other"], "data");
    }

    /// Pipeline stages run in order, each on the previous stage's output.
    #[test]
    fn test_pipeline() {
        let extract_stage = PayloadTransform::empty()
            .extract_field("$.repository.name", "repo")
            .extract_field("$.action", "event");
        let mark_stage = PayloadTransform::new().add_default("processed", json!(true));

        let pipeline = TransformPipeline::new("github_webhook")
            .with_description("Transform GitHub webhook payloads")
            .add(extract_stage)
            .add(mark_stage);

        let payload = json!({
            "action": "push",
            "repository": { "name": "test-repo" }
        });

        let out = pipeline.apply(&payload).unwrap();
        assert_eq!(out["repo"], "test-repo");
        assert_eq!(out["event"], "push");
        assert_eq!(out["processed"], true);
    }

    /// Paths may index into arrays.
    #[test]
    fn test_array_index_access() {
        let payload = json!({
            "items": [
                { "name": "first" },
                { "name": "second" }
            ]
        });

        let out = PayloadTransform::empty()
            .extract_field("$.items[0].name", "first_item")
            .apply(&payload)
            .unwrap();

        assert_eq!(out["first_item"], "first");
    }
}