// courier/transforms/mutate.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
11/// Performs structural changes to an envelope without a scripting runtime.
12///
13/// Operations are applied in order. Each operation targets a dotted path
14/// inside the envelope payload (e.g. `payload.user.id`). The root `payload`
15/// is implicit — paths are relative to the payload object.
16///
17/// Supported operations:
18/// - `add_field` — insert or overwrite a field. Creates intermediate objects
19///   as needed.
20/// - `remove_field` — delete a field. In strict mode the field must exist.
21/// - `rename_field` — move a value from one path to another. In strict mode
22///   the source must exist.
23/// - `cast` — convert a scalar value to another JSON type (`string`, `int`,
24///   `float`, `bool`, `json`).
25///
26/// # Missing-field handling
27/// `on_missing` controls behavior when an operation references a path that
28/// does not exist:
29/// - `strict` (default) — the transform returns an error, which is handled
30///   by the configured `ErrorPolicy`.
31/// - `lenient` — the operation is silently skipped.
32pub struct MutateTransform {
33    id: String,
34    operations: Vec<Operation>,
35    on_missing: MissingMode,
36}
37
38impl MutateTransform {
39    pub fn new(id: impl Into<String>, operations: Vec<Operation>, on_missing: MissingMode) -> Self {
40        Self {
41            id: id.into(),
42            operations,
43            on_missing,
44        }
45    }
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
49#[serde(rename_all = "snake_case")]
50pub enum MissingMode {
51    #[default]
52    Strict,
53    Lenient,
54}
55
56#[derive(Debug, Clone, Deserialize)]
57#[serde(tag = "type", rename_all = "snake_case")]
58pub enum Operation {
59    AddField {
60        path: String,
61        value: Value,
62    },
63    RemoveField {
64        path: String,
65    },
66    RenameField {
67        from: String,
68        to: String,
69    },
70    Cast {
71        path: String,
72        #[serde(rename = "to")]
73        to_type: CastType,
74    },
75}
76
77#[derive(Debug, Clone, Copy, Deserialize)]
78#[serde(rename_all = "snake_case")]
79pub enum CastType {
80    String,
81    Int,
82    Float,
83    Bool,
84    Json,
85}
86
87#[async_trait]
88impl MapOne for MutateTransform {
89    fn id(&self) -> &str {
90        &self.id
91    }
92
93    async fn map(&self, mut env: Envelope) -> Result<Option<Envelope>> {
94        for op in &self.operations {
95            apply_operation(&mut env, op, self.on_missing)?;
96        }
97        Ok(Some(env))
98    }
99}
100
101fn apply_operation(env: &mut Envelope, op: &Operation, mode: MissingMode) -> Result<()> {
102    match op {
103        Operation::AddField { path, value } => {
104            if mode == MissingMode::Strict {
105                set_path(&mut env.payload, path, value.clone(), true)?;
106            } else {
107                set_path_lenient(&mut env.payload, path, value.clone())?;
108            }
109        }
110        Operation::RemoveField { path } => match remove_path(&mut env.payload, path)? {
111            Some(removed) => {
112                if !removed && mode == MissingMode::Strict {
113                    bail!("mutate: field '{}' does not exist", path);
114                }
115            }
116            None => {
117                if mode == MissingMode::Strict {
118                    bail!("mutate: field '{}' does not exist", path);
119                }
120            }
121        },
122        Operation::RenameField { from, to } => match remove_path_value(&mut env.payload, from)? {
123            Some(Some(v)) => {
124                set_path(&mut env.payload, to, v, true)?;
125            }
126            Some(None) => {
127                if mode == MissingMode::Strict {
128                    bail!("mutate: field '{}' does not exist", from);
129                }
130            }
131            None => {
132                if mode == MissingMode::Strict {
133                    bail!("mutate: field '{}' does not exist", from);
134                }
135            }
136        },
137        Operation::Cast { path, to_type } => {
138            let current = get_path_mut(&mut env.payload, path)?;
139            match current {
140                Some(v) => {
141                    *v = cast_value(v, *to_type)?;
142                }
143                None => {
144                    if mode == MissingMode::Strict {
145                        bail!("mutate: field '{}' does not exist", path);
146                    }
147                }
148            }
149        }
150    }
151    Ok(())
152}
153
/// Splits a dotted path into segments.
///
/// An empty path yields no segments, so callers' "mutate: empty path" guard
/// can reject it. (`str::split` would otherwise return a single empty segment
/// and the empty string would silently be used as a key.) Empty segments
/// inside a non-empty path, e.g. `a..b`, are returned as-is.
fn split_path(path: &str) -> Vec<&str> {
    if path.is_empty() {
        return Vec::new();
    }
    path.split('.').collect()
}
158
159/// Navigate to the parent object of the last segment, optionally creating
160/// intermediate objects.
161///
162/// Returns `Ok(None)` when an intermediate segment is missing and `create` is
163/// `false`. Callers in lenient mode should treat `None` as "path does not
164/// exist" and skip the operation.
165fn navigate_to_parent<'a>(
166    root: &'a mut Value,
167    path: &str,
168    create: bool,
169) -> Result<Option<(&'a mut serde_json::Map<String, Value>, String)>> {
170    let segments = split_path(path);
171    if segments.is_empty() {
172        bail!("mutate: empty path");
173    }
174
175    let mut current = root;
176    for segment in &segments[..segments.len() - 1] {
177        if create {
178            if !current.is_object() {
179                bail!("mutate: cannot create field inside non-object");
180            }
181            let map = current.as_object_mut().unwrap();
182            if !map.contains_key(*segment) {
183                map.insert(segment.to_string(), Value::Object(serde_json::Map::new()));
184            }
185            current = map.get_mut(*segment).unwrap();
186        } else {
187            match current {
188                Value::Object(map) => {
189                    current = match map.get_mut(*segment) {
190                        Some(v) => v,
191                        None => return Ok(None),
192                    };
193                }
194                _ => return Ok(None),
195            }
196        }
197    }
198
199    match current {
200        Value::Object(map) => Ok(Some((map, segments.last().unwrap().to_string()))),
201        _ => {
202            if create {
203                bail!("mutate: cannot create field inside non-object")
204            } else {
205                Ok(None)
206            }
207        }
208    }
209}
210
211fn set_path(root: &mut Value, path: &str, value: Value, create: bool) -> Result<()> {
212    let (map, key) = navigate_to_parent(root, path, create)?
213        .ok_or_else(|| anyhow::anyhow!("mutate: path '{}' does not exist", path))?;
214    map.insert(key, value);
215    Ok(())
216}
217
218fn set_path_lenient(root: &mut Value, path: &str, value: Value) -> Result<()> {
219    if let Some((map, key)) = navigate_to_parent_lenient_create(root, path)? {
220        map.insert(key, value);
221    }
222    Ok(())
223}
224
225fn navigate_to_parent_lenient_create<'a>(
226    root: &'a mut Value,
227    path: &str,
228) -> Result<Option<(&'a mut serde_json::Map<String, Value>, String)>> {
229    let segments = split_path(path);
230    if segments.is_empty() {
231        bail!("mutate: empty path");
232    }
233
234    let mut current = root;
235    for segment in &segments[..segments.len() - 1] {
236        if !current.is_object() {
237            return Ok(None);
238        }
239
240        let map = current.as_object_mut().unwrap();
241        if !map.contains_key(*segment) {
242            map.insert(segment.to_string(), Value::Object(serde_json::Map::new()));
243        }
244        current = map.get_mut(*segment).unwrap();
245    }
246
247    match current {
248        Value::Object(map) => Ok(Some((map, segments.last().unwrap().to_string()))),
249        _ => Ok(None),
250    }
251}
252
253fn remove_path(root: &mut Value, path: &str) -> Result<Option<bool>> {
254    Ok(navigate_to_parent(root, path, false)?.map(|(map, key)| map.remove(&key).is_some()))
255}
256
257fn remove_path_value(root: &mut Value, path: &str) -> Result<Option<Option<Value>>> {
258    Ok(navigate_to_parent(root, path, false)?.map(|(map, key)| map.remove(&key)))
259}
260
261fn get_path_mut<'a>(root: &'a mut Value, path: &str) -> Result<Option<&'a mut Value>> {
262    let segments = split_path(path);
263    if segments.is_empty() {
264        bail!("mutate: empty path");
265    }
266
267    let mut current = root;
268    for segment in &segments[..segments.len() - 1] {
269        match current {
270            Value::Object(map) => {
271                current = match map.get_mut(*segment) {
272                    Some(v) => v,
273                    None => return Ok(None),
274                };
275            }
276            _ => return Ok(None),
277        }
278    }
279
280    match current {
281        Value::Object(map) => {
282            let key = segments.last().unwrap();
283            Ok(map.get_mut(*key))
284        }
285        _ => Ok(None),
286    }
287}
288
289fn cast_value(value: &Value, to: CastType) -> Result<Value> {
290    match to {
291        CastType::String => Ok(Value::String(match value {
292            Value::String(s) => s.clone(),
293            other => other.to_string(),
294        })),
295        CastType::Int => {
296            let n = match value {
297                Value::Number(n) => n
298                    .as_i64()
299                    .or_else(|| n.as_f64().map(|f| f as i64))
300                    .unwrap_or(0),
301                Value::String(s) => s.parse().unwrap_or(0),
302                Value::Bool(b) => i64::from(*b),
303                Value::Null => 0,
304                _ => bail!("mutate: cannot cast {} to int", value),
305            };
306            Ok(Value::Number(serde_json::Number::from(n)))
307        }
308        CastType::Float => {
309            let f = match value {
310                Value::Number(n) => n.as_f64().unwrap_or(0.0),
311                Value::String(s) => s.parse().unwrap_or(0.0),
312                Value::Bool(b) => f64::from(*b),
313                Value::Null => 0.0,
314                _ => bail!("mutate: cannot cast {} to float", value),
315            };
316            Ok(Value::Number(
317                serde_json::Number::from_f64(f).unwrap_or_else(|| serde_json::Number::from(0)),
318            ))
319        }
320        CastType::Bool => {
321            let b = match value {
322                Value::Bool(b) => *b,
323                Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
324                Value::String(s) => !s.is_empty() && s != "false" && s != "0",
325                Value::Null => false,
326                _ => bail!("mutate: cannot cast {} to bool", value),
327            };
328            Ok(Value::Bool(b))
329        }
330        CastType::Json => match value {
331            Value::String(s) => serde_json::from_str(s)
332                .map_err(|err| anyhow::anyhow!("mutate: cannot cast string to json: {err}")),
333            other => Ok(other.clone()),
334        },
335    }
336}
337
338// ------------------------------------------------------------------
339// Config + Factory
340// ------------------------------------------------------------------
341
342#[derive(Debug, Deserialize)]
343struct MutateTransformConfig {
344    operations: Vec<Operation>,
345    #[serde(default)]
346    on_missing: MissingMode,
347}
348
349/// Registry factory for [`MutateTransform`]. Registered by
350/// `courier::registry::register_builtin` under kind `"mutate"`.
351pub fn mutate_transform_factory(
352    id: &str,
353    config: Value,
354    on_error: ErrorPolicy,
355) -> Result<Box<dyn Transform>> {
356    let config: MutateTransformConfig = parse_config("mutate", config)?;
357    Ok(Box::new(
358        BasicTransform::new(MutateTransform::new(
359            id,
360            config.operations,
361            config.on_missing,
362        ))
363        .with_error_policy(on_error),
364    ))
365}
366
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::MissingMode::{Lenient, Strict};
    use super::*;
    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};
    use crate::envelope::Envelope;

    fn add(path: &str, value: Value) -> Operation {
        Operation::AddField {
            path: path.into(),
            value,
        }
    }

    fn remove(path: &str) -> Operation {
        Operation::RemoveField { path: path.into() }
    }

    fn rename(from: &str, to: &str) -> Operation {
        Operation::RenameField {
            from: from.into(),
            to: to.into(),
        }
    }

    fn cast(path: &str, to_type: CastType) -> Operation {
        Operation::Cast {
            path: path.into(),
            to_type,
        }
    }

    /// Applies `ops` to `payload` and returns the resulting payload. Panics if
    /// the transform errors or drops the envelope.
    async fn run(ops: Vec<Operation>, mode: MissingMode, payload: Value) -> Value {
        let mut env = MutateTransform::new("t", ops, mode)
            .map(Envelope::new("src", payload))
            .await
            .expect("mutate should succeed")
            .expect("mutate should keep the envelope");
        std::mem::take(&mut env.payload)
    }

    /// Applies `ops` to `payload` and returns the error the transform reports.
    async fn run_err(ops: Vec<Operation>, mode: MissingMode, payload: Value) -> anyhow::Error {
        MutateTransform::new("t", ops, mode)
            .map(Envelope::new("src", payload))
            .await
            .expect_err("mutate should fail")
    }

    #[tokio::test]
    async fn add_field_creates_value() {
        let out = run(vec![add("user.name", json!("alice"))], Strict, json!({})).await;
        assert_eq!(out["user"]["name"], "alice");
    }

    #[tokio::test]
    async fn add_field_overwrites_existing() {
        let payload = json!({ "user": { "name": "alice" } });
        let out = run(vec![add("user.name", json!("bob"))], Strict, payload).await;
        assert_eq!(out["user"]["name"], "bob");
    }

    #[tokio::test]
    async fn add_field_lenient_creates_missing_intermediate_path() {
        let out = run(vec![add("user.name", json!("alice"))], Lenient, json!({})).await;
        assert_eq!(out, json!({ "user": { "name": "alice" } }));
    }

    #[tokio::test]
    async fn add_field_lenient_skips_non_object_parent() {
        let out = run(vec![add("user.name", json!("alice"))], Lenient, json!({ "user": 1 })).await;
        assert_eq!(out, json!({ "user": 1 }));
    }

    #[tokio::test]
    async fn add_field_strict_errors_on_non_object_parent() {
        let err = run_err(vec![add("user.name", json!("alice"))], Strict, json!({ "user": 1 })).await;
        assert!(
            err.to_string().contains("cannot create field inside non-object"),
            "{err}"
        );
    }

    #[tokio::test]
    async fn remove_field_deletes_value() {
        let out = run(vec![remove("old_field")], Strict, json!({ "old_field": 1, "keep": 2 })).await;
        assert_eq!(out, json!({ "keep": 2 }));
    }

    #[tokio::test]
    async fn remove_field_deletes_nested_value() {
        let out = run(vec![remove("a.b")], Strict, json!({ "a": { "b": 1, "c": 2 } })).await;
        assert_eq!(out, json!({ "a": { "c": 2 } }));
    }

    #[tokio::test]
    async fn remove_field_strict_errors_when_missing() {
        let err = run_err(vec![remove("missing")], Strict, json!({})).await;
        assert!(err.to_string().contains("does not exist"), "{err}");
    }

    #[tokio::test]
    async fn remove_field_lenient_ignores_missing() {
        let out = run(vec![remove("missing")], Lenient, json!({})).await;
        assert_eq!(out, json!({}));
    }

    #[tokio::test]
    async fn rename_field_moves_value() {
        let out = run(vec![rename("old", "new")], Strict, json!({ "old": 42 })).await;
        assert_eq!(out, json!({ "new": 42 }));
    }

    #[tokio::test]
    async fn rename_field_creates_destination_parents() {
        let out = run(vec![rename("old", "a.b")], Strict, json!({ "old": 42 })).await;
        assert_eq!(out, json!({ "a": { "b": 42 } }));
    }

    #[tokio::test]
    async fn rename_field_strict_errors_when_missing() {
        let err = run_err(vec![rename("missing", "new")], Strict, json!({})).await;
        assert!(
            err.to_string().contains("field 'missing' does not exist"),
            "{err}"
        );
    }

    #[tokio::test]
    async fn rename_field_lenient_ignores_missing() {
        let out = run(vec![rename("missing", "new")], Lenient, json!({})).await;
        assert_eq!(out, json!({}));
    }

    #[tokio::test]
    async fn cast_to_string() {
        let out = run(vec![cast("value", CastType::String)], Strict, json!({ "value": 42 })).await;
        assert_eq!(out["value"], "42");
    }

    #[tokio::test]
    async fn cast_string_to_string_preserves_value() {
        let payload = json!({ "value": "hello" });
        let out = run(vec![cast("value", CastType::String)], Strict, payload).await;
        assert_eq!(out["value"], "hello");
    }

    #[tokio::test]
    async fn cast_string_to_int() {
        let out = run(vec![cast("value", CastType::Int)], Strict, json!({ "value": "99" })).await;
        assert_eq!(out["value"], 99);
    }

    #[tokio::test]
    async fn cast_float_to_int_truncates() {
        let out = run(vec![cast("value", CastType::Int)], Strict, json!({ "value": 3.9 })).await;
        assert_eq!(out["value"], 3);
    }

    #[tokio::test]
    async fn cast_array_to_int_errors() {
        let err = run_err(vec![cast("value", CastType::Int)], Strict, json!({ "value": [1] })).await;
        assert!(err.to_string().contains("cannot cast"), "{err}");
    }

    #[tokio::test]
    async fn cast_string_to_float() {
        let payload = json!({ "value": "1.5" });
        let out = run(vec![cast("value", CastType::Float)], Strict, payload).await;
        assert_eq!(out["value"], 1.5);
    }

    #[tokio::test]
    async fn cast_to_bool() {
        let out = run(vec![cast("value", CastType::Bool)], Strict, json!({ "value": 1 })).await;
        assert_eq!(out["value"], true);
    }

    #[tokio::test]
    async fn cast_string_to_json() {
        let payload = json!({ "value": "{\"nested\":true}" });
        let out = run(vec![cast("value", CastType::Json)], Strict, payload).await;
        assert_eq!(out["value"], json!({ "nested": true }));
    }

    #[tokio::test]
    async fn cast_string_to_json_errors_when_invalid() {
        let payload = json!({ "value": "not json" });
        let err = run_err(vec![cast("value", CastType::Json)], Strict, payload).await;
        assert!(
            err.to_string().contains("cannot cast string to json"),
            "{err}"
        );
    }

    #[tokio::test]
    async fn cast_strict_errors_when_missing() {
        let err = run_err(vec![cast("value", CastType::Int)], Strict, json!({})).await;
        assert!(
            err.to_string().contains("field 'value' does not exist"),
            "{err}"
        );
    }

    #[tokio::test]
    async fn multiple_operations_applied_in_order() {
        let ops = vec![add("a", json!(1)), rename("a", "b")];
        let out = run(ops, Strict, json!({})).await;
        assert_eq!(out, json!({ "b": 1 }));
    }

    #[tokio::test]
    async fn lenient_skips_missing_intermediate_path() {
        let ops = vec![
            remove("a.b.c"),
            rename("x.y.z", "w"),
            cast("m.n.o", CastType::String),
        ];
        let out = run(ops, Lenient, json!({})).await;
        assert_eq!(out, json!({}));
    }

    #[tokio::test]
    async fn lenient_skips_non_object_parent() {
        // When an intermediate segment is a scalar (not an object),
        // lenient mode should skip the operation instead of erroring.
        let ops = vec![remove("a.b"), rename("a.c", "d")];
        let out = run(ops, Lenient, json!({ "a": 1 })).await;
        assert_eq!(out, json!({ "a": 1 }));
    }

    #[test]
    fn factory_resolves_through_registry() {
        let registry = Registry::with_builtins().unwrap();
        registry
            .build_transform(
                "p/t0",
                TransformSpec {
                    kind: "mutate".into(),
                    config: json!({
                        "operations": [
                            { "type": "add_field", "path": "x", "value": 1 }
                        ]
                    }),
                    on_error: Some(ErrorPolicyConfig::Drop),
                },
            )
            .unwrap();
    }

    #[test]
    fn factory_reports_invalid_config() {
        let registry = Registry::with_builtins().unwrap();
        let err = registry
            .build_transform(
                "p/t0",
                TransformSpec {
                    kind: "mutate".into(),
                    config: json!({ "wrong_field": "x" }),
                    on_error: None,
                },
            )
            .err()
            .expect("expected invalid-config error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'mutate'"),
            "{msg}",
        );
    }
}