1use std::collections::HashMap;
9
/// Format tag attached to a [`MessagePayload`] and declared by each
/// [`TransformPipeline`] for its input and output.
///
/// Field values themselves are always plain `String`s; this tag is metadata
/// that travels alongside them.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum MessageFormat {
    /// JSON.
    Json,
    /// Avro.
    Avro,
    /// Protocol Buffers.
    Protobuf,
    /// Comma-separated values.
    Csv,
    /// No particular structured format.
    Raw,
}
23
/// A single string-to-string rewrite applied to one field value.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TransformFn {
    /// Unicode-aware upper-casing (`str::to_uppercase`).
    ToUpper,
    /// Unicode-aware lower-casing (`str::to_lowercase`).
    ToLower,
    /// Strips leading and trailing whitespace (`str::trim`).
    Trim,
    /// Prepends the contained string.
    Prefix(String),
    /// Appends the contained string.
    Suffix(String),
    /// Replaces every non-overlapping occurrence of `from` with `to`.
    ///
    /// With an empty `from`, the pattern matches at every character boundary
    /// (standard `str::replace` behaviour), so `to` is inserted before, between
    /// and after every character: `"ab"` with `to = "-"` becomes `"-a-b-"`.
    Replace { from: String, to: String },
    /// Returns the value unchanged.
    Identity,
}

impl TransformFn {
    /// Applies this transform to `value`, always producing a new owned `String`.
    pub fn apply(&self, value: &str) -> String {
        match self {
            Self::ToUpper => value.to_uppercase(),
            Self::ToLower => value.to_lowercase(),
            Self::Trim => value.trim().to_owned(),
            Self::Prefix(head) => [head.as_str(), value].concat(),
            Self::Suffix(tail) => [value, tail.as_str()].concat(),
            Self::Replace { from, to } => value.replace(from.as_str(), to),
            Self::Identity => value.to_owned(),
        }
    }
}
57
58#[derive(Debug, Clone)]
60pub struct FieldMapping {
61 pub source_field: String,
63 pub target_field: String,
65 pub transform: Option<TransformFn>,
67}
68
69impl FieldMapping {
70 pub fn new(source_field: impl Into<String>, target_field: impl Into<String>) -> Self {
72 Self {
73 source_field: source_field.into(),
74 target_field: target_field.into(),
75 transform: None,
76 }
77 }
78
79 pub fn with_transform(
81 source_field: impl Into<String>,
82 target_field: impl Into<String>,
83 transform: TransformFn,
84 ) -> Self {
85 Self {
86 source_field: source_field.into(),
87 target_field: target_field.into(),
88 transform: Some(transform),
89 }
90 }
91}
92
93#[derive(Debug, Clone)]
95pub struct TransformPipeline {
96 pub mappings: Vec<FieldMapping>,
98 pub source_format: MessageFormat,
100 pub target_format: MessageFormat,
102}
103
104impl TransformPipeline {
105 pub fn new(
107 source_format: MessageFormat,
108 target_format: MessageFormat,
109 mappings: Vec<FieldMapping>,
110 ) -> Self {
111 Self {
112 mappings,
113 source_format,
114 target_format,
115 }
116 }
117}
118
119#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct MessagePayload {
122 pub fields: HashMap<String, String>,
124 pub format: MessageFormat,
126}
127
128impl MessagePayload {
129 pub fn new(fields: HashMap<String, String>, format: MessageFormat) -> Self {
131 Self { fields, format }
132 }
133}
134
/// Errors returned by the message transformer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TransformError {
    /// No pipeline is registered under the contained name.
    PipelineNotFound(String),
    /// A mapping's source field (the contained name) was absent from the input.
    FieldNotFound(String),
    /// Generic transform failure carrying a human-readable message.
    TransformFailed(String),
}

impl std::fmt::Display for TransformError {
    /// Renders as `"<kind>: <detail>"`, e.g. `"pipeline not found: orders"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::PipelineNotFound(name) => ("pipeline not found", name),
            Self::FieldNotFound(field) => ("source field not found", field),
            Self::TransformFailed(msg) => ("transform failed", msg),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for TransformError {}
163
164#[derive(Debug, Default)]
191pub struct MessageTransformer {
192 pipelines: HashMap<String, TransformPipeline>,
193}
194
195impl MessageTransformer {
196 pub fn new() -> Self {
198 Self {
199 pipelines: HashMap::new(),
200 }
201 }
202
203 pub fn add_pipeline(&mut self, name: &str, pipeline: TransformPipeline) {
205 self.pipelines.insert(name.to_string(), pipeline);
206 }
207
208 pub fn transform(
217 &self,
218 pipeline_name: &str,
219 payload: MessagePayload,
220 ) -> Result<MessagePayload, TransformError> {
221 let pipeline = self
222 .pipelines
223 .get(pipeline_name)
224 .ok_or_else(|| TransformError::PipelineNotFound(pipeline_name.to_string()))?;
225
226 let mut output_fields: HashMap<String, String> = HashMap::new();
227
228 for mapping in &pipeline.mappings {
229 let value = payload
230 .fields
231 .get(&mapping.source_field)
232 .ok_or_else(|| TransformError::FieldNotFound(mapping.source_field.clone()))?;
233
234 let transformed = match &mapping.transform {
235 Some(tf) => tf.apply(value),
236 None => value.clone(),
237 };
238
239 output_fields.insert(mapping.target_field.clone(), transformed);
240 }
241
242 Ok(MessagePayload {
243 fields: output_fields,
244 format: pipeline.target_format.clone(),
245 })
246 }
247
248 pub fn chain_pipelines(
253 &self,
254 names: &[&str],
255 payload: MessagePayload,
256 ) -> Result<MessagePayload, TransformError> {
257 let mut current = payload;
258 for name in names {
259 current = self.transform(name, current)?;
260 }
261 Ok(current)
262 }
263
264 pub fn list_pipelines(&self) -> Vec<&str> {
266 self.pipelines.keys().map(|k| k.as_str()).collect()
267 }
268
269 pub fn pipeline_count(&self) -> usize {
271 self.pipelines.len()
272 }
273
274 pub fn has_pipeline(&self, name: &str) -> bool {
276 self.pipelines.contains_key(name)
277 }
278
279 pub fn remove_pipeline(&mut self, name: &str) -> bool {
281 self.pipelines.remove(name).is_some()
282 }
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds a payload from `(field, value)` pairs.
    fn make_payload(fields: &[(&str, &str)], format: MessageFormat) -> MessagePayload {
        let map = fields
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        MessagePayload::new(map, format)
    }

    /// Builds a transformer holding one pipeline, `name`, with a single
    /// transformed mapping.
    fn simple_transformer(
        name: &str,
        src_field: &str,
        tgt_field: &str,
        tf: TransformFn,
        src_fmt: MessageFormat,
        tgt_fmt: MessageFormat,
    ) -> MessageTransformer {
        let mut t = MessageTransformer::new();
        let mapping = FieldMapping::with_transform(src_field, tgt_field, tf);
        let pipeline = TransformPipeline::new(src_fmt, tgt_fmt, vec![mapping]);
        t.add_pipeline(name, pipeline);
        t
    }

    // ---- TransformFn::ToUpper ----

    #[test]
    fn test_to_upper_basic() {
        let tf = TransformFn::ToUpper;
        assert_eq!(tf.apply("hello"), "HELLO");
    }

    #[test]
    fn test_to_upper_already_upper() {
        assert_eq!(TransformFn::ToUpper.apply("WORLD"), "WORLD");
    }

    #[test]
    fn test_to_upper_mixed() {
        assert_eq!(TransformFn::ToUpper.apply("HeLLo WoRLd"), "HELLO WORLD");
    }

    #[test]
    fn test_to_upper_empty() {
        assert_eq!(TransformFn::ToUpper.apply(""), "");
    }

    // ---- TransformFn::ToLower ----

    #[test]
    fn test_to_lower_basic() {
        assert_eq!(TransformFn::ToLower.apply("HELLO"), "hello");
    }

    #[test]
    fn test_to_lower_already_lower() {
        assert_eq!(TransformFn::ToLower.apply("world"), "world");
    }

    #[test]
    fn test_to_lower_mixed() {
        assert_eq!(TransformFn::ToLower.apply("HeLLo"), "hello");
    }

    // ---- TransformFn::Trim ----

    #[test]
    fn test_trim_leading() {
        assert_eq!(TransformFn::Trim.apply(" hello"), "hello");
    }

    #[test]
    fn test_trim_trailing() {
        assert_eq!(TransformFn::Trim.apply("hello "), "hello");
    }

    #[test]
    fn test_trim_both() {
        assert_eq!(TransformFn::Trim.apply(" hello "), "hello");
    }

    #[test]
    fn test_trim_no_whitespace() {
        assert_eq!(TransformFn::Trim.apply("hello"), "hello");
    }

    #[test]
    fn test_trim_only_whitespace() {
        assert_eq!(TransformFn::Trim.apply(" "), "");
    }

    // ---- TransformFn::Prefix ----

    #[test]
    fn test_prefix_basic() {
        assert_eq!(
            TransformFn::Prefix("pre_".to_string()).apply("value"),
            "pre_value"
        );
    }

    #[test]
    fn test_prefix_empty_value() {
        assert_eq!(TransformFn::Prefix("pre_".to_string()).apply(""), "pre_");
    }

    #[test]
    fn test_prefix_empty_prefix() {
        assert_eq!(TransformFn::Prefix(String::new()).apply("value"), "value");
    }

    // ---- TransformFn::Suffix ----

    #[test]
    fn test_suffix_basic() {
        assert_eq!(
            TransformFn::Suffix("_suf".to_string()).apply("value"),
            "value_suf"
        );
    }

    #[test]
    fn test_suffix_empty_value() {
        assert_eq!(TransformFn::Suffix("_end".to_string()).apply(""), "_end");
    }

    #[test]
    fn test_suffix_empty_suffix() {
        assert_eq!(TransformFn::Suffix(String::new()).apply("value"), "value");
    }

    // ---- TransformFn::Replace ----

    #[test]
    fn test_replace_basic() {
        let tf = TransformFn::Replace {
            from: "foo".to_string(),
            to: "bar".to_string(),
        };
        assert_eq!(tf.apply("foo baz foo"), "bar baz bar");
    }

    #[test]
    fn test_replace_no_match() {
        let tf = TransformFn::Replace {
            from: "x".to_string(),
            to: "y".to_string(),
        };
        assert_eq!(tf.apply("hello"), "hello");
    }

    #[test]
    fn test_replace_empty_from() {
        let tf = TransformFn::Replace {
            from: String::new(),
            to: "-".to_string(),
        };
        // An empty pattern matches at every char boundary (std `str::replace`
        // semantics), so the replacement lands before, between and after
        // every character. Pin the exact result rather than just `'-'`.
        assert_eq!(tf.apply("ab"), "-a-b-");
    }

    #[test]
    fn test_replace_to_empty() {
        let tf = TransformFn::Replace {
            from: "o".to_string(),
            to: String::new(),
        };
        assert_eq!(tf.apply("foobar"), "fbar");
    }

    // ---- TransformFn::Identity ----

    #[test]
    fn test_identity_passthrough() {
        assert_eq!(TransformFn::Identity.apply("unchanged"), "unchanged");
    }

    #[test]
    fn test_identity_empty() {
        assert_eq!(TransformFn::Identity.apply(""), "");
    }

    // ---- Pipeline registry ----

    #[test]
    fn test_add_and_has_pipeline() {
        let mut t = MessageTransformer::new();
        assert!(!t.has_pipeline("p1"));
        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Csv, vec![]);
        t.add_pipeline("p1", pipeline);
        assert!(t.has_pipeline("p1"));
    }

    #[test]
    fn test_list_pipelines_empty() {
        let t = MessageTransformer::new();
        assert!(t.list_pipelines().is_empty());
    }

    #[test]
    fn test_list_pipelines_multiple() {
        let mut t = MessageTransformer::new();
        for name in ["a", "b", "c"] {
            let p = TransformPipeline::new(MessageFormat::Raw, MessageFormat::Raw, vec![]);
            t.add_pipeline(name, p);
        }
        // Listing order is unspecified (hash map), so sort before comparing.
        let mut names = t.list_pipelines();
        names.sort();
        assert_eq!(names, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_pipeline_count() {
        let mut t = MessageTransformer::new();
        assert_eq!(t.pipeline_count(), 0);
        t.add_pipeline(
            "x",
            TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, vec![]),
        );
        assert_eq!(t.pipeline_count(), 1);
    }

    #[test]
    fn test_remove_pipeline() {
        let mut t = MessageTransformer::new();
        t.add_pipeline(
            "rm",
            TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, vec![]),
        );
        assert!(t.remove_pipeline("rm"));
        assert!(!t.has_pipeline("rm"));
        // A second removal reports that nothing was there.
        assert!(!t.remove_pipeline("rm"));
    }

    // ---- MessageTransformer::transform ----

    #[test]
    fn test_transform_basic_field_rename() {
        let mut t = MessageTransformer::new();
        let mapping = FieldMapping::new("src", "dst");
        let pipeline =
            TransformPipeline::new(MessageFormat::Json, MessageFormat::Avro, vec![mapping]);
        t.add_pipeline("rename", pipeline);

        let payload = make_payload(&[("src", "value")], MessageFormat::Json);
        let result = t.transform("rename", payload).expect("should succeed");
        assert_eq!(result.fields.get("dst"), Some(&"value".to_string()));
        assert!(!result.fields.contains_key("src"));
        assert_eq!(result.format, MessageFormat::Avro);
    }

    #[test]
    fn test_transform_to_upper() {
        let t = simple_transformer(
            "p",
            "name",
            "NAME",
            TransformFn::ToUpper,
            MessageFormat::Json,
            MessageFormat::Json,
        );
        let payload = make_payload(&[("name", "alice")], MessageFormat::Json);
        let result = t.transform("p", payload).expect("ok");
        assert_eq!(result.fields["NAME"], "ALICE");
    }

    #[test]
    fn test_transform_prefix() {
        let t = simple_transformer(
            "p",
            "id",
            "id",
            TransformFn::Prefix("usr_".to_string()),
            MessageFormat::Raw,
            MessageFormat::Raw,
        );
        let payload = make_payload(&[("id", "42")], MessageFormat::Raw);
        let result = t.transform("p", payload).expect("ok");
        assert_eq!(result.fields["id"], "usr_42");
    }

    #[test]
    fn test_transform_suffix() {
        let t = simple_transformer(
            "p",
            "tag",
            "tag",
            TransformFn::Suffix("_v2".to_string()),
            MessageFormat::Csv,
            MessageFormat::Csv,
        );
        let payload = make_payload(&[("tag", "sensor")], MessageFormat::Csv);
        let result = t.transform("p", payload).expect("ok");
        assert_eq!(result.fields["tag"], "sensor_v2");
    }

    #[test]
    fn test_transform_replace() {
        let t = simple_transformer(
            "p",
            "path",
            "path",
            TransformFn::Replace {
                from: "/".to_string(),
                to: ".".to_string(),
            },
            MessageFormat::Json,
            MessageFormat::Json,
        );
        let payload = make_payload(&[("path", "a/b/c")], MessageFormat::Json);
        let result = t.transform("p", payload).expect("ok");
        assert_eq!(result.fields["path"], "a.b.c");
    }

    #[test]
    fn test_transform_pipeline_not_found() {
        let t = MessageTransformer::new();
        let payload = make_payload(&[], MessageFormat::Json);
        let err = t.transform("missing", payload).unwrap_err();
        assert_eq!(err, TransformError::PipelineNotFound("missing".to_string()));
    }

    #[test]
    fn test_transform_field_not_found() {
        let t = simple_transformer(
            "p",
            "nonexistent",
            "out",
            TransformFn::Identity,
            MessageFormat::Json,
            MessageFormat::Json,
        );
        let payload = make_payload(&[("other", "val")], MessageFormat::Json);
        let err = t.transform("p", payload).unwrap_err();
        assert_eq!(
            err,
            TransformError::FieldNotFound("nonexistent".to_string())
        );
    }

    #[test]
    fn test_transform_empty_pipeline_empty_output() {
        let mut t = MessageTransformer::new();
        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Avro, vec![]);
        t.add_pipeline("empty", pipeline);

        let payload = make_payload(&[("a", "1"), ("b", "2")], MessageFormat::Json);
        let result = t.transform("empty", payload).expect("ok");
        assert!(result.fields.is_empty());
        assert_eq!(result.format, MessageFormat::Avro);
    }

    #[test]
    fn test_transform_multiple_mappings() {
        let mut t = MessageTransformer::new();
        let mappings = vec![
            FieldMapping::with_transform("first", "FIRST", TransformFn::ToUpper),
            FieldMapping::with_transform("last", "LAST", TransformFn::ToLower),
            FieldMapping::new("email", "email_address"),
        ];
        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, mappings);
        t.add_pipeline("multi", pipeline);

        let payload = make_payload(
            &[("first", "Alice"), ("last", "SMITH"), ("email", "a@b.com")],
            MessageFormat::Json,
        );
        let result = t.transform("multi", payload).expect("ok");
        assert_eq!(result.fields["FIRST"], "ALICE");
        assert_eq!(result.fields["LAST"], "smith");
        assert_eq!(result.fields["email_address"], "a@b.com");
    }

    #[test]
    fn test_transform_duplicate_target_last_mapping_wins() {
        let mut t = MessageTransformer::new();
        // Both mappings read the same *input* value; the later write wins.
        let mappings = vec![
            FieldMapping::with_transform("a", "out", TransformFn::ToUpper),
            FieldMapping::with_transform("a", "out", TransformFn::Suffix("!".to_string())),
        ];
        t.add_pipeline(
            "dup",
            TransformPipeline::new(MessageFormat::Raw, MessageFormat::Raw, mappings),
        );
        let payload = make_payload(&[("a", "x")], MessageFormat::Raw);
        let result = t.transform("dup", payload).expect("ok");
        assert_eq!(result.fields.len(), 1);
        assert_eq!(result.fields["out"], "x!");
    }

    // ---- MessageTransformer::chain_pipelines ----

    #[test]
    fn test_chain_single_pipeline() {
        let t = simple_transformer(
            "p",
            "v",
            "v",
            TransformFn::Trim,
            MessageFormat::Raw,
            MessageFormat::Raw,
        );
        let payload = make_payload(&[("v", " hello ")], MessageFormat::Raw);
        let result = t.chain_pipelines(&["p"], payload).expect("ok");
        assert_eq!(result.fields["v"], "hello");
    }

    #[test]
    fn test_chain_two_pipelines() {
        let mut t = MessageTransformer::new();

        let p1 = TransformPipeline::new(
            MessageFormat::Json,
            MessageFormat::Json,
            vec![FieldMapping::with_transform(
                "src",
                "mid",
                TransformFn::ToUpper,
            )],
        );
        let p2 = TransformPipeline::new(
            MessageFormat::Json,
            MessageFormat::Avro,
            vec![FieldMapping::with_transform(
                "mid",
                "dst",
                TransformFn::Prefix(">>".to_string()),
            )],
        );
        t.add_pipeline("p1", p1);
        t.add_pipeline("p2", p2);

        let payload = make_payload(&[("src", "hello")], MessageFormat::Json);
        let result = t.chain_pipelines(&["p1", "p2"], payload).expect("ok");
        assert_eq!(result.fields["dst"], ">>HELLO");
        assert_eq!(result.format, MessageFormat::Avro);
    }

    #[test]
    fn test_chain_three_pipelines() {
        let mut t = MessageTransformer::new();

        let p1 = TransformPipeline::new(
            MessageFormat::Raw,
            MessageFormat::Raw,
            vec![FieldMapping::with_transform("a", "b", TransformFn::Trim)],
        );
        let p2 = TransformPipeline::new(
            MessageFormat::Raw,
            MessageFormat::Raw,
            vec![FieldMapping::with_transform("b", "c", TransformFn::ToUpper)],
        );
        let p3 = TransformPipeline::new(
            MessageFormat::Raw,
            MessageFormat::Csv,
            vec![FieldMapping::with_transform(
                "c",
                "d",
                TransformFn::Suffix("!".to_string()),
            )],
        );
        t.add_pipeline("p1", p1);
        t.add_pipeline("p2", p2);
        t.add_pipeline("p3", p3);

        let payload = make_payload(&[("a", " hi ")], MessageFormat::Raw);
        let result = t.chain_pipelines(&["p1", "p2", "p3"], payload).expect("ok");
        assert_eq!(result.fields["d"], "HI!");
        assert_eq!(result.format, MessageFormat::Csv);
    }

    #[test]
    fn test_chain_empty_names() {
        let t = MessageTransformer::new();
        let payload = make_payload(&[("x", "y")], MessageFormat::Json);
        let result = t.chain_pipelines(&[], payload.clone()).expect("ok");
        assert_eq!(result, payload);
    }

    #[test]
    fn test_chain_missing_pipeline_error() {
        let t = MessageTransformer::new();
        let payload = make_payload(&[("x", "y")], MessageFormat::Json);
        let err = t.chain_pipelines(&["missing"], payload).unwrap_err();
        assert!(matches!(err, TransformError::PipelineNotFound(_)));
    }

    #[test]
    fn test_chain_stops_at_first_error() {
        let mut t = MessageTransformer::new();
        // p1 renames "a" to "b", so p2 (which still reads "a") must fail.
        t.add_pipeline(
            "p1",
            TransformPipeline::new(
                MessageFormat::Raw,
                MessageFormat::Raw,
                vec![FieldMapping::new("a", "b")],
            ),
        );
        t.add_pipeline(
            "p2",
            TransformPipeline::new(
                MessageFormat::Raw,
                MessageFormat::Raw,
                vec![FieldMapping::new("a", "c")],
            ),
        );
        let payload = make_payload(&[("a", "v")], MessageFormat::Raw);
        let err = t.chain_pipelines(&["p1", "p2"], payload).unwrap_err();
        assert_eq!(err, TransformError::FieldNotFound("a".to_string()));
    }

    // ---- TransformError Display ----
    // Compare whole strings: a `contains("p")` / `contains("f")` check is
    // vacuous because the words "pipeline" / "field" already contain them.

    #[test]
    fn test_error_display_pipeline_not_found() {
        let e = TransformError::PipelineNotFound("p".to_string());
        assert_eq!(e.to_string(), "pipeline not found: p");
    }

    #[test]
    fn test_error_display_field_not_found() {
        let e = TransformError::FieldNotFound("f".to_string());
        assert_eq!(e.to_string(), "source field not found: f");
    }

    #[test]
    fn test_error_display_transform_failed() {
        let e = TransformError::TransformFailed("boom".to_string());
        assert_eq!(e.to_string(), "transform failed: boom");
    }

    // ---- MessageFormat ----

    #[test]
    fn test_message_format_equality() {
        assert_eq!(MessageFormat::Json, MessageFormat::Json);
        assert_ne!(MessageFormat::Json, MessageFormat::Avro);
    }

    #[test]
    fn test_all_message_formats_exist() {
        let formats = [
            MessageFormat::Json,
            MessageFormat::Avro,
            MessageFormat::Protobuf,
            MessageFormat::Csv,
            MessageFormat::Raw,
        ];
        // Every variant must be distinct under the derived Eq/Hash impls.
        let unique: HashSet<MessageFormat> = formats.iter().cloned().collect();
        assert_eq!(unique.len(), formats.len());
        assert_eq!(formats.len(), 5);
    }

    // ---- FieldMapping constructors ----

    #[test]
    fn test_field_mapping_new_no_transform() {
        let m = FieldMapping::new("src", "dst");
        assert_eq!(m.source_field, "src");
        assert_eq!(m.target_field, "dst");
        assert!(m.transform.is_none());
    }

    #[test]
    fn test_field_mapping_with_transform() {
        let m = FieldMapping::with_transform("s", "t", TransformFn::ToUpper);
        assert_eq!(m.source_field, "s");
        assert_eq!(m.target_field, "t");
        assert_eq!(m.transform, Some(TransformFn::ToUpper));
    }

    // ---- Registry overwrite semantics ----

    #[test]
    fn test_add_pipeline_overwrites() {
        let mut t = MessageTransformer::new();
        let p1 = TransformPipeline::new(
            MessageFormat::Json,
            MessageFormat::Json,
            vec![FieldMapping::new("a", "b")],
        );
        let p2 = TransformPipeline::new(
            MessageFormat::Json,
            MessageFormat::Avro,
            vec![FieldMapping::new("a", "c")],
        );
        t.add_pipeline("p", p1);
        t.add_pipeline("p", p2);
        assert_eq!(t.pipeline_count(), 1);

        let payload = make_payload(&[("a", "v")], MessageFormat::Json);
        let result = t.transform("p", payload).expect("ok");
        assert!(result.fields.contains_key("c"));
        assert!(!result.fields.contains_key("b"));
        assert_eq!(result.format, MessageFormat::Avro);
    }

    // ---- Combined behaviour ----

    #[test]
    fn test_chain_trim_then_lower() {
        let mut t = MessageTransformer::new();
        let p1 = TransformPipeline::new(
            MessageFormat::Raw,
            MessageFormat::Raw,
            vec![FieldMapping::with_transform("x", "x", TransformFn::Trim)],
        );
        let p2 = TransformPipeline::new(
            MessageFormat::Raw,
            MessageFormat::Raw,
            vec![FieldMapping::with_transform("x", "x", TransformFn::ToLower)],
        );
        t.add_pipeline("trim", p1);
        t.add_pipeline("lower", p2);

        let payload = make_payload(&[("x", " HELLO ")], MessageFormat::Raw);
        let result = t.chain_pipelines(&["trim", "lower"], payload).expect("ok");
        assert_eq!(result.fields["x"], "hello");
    }

    #[test]
    fn test_identity_in_pipeline() {
        let mut t = MessageTransformer::new();
        let p = TransformPipeline::new(
            MessageFormat::Json,
            MessageFormat::Json,
            vec![FieldMapping::with_transform(
                "k",
                "k",
                TransformFn::Identity,
            )],
        );
        t.add_pipeline("id", p);
        let payload = make_payload(&[("k", "unchanged_value_123")], MessageFormat::Json);
        let result = t.transform("id", payload).expect("ok");
        assert_eq!(result.fields["k"], "unchanged_value_123");
    }
}