Skip to main content

oxirs_stream/
message_transformer.rs

1//! # Message Transformer
2//!
3//! Message format transformation pipeline for schema evolution and format conversion.
4//!
5//! Provides a flexible pipeline system to map fields between different message formats
6//! (JSON, Avro, Protobuf, CSV, Raw) while applying per-field string transformations.
7
8use std::collections::HashMap;
9
10// ────────────────────────────────────────────────────────────────────────────
11// Public types
12// ────────────────────────────────────────────────────────────────────────────
13
14/// Supported message serialisation formats.
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub enum MessageFormat {
17    Json,
18    Avro,
19    Protobuf,
20    Csv,
21    Raw,
22}
23
24/// Elementary string transformation applied to a single field value.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum TransformFn {
27    /// Convert value to uppercase.
28    ToUpper,
29    /// Convert value to lowercase.
30    ToLower,
31    /// Trim leading and trailing whitespace.
32    Trim,
33    /// Prepend a fixed string.
34    Prefix(String),
35    /// Append a fixed string.
36    Suffix(String),
37    /// Replace all occurrences of `from` with `to`.
38    Replace { from: String, to: String },
39    /// Pass value through unchanged.
40    Identity,
41}
42
43impl TransformFn {
44    /// Apply this transformation to `value` and return the result.
45    pub fn apply(&self, value: &str) -> String {
46        match self {
47            TransformFn::ToUpper => value.to_uppercase(),
48            TransformFn::ToLower => value.to_lowercase(),
49            TransformFn::Trim => value.trim().to_string(),
50            TransformFn::Prefix(prefix) => format!("{}{}", prefix, value),
51            TransformFn::Suffix(suffix) => format!("{}{}", value, suffix),
52            TransformFn::Replace { from, to } => value.replace(from.as_str(), to.as_str()),
53            TransformFn::Identity => value.to_string(),
54        }
55    }
56}
57
58/// Mapping from one field to another with an optional transformation.
59#[derive(Debug, Clone)]
60pub struct FieldMapping {
61    /// Name of the field in the source payload.
62    pub source_field: String,
63    /// Name of the field in the target payload.
64    pub target_field: String,
65    /// Optional transformation applied to the value.
66    pub transform: Option<TransformFn>,
67}
68
69impl FieldMapping {
70    /// Create a new field mapping without a transformation.
71    pub fn new(source_field: impl Into<String>, target_field: impl Into<String>) -> Self {
72        Self {
73            source_field: source_field.into(),
74            target_field: target_field.into(),
75            transform: None,
76        }
77    }
78
79    /// Create a new field mapping with a transformation.
80    pub fn with_transform(
81        source_field: impl Into<String>,
82        target_field: impl Into<String>,
83        transform: TransformFn,
84    ) -> Self {
85        Self {
86            source_field: source_field.into(),
87            target_field: target_field.into(),
88            transform: Some(transform),
89        }
90    }
91}
92
93/// A named transformation pipeline: ordered field mappings plus format metadata.
94#[derive(Debug, Clone)]
95pub struct TransformPipeline {
96    /// Ordered list of field mappings to apply.
97    pub mappings: Vec<FieldMapping>,
98    /// Expected format of incoming payloads.
99    pub source_format: MessageFormat,
100    /// Format of outgoing payloads.
101    pub target_format: MessageFormat,
102}
103
104impl TransformPipeline {
105    /// Create a new pipeline.
106    pub fn new(
107        source_format: MessageFormat,
108        target_format: MessageFormat,
109        mappings: Vec<FieldMapping>,
110    ) -> Self {
111        Self {
112            mappings,
113            source_format,
114            target_format,
115        }
116    }
117}
118
119/// A keyed collection of fields with an associated format tag.
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct MessagePayload {
122    /// Field name → value pairs.
123    pub fields: HashMap<String, String>,
124    /// Format tag for this payload.
125    pub format: MessageFormat,
126}
127
128impl MessagePayload {
129    /// Create a new payload with the given fields and format.
130    pub fn new(fields: HashMap<String, String>, format: MessageFormat) -> Self {
131        Self { fields, format }
132    }
133}
134
135/// Errors that can occur during transformation.
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum TransformError {
138    /// No pipeline registered under the given name.
139    PipelineNotFound(String),
140    /// A required source field was absent from the payload.
141    FieldNotFound(String),
142    /// The transformation step itself produced an error (reserved for future use).
143    TransformFailed(String),
144}
145
146impl std::fmt::Display for TransformError {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        match self {
149            TransformError::PipelineNotFound(name) => {
150                write!(f, "pipeline not found: {}", name)
151            }
152            TransformError::FieldNotFound(field) => {
153                write!(f, "source field not found: {}", field)
154            }
155            TransformError::TransformFailed(msg) => {
156                write!(f, "transform failed: {}", msg)
157            }
158        }
159    }
160}
161
162impl std::error::Error for TransformError {}
163
164// ────────────────────────────────────────────────────────────────────────────
165// MessageTransformer
166// ────────────────────────────────────────────────────────────────────────────
167
168/// Registry of named transformation pipelines.
169///
170/// # Example
171/// ```rust
172/// use oxirs_stream::message_transformer::*;
173/// use std::collections::HashMap;
174///
175/// let mut transformer = MessageTransformer::new();
176/// let pipeline = TransformPipeline::new(
177///     MessageFormat::Json,
178///     MessageFormat::Avro,
179///     vec![FieldMapping::with_transform("name", "NAME", TransformFn::ToUpper)],
180/// );
181/// transformer.add_pipeline("upper", pipeline);
182///
183/// let mut fields = HashMap::new();
184/// fields.insert("name".to_string(), "alice".to_string());
185/// let payload = MessagePayload::new(fields, MessageFormat::Json);
186/// let result = transformer.transform("upper", payload)?;
187/// assert_eq!(result.fields["NAME"], "ALICE");
188/// # Ok::<(), TransformError>(())
189/// ```
190#[derive(Debug, Default)]
191pub struct MessageTransformer {
192    pipelines: HashMap<String, TransformPipeline>,
193}
194
195impl MessageTransformer {
196    /// Create an empty transformer.
197    pub fn new() -> Self {
198        Self {
199            pipelines: HashMap::new(),
200        }
201    }
202
203    /// Register a named pipeline. Overwrites any existing pipeline with the same name.
204    pub fn add_pipeline(&mut self, name: &str, pipeline: TransformPipeline) {
205        self.pipelines.insert(name.to_string(), pipeline);
206    }
207
208    /// Apply the named pipeline to `payload`.
209    ///
210    /// Each [`FieldMapping`] in the pipeline is applied in order:
211    /// 1. The value of `source_field` is read from the payload.
212    /// 2. The optional [`TransformFn`] is applied.
213    /// 3. The result is stored under `target_field` in the output payload.
214    ///
215    /// Fields present in the payload but not mentioned by any mapping are dropped.
216    pub fn transform(
217        &self,
218        pipeline_name: &str,
219        payload: MessagePayload,
220    ) -> Result<MessagePayload, TransformError> {
221        let pipeline = self
222            .pipelines
223            .get(pipeline_name)
224            .ok_or_else(|| TransformError::PipelineNotFound(pipeline_name.to_string()))?;
225
226        let mut output_fields: HashMap<String, String> = HashMap::new();
227
228        for mapping in &pipeline.mappings {
229            let value = payload
230                .fields
231                .get(&mapping.source_field)
232                .ok_or_else(|| TransformError::FieldNotFound(mapping.source_field.clone()))?;
233
234            let transformed = match &mapping.transform {
235                Some(tf) => tf.apply(value),
236                None => value.clone(),
237            };
238
239            output_fields.insert(mapping.target_field.clone(), transformed);
240        }
241
242        Ok(MessagePayload {
243            fields: output_fields,
244            format: pipeline.target_format.clone(),
245        })
246    }
247
248    /// Apply a sequence of pipelines in order.
249    ///
250    /// The output of each pipeline is fed as the input to the next. The
251    /// format of the intermediate payloads is updated by each pipeline.
252    pub fn chain_pipelines(
253        &self,
254        names: &[&str],
255        payload: MessagePayload,
256    ) -> Result<MessagePayload, TransformError> {
257        let mut current = payload;
258        for name in names {
259            current = self.transform(name, current)?;
260        }
261        Ok(current)
262    }
263
264    /// Return the names of all registered pipelines (order unspecified).
265    pub fn list_pipelines(&self) -> Vec<&str> {
266        self.pipelines.keys().map(|k| k.as_str()).collect()
267    }
268
269    /// Return the number of registered pipelines.
270    pub fn pipeline_count(&self) -> usize {
271        self.pipelines.len()
272    }
273
274    /// Check whether a pipeline with the given name exists.
275    pub fn has_pipeline(&self, name: &str) -> bool {
276        self.pipelines.contains_key(name)
277    }
278
279    /// Remove a pipeline by name. Returns `true` if it was present.
280    pub fn remove_pipeline(&mut self, name: &str) -> bool {
281        self.pipelines.remove(name).is_some()
282    }
283}
284
285// ────────────────────────────────────────────────────────────────────────────
286// Tests
287// ────────────────────────────────────────────────────────────────────────────
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292
293    fn make_payload(fields: &[(&str, &str)], format: MessageFormat) -> MessagePayload {
294        let map = fields
295            .iter()
296            .map(|(k, v)| (k.to_string(), v.to_string()))
297            .collect();
298        MessagePayload::new(map, format)
299    }
300
301    fn simple_transformer(
302        name: &str,
303        src_field: &str,
304        tgt_field: &str,
305        tf: TransformFn,
306        src_fmt: MessageFormat,
307        tgt_fmt: MessageFormat,
308    ) -> MessageTransformer {
309        let mut t = MessageTransformer::new();
310        let mapping = FieldMapping::with_transform(src_field, tgt_field, tf);
311        let pipeline = TransformPipeline::new(src_fmt, tgt_fmt, vec![mapping]);
312        t.add_pipeline(name, pipeline);
313        t
314    }
315
316    // ── TransformFn::ToUpper ──────────────────────────────────────────────
317
318    #[test]
319    fn test_to_upper_basic() {
320        let tf = TransformFn::ToUpper;
321        assert_eq!(tf.apply("hello"), "HELLO");
322    }
323
324    #[test]
325    fn test_to_upper_already_upper() {
326        assert_eq!(TransformFn::ToUpper.apply("WORLD"), "WORLD");
327    }
328
329    #[test]
330    fn test_to_upper_mixed() {
331        assert_eq!(TransformFn::ToUpper.apply("HeLLo WoRLd"), "HELLO WORLD");
332    }
333
334    #[test]
335    fn test_to_upper_empty() {
336        assert_eq!(TransformFn::ToUpper.apply(""), "");
337    }
338
339    // ── TransformFn::ToLower ──────────────────────────────────────────────
340
341    #[test]
342    fn test_to_lower_basic() {
343        assert_eq!(TransformFn::ToLower.apply("HELLO"), "hello");
344    }
345
346    #[test]
347    fn test_to_lower_already_lower() {
348        assert_eq!(TransformFn::ToLower.apply("world"), "world");
349    }
350
351    #[test]
352    fn test_to_lower_mixed() {
353        assert_eq!(TransformFn::ToLower.apply("HeLLo"), "hello");
354    }
355
356    // ── TransformFn::Trim ────────────────────────────────────────────────
357
358    #[test]
359    fn test_trim_leading() {
360        assert_eq!(TransformFn::Trim.apply("  hello"), "hello");
361    }
362
363    #[test]
364    fn test_trim_trailing() {
365        assert_eq!(TransformFn::Trim.apply("hello  "), "hello");
366    }
367
368    #[test]
369    fn test_trim_both() {
370        assert_eq!(TransformFn::Trim.apply("  hello  "), "hello");
371    }
372
373    #[test]
374    fn test_trim_no_whitespace() {
375        assert_eq!(TransformFn::Trim.apply("hello"), "hello");
376    }
377
378    #[test]
379    fn test_trim_only_whitespace() {
380        assert_eq!(TransformFn::Trim.apply("   "), "");
381    }
382
383    // ── TransformFn::Prefix ───────────────────────────────────────────────
384
385    #[test]
386    fn test_prefix_basic() {
387        assert_eq!(
388            TransformFn::Prefix("pre_".to_string()).apply("value"),
389            "pre_value"
390        );
391    }
392
393    #[test]
394    fn test_prefix_empty_value() {
395        assert_eq!(TransformFn::Prefix("pre_".to_string()).apply(""), "pre_");
396    }
397
398    #[test]
399    fn test_prefix_empty_prefix() {
400        assert_eq!(TransformFn::Prefix(String::new()).apply("value"), "value");
401    }
402
403    // ── TransformFn::Suffix ───────────────────────────────────────────────
404
405    #[test]
406    fn test_suffix_basic() {
407        assert_eq!(
408            TransformFn::Suffix("_suf".to_string()).apply("value"),
409            "value_suf"
410        );
411    }
412
413    #[test]
414    fn test_suffix_empty_value() {
415        assert_eq!(TransformFn::Suffix("_end".to_string()).apply(""), "_end");
416    }
417
418    #[test]
419    fn test_suffix_empty_suffix() {
420        assert_eq!(TransformFn::Suffix(String::new()).apply("value"), "value");
421    }
422
423    // ── TransformFn::Replace ──────────────────────────────────────────────
424
425    #[test]
426    fn test_replace_basic() {
427        let tf = TransformFn::Replace {
428            from: "foo".to_string(),
429            to: "bar".to_string(),
430        };
431        assert_eq!(tf.apply("foo baz foo"), "bar baz bar");
432    }
433
434    #[test]
435    fn test_replace_no_match() {
436        let tf = TransformFn::Replace {
437            from: "x".to_string(),
438            to: "y".to_string(),
439        };
440        assert_eq!(tf.apply("hello"), "hello");
441    }
442
443    #[test]
444    fn test_replace_empty_from() {
445        // Replacing empty string inserts `to` between every character.
446        let tf = TransformFn::Replace {
447            from: String::new(),
448            to: "-".to_string(),
449        };
450        // Standard Rust behaviour: replaces at every position.
451        let result = tf.apply("ab");
452        assert!(result.contains('-'));
453    }
454
455    #[test]
456    fn test_replace_to_empty() {
457        let tf = TransformFn::Replace {
458            from: "o".to_string(),
459            to: String::new(),
460        };
461        assert_eq!(tf.apply("foobar"), "fbar");
462    }
463
464    // ── TransformFn::Identity ─────────────────────────────────────────────
465
466    #[test]
467    fn test_identity_passthrough() {
468        assert_eq!(TransformFn::Identity.apply("unchanged"), "unchanged");
469    }
470
471    #[test]
472    fn test_identity_empty() {
473        assert_eq!(TransformFn::Identity.apply(""), "");
474    }
475
476    // ── MessageTransformer::add_pipeline / has_pipeline / list_pipelines ──
477
478    #[test]
479    fn test_add_and_has_pipeline() {
480        let mut t = MessageTransformer::new();
481        assert!(!t.has_pipeline("p1"));
482        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Csv, vec![]);
483        t.add_pipeline("p1", pipeline);
484        assert!(t.has_pipeline("p1"));
485    }
486
487    #[test]
488    fn test_list_pipelines_empty() {
489        let t = MessageTransformer::new();
490        assert!(t.list_pipelines().is_empty());
491    }
492
493    #[test]
494    fn test_list_pipelines_multiple() {
495        let mut t = MessageTransformer::new();
496        for name in ["a", "b", "c"] {
497            let p = TransformPipeline::new(MessageFormat::Raw, MessageFormat::Raw, vec![]);
498            t.add_pipeline(name, p);
499        }
500        let mut names = t.list_pipelines();
501        names.sort();
502        assert_eq!(names, vec!["a", "b", "c"]);
503    }
504
505    #[test]
506    fn test_pipeline_count() {
507        let mut t = MessageTransformer::new();
508        assert_eq!(t.pipeline_count(), 0);
509        t.add_pipeline(
510            "x",
511            TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, vec![]),
512        );
513        assert_eq!(t.pipeline_count(), 1);
514    }
515
516    #[test]
517    fn test_remove_pipeline() {
518        let mut t = MessageTransformer::new();
519        t.add_pipeline(
520            "rm",
521            TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, vec![]),
522        );
523        assert!(t.remove_pipeline("rm"));
524        assert!(!t.has_pipeline("rm"));
525        assert!(!t.remove_pipeline("rm")); // second remove returns false
526    }
527
528    // ── MessageTransformer::transform ────────────────────────────────────
529
530    #[test]
531    fn test_transform_basic_field_rename() {
532        let mut t = MessageTransformer::new();
533        let mapping = FieldMapping::new("src", "dst");
534        let pipeline =
535            TransformPipeline::new(MessageFormat::Json, MessageFormat::Avro, vec![mapping]);
536        t.add_pipeline("rename", pipeline);
537
538        let payload = make_payload(&[("src", "value")], MessageFormat::Json);
539        let result = t.transform("rename", payload).expect("should succeed");
540        assert_eq!(result.fields.get("dst"), Some(&"value".to_string()));
541        assert!(!result.fields.contains_key("src"));
542        assert_eq!(result.format, MessageFormat::Avro);
543    }
544
545    #[test]
546    fn test_transform_to_upper() {
547        let t = simple_transformer(
548            "p",
549            "name",
550            "NAME",
551            TransformFn::ToUpper,
552            MessageFormat::Json,
553            MessageFormat::Json,
554        );
555        let payload = make_payload(&[("name", "alice")], MessageFormat::Json);
556        let result = t.transform("p", payload).expect("ok");
557        assert_eq!(result.fields["NAME"], "ALICE");
558    }
559
560    #[test]
561    fn test_transform_prefix() {
562        let t = simple_transformer(
563            "p",
564            "id",
565            "id",
566            TransformFn::Prefix("usr_".to_string()),
567            MessageFormat::Raw,
568            MessageFormat::Raw,
569        );
570        let payload = make_payload(&[("id", "42")], MessageFormat::Raw);
571        let result = t.transform("p", payload).expect("ok");
572        assert_eq!(result.fields["id"], "usr_42");
573    }
574
575    #[test]
576    fn test_transform_suffix() {
577        let t = simple_transformer(
578            "p",
579            "tag",
580            "tag",
581            TransformFn::Suffix("_v2".to_string()),
582            MessageFormat::Csv,
583            MessageFormat::Csv,
584        );
585        let payload = make_payload(&[("tag", "sensor")], MessageFormat::Csv);
586        let result = t.transform("p", payload).expect("ok");
587        assert_eq!(result.fields["tag"], "sensor_v2");
588    }
589
590    #[test]
591    fn test_transform_replace() {
592        let t = simple_transformer(
593            "p",
594            "path",
595            "path",
596            TransformFn::Replace {
597                from: "/".to_string(),
598                to: ".".to_string(),
599            },
600            MessageFormat::Json,
601            MessageFormat::Json,
602        );
603        let payload = make_payload(&[("path", "a/b/c")], MessageFormat::Json);
604        let result = t.transform("p", payload).expect("ok");
605        assert_eq!(result.fields["path"], "a.b.c");
606    }
607
608    #[test]
609    fn test_transform_pipeline_not_found() {
610        let t = MessageTransformer::new();
611        let payload = make_payload(&[], MessageFormat::Json);
612        let err = t.transform("missing", payload).unwrap_err();
613        assert_eq!(err, TransformError::PipelineNotFound("missing".to_string()));
614    }
615
616    #[test]
617    fn test_transform_field_not_found() {
618        let t = simple_transformer(
619            "p",
620            "nonexistent",
621            "out",
622            TransformFn::Identity,
623            MessageFormat::Json,
624            MessageFormat::Json,
625        );
626        let payload = make_payload(&[("other", "val")], MessageFormat::Json);
627        let err = t.transform("p", payload).unwrap_err();
628        assert_eq!(
629            err,
630            TransformError::FieldNotFound("nonexistent".to_string())
631        );
632    }
633
634    #[test]
635    fn test_transform_empty_pipeline_empty_output() {
636        let mut t = MessageTransformer::new();
637        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Avro, vec![]);
638        t.add_pipeline("empty", pipeline);
639
640        let payload = make_payload(&[("a", "1"), ("b", "2")], MessageFormat::Json);
641        let result = t.transform("empty", payload).expect("ok");
642        assert!(result.fields.is_empty());
643        assert_eq!(result.format, MessageFormat::Avro);
644    }
645
646    #[test]
647    fn test_transform_multiple_mappings() {
648        let mut t = MessageTransformer::new();
649        let mappings = vec![
650            FieldMapping::with_transform("first", "FIRST", TransformFn::ToUpper),
651            FieldMapping::with_transform("last", "LAST", TransformFn::ToLower),
652            FieldMapping::new("email", "email_address"),
653        ];
654        let pipeline = TransformPipeline::new(MessageFormat::Json, MessageFormat::Json, mappings);
655        t.add_pipeline("multi", pipeline);
656
657        let payload = make_payload(
658            &[("first", "Alice"), ("last", "SMITH"), ("email", "a@b.com")],
659            MessageFormat::Json,
660        );
661        let result = t.transform("multi", payload).expect("ok");
662        assert_eq!(result.fields["FIRST"], "ALICE");
663        assert_eq!(result.fields["LAST"], "smith");
664        assert_eq!(result.fields["email_address"], "a@b.com");
665    }
666
667    // ── chain_pipelines ───────────────────────────────────────────────────
668
669    #[test]
670    fn test_chain_single_pipeline() {
671        let t = simple_transformer(
672            "p",
673            "v",
674            "v",
675            TransformFn::Trim,
676            MessageFormat::Raw,
677            MessageFormat::Raw,
678        );
679        let payload = make_payload(&[("v", "  hello  ")], MessageFormat::Raw);
680        let result = t.chain_pipelines(&["p"], payload).expect("ok");
681        assert_eq!(result.fields["v"], "hello");
682    }
683
684    #[test]
685    fn test_chain_two_pipelines() {
686        let mut t = MessageTransformer::new();
687
688        // Pipeline 1: rename src→mid and uppercase
689        let p1 = TransformPipeline::new(
690            MessageFormat::Json,
691            MessageFormat::Json,
692            vec![FieldMapping::with_transform(
693                "src",
694                "mid",
695                TransformFn::ToUpper,
696            )],
697        );
698        // Pipeline 2: rename mid→dst and add prefix
699        let p2 = TransformPipeline::new(
700            MessageFormat::Json,
701            MessageFormat::Avro,
702            vec![FieldMapping::with_transform(
703                "mid",
704                "dst",
705                TransformFn::Prefix(">>".to_string()),
706            )],
707        );
708        t.add_pipeline("p1", p1);
709        t.add_pipeline("p2", p2);
710
711        let payload = make_payload(&[("src", "hello")], MessageFormat::Json);
712        let result = t.chain_pipelines(&["p1", "p2"], payload).expect("ok");
713        assert_eq!(result.fields["dst"], ">>HELLO");
714        assert_eq!(result.format, MessageFormat::Avro);
715    }
716
717    #[test]
718    fn test_chain_three_pipelines() {
719        let mut t = MessageTransformer::new();
720
721        let p1 = TransformPipeline::new(
722            MessageFormat::Raw,
723            MessageFormat::Raw,
724            vec![FieldMapping::with_transform("a", "b", TransformFn::Trim)],
725        );
726        let p2 = TransformPipeline::new(
727            MessageFormat::Raw,
728            MessageFormat::Raw,
729            vec![FieldMapping::with_transform("b", "c", TransformFn::ToUpper)],
730        );
731        let p3 = TransformPipeline::new(
732            MessageFormat::Raw,
733            MessageFormat::Csv,
734            vec![FieldMapping::with_transform(
735                "c",
736                "d",
737                TransformFn::Suffix("!".to_string()),
738            )],
739        );
740        t.add_pipeline("p1", p1);
741        t.add_pipeline("p2", p2);
742        t.add_pipeline("p3", p3);
743
744        let payload = make_payload(&[("a", "  hi  ")], MessageFormat::Raw);
745        let result = t.chain_pipelines(&["p1", "p2", "p3"], payload).expect("ok");
746        assert_eq!(result.fields["d"], "HI!");
747        assert_eq!(result.format, MessageFormat::Csv);
748    }
749
750    #[test]
751    fn test_chain_empty_names() {
752        let t = MessageTransformer::new();
753        let payload = make_payload(&[("x", "y")], MessageFormat::Json);
754        let result = t.chain_pipelines(&[], payload.clone()).expect("ok");
755        assert_eq!(result, payload);
756    }
757
758    #[test]
759    fn test_chain_missing_pipeline_error() {
760        let t = MessageTransformer::new();
761        let payload = make_payload(&[("x", "y")], MessageFormat::Json);
762        let err = t.chain_pipelines(&["missing"], payload).unwrap_err();
763        assert!(matches!(err, TransformError::PipelineNotFound(_)));
764    }
765
766    // ── Error Display ─────────────────────────────────────────────────────
767
768    #[test]
769    fn test_error_display_pipeline_not_found() {
770        let e = TransformError::PipelineNotFound("p".to_string());
771        assert!(e.to_string().contains("pipeline not found"));
772        assert!(e.to_string().contains("p"));
773    }
774
775    #[test]
776    fn test_error_display_field_not_found() {
777        let e = TransformError::FieldNotFound("f".to_string());
778        assert!(e.to_string().contains("source field not found"));
779        assert!(e.to_string().contains("f"));
780    }
781
782    #[test]
783    fn test_error_display_transform_failed() {
784        let e = TransformError::TransformFailed("boom".to_string());
785        assert!(e.to_string().contains("transform failed"));
786        assert!(e.to_string().contains("boom"));
787    }
788
789    // ── MessageFormat ─────────────────────────────────────────────────────
790
791    #[test]
792    fn test_message_format_equality() {
793        assert_eq!(MessageFormat::Json, MessageFormat::Json);
794        assert_ne!(MessageFormat::Json, MessageFormat::Avro);
795    }
796
797    #[test]
798    fn test_all_message_formats_exist() {
799        let formats = [
800            MessageFormat::Json,
801            MessageFormat::Avro,
802            MessageFormat::Protobuf,
803            MessageFormat::Csv,
804            MessageFormat::Raw,
805        ];
806        assert_eq!(formats.len(), 5);
807    }
808
809    // ── FieldMapping constructors ─────────────────────────────────────────
810
811    #[test]
812    fn test_field_mapping_new_no_transform() {
813        let m = FieldMapping::new("src", "dst");
814        assert_eq!(m.source_field, "src");
815        assert_eq!(m.target_field, "dst");
816        assert!(m.transform.is_none());
817    }
818
819    #[test]
820    fn test_field_mapping_with_transform() {
821        let m = FieldMapping::with_transform("s", "t", TransformFn::ToUpper);
822        assert!(m.transform.is_some());
823    }
824
825    // ── Overwrite existing pipeline ───────────────────────────────────────
826
827    #[test]
828    fn test_add_pipeline_overwrites() {
829        let mut t = MessageTransformer::new();
830        let p1 = TransformPipeline::new(
831            MessageFormat::Json,
832            MessageFormat::Json,
833            vec![FieldMapping::new("a", "b")],
834        );
835        let p2 = TransformPipeline::new(
836            MessageFormat::Json,
837            MessageFormat::Avro,
838            vec![FieldMapping::new("a", "c")],
839        );
840        t.add_pipeline("p", p1);
841        t.add_pipeline("p", p2);
842        // After overwrite, mapping goes to "c"
843        let payload = make_payload(&[("a", "v")], MessageFormat::Json);
844        let result = t.transform("p", payload).expect("ok");
845        assert!(result.fields.contains_key("c"));
846        assert!(!result.fields.contains_key("b"));
847    }
848
849    // ── Trim + ToLower combined via chain ─────────────────────────────────
850
851    #[test]
852    fn test_chain_trim_then_lower() {
853        let mut t = MessageTransformer::new();
854        let p1 = TransformPipeline::new(
855            MessageFormat::Raw,
856            MessageFormat::Raw,
857            vec![FieldMapping::with_transform("x", "x", TransformFn::Trim)],
858        );
859        let p2 = TransformPipeline::new(
860            MessageFormat::Raw,
861            MessageFormat::Raw,
862            vec![FieldMapping::with_transform("x", "x", TransformFn::ToLower)],
863        );
864        t.add_pipeline("trim", p1);
865        t.add_pipeline("lower", p2);
866
867        let payload = make_payload(&[("x", "  HELLO  ")], MessageFormat::Raw);
868        let result = t.chain_pipelines(&["trim", "lower"], payload).expect("ok");
869        assert_eq!(result.fields["x"], "hello");
870    }
871
872    // ── Idempotency of Identity transform ─────────────────────────────────
873
874    #[test]
875    fn test_identity_in_pipeline() {
876        let mut t = MessageTransformer::new();
877        let p = TransformPipeline::new(
878            MessageFormat::Json,
879            MessageFormat::Json,
880            vec![FieldMapping::with_transform(
881                "k",
882                "k",
883                TransformFn::Identity,
884            )],
885        );
886        t.add_pipeline("id", p);
887        let payload = make_payload(&[("k", "unchanged_value_123")], MessageFormat::Json);
888        let result = t.transform("id", payload).expect("ok");
889        assert_eq!(result.fields["k"], "unchanged_value_123");
890    }
891}