1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
/// Transform that applies a fixed list of [`Operation`]s to the payload of
/// every envelope it receives.
pub struct MutateTransform {
    /// Identifier reported through `MapOne::id`.
    id: String,
    /// Operations applied to each envelope, in the order given here.
    operations: Vec<Operation>,
    /// How operations react when the field they target is absent.
    on_missing: MissingMode,
}
37
38impl MutateTransform {
39 pub fn new(id: impl Into<String>, operations: Vec<Operation>, on_missing: MissingMode) -> Self {
40 Self {
41 id: id.into(),
42 operations,
43 on_missing,
44 }
45 }
46}
47
/// Policy for operations whose target field cannot be found in the payload.
///
/// Deserialized from the snake_case names `strict` and `lenient`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MissingMode {
    /// Remove, rename and cast fail with an error when the field they target
    /// does not exist. This is the default.
    #[default]
    Strict,
    /// Remove, rename and cast silently skip a field that does not exist, and
    /// add-field skips a path whose parent is not an object.
    Lenient,
}
55
/// A single payload edit.
///
/// Deserialized as an internally tagged value: the `type` key selects the
/// variant using its snake_case name (`add_field`, `remove_field`,
/// `rename_field`, `cast`). Paths are split on `.` to address nested objects.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Operation {
    /// Sets `path` to a copy of `value`, replacing any existing value.
    AddField {
        /// Dot-separated path of the field to write.
        path: String,
        /// JSON value stored at `path`.
        value: Value,
    },
    /// Deletes the field at `path`.
    RemoveField {
        /// Dot-separated path of the field to delete.
        path: String,
    },
    /// Moves the value found at `from` to `to`.
    RenameField {
        /// Dot-separated path the value is taken from.
        from: String,
        /// Dot-separated path the value is written to.
        to: String,
    },
    /// Converts the value at `path` in place to another JSON type.
    Cast {
        /// Dot-separated path of the field to convert.
        path: String,
        /// Target type; read from the `to` key of the configuration.
        #[serde(rename = "to")]
        to_type: CastType,
    },
}
76
/// Target type of an [`Operation::Cast`].
///
/// Deserialized from the snake_case variant names (`string`, `int`, `float`,
/// `bool`, `json`).
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CastType {
    /// Produce a JSON string.
    String,
    /// Produce an integer JSON number.
    Int,
    /// Produce a floating-point JSON number.
    Float,
    /// Produce a JSON boolean.
    Bool,
    /// Parse a string as JSON text; other values are kept as they are.
    Json,
}
86
87#[async_trait]
88impl MapOne for MutateTransform {
89 fn id(&self) -> &str {
90 &self.id
91 }
92
93 async fn map(&self, mut env: Envelope) -> Result<Option<Envelope>> {
94 for op in &self.operations {
95 apply_operation(&mut env, op, self.on_missing)?;
96 }
97 Ok(Some(env))
98 }
99}
100
101fn apply_operation(env: &mut Envelope, op: &Operation, mode: MissingMode) -> Result<()> {
102 match op {
103 Operation::AddField { path, value } => {
104 if mode == MissingMode::Strict {
105 set_path(&mut env.payload, path, value.clone(), true)?;
106 } else {
107 set_path_lenient(&mut env.payload, path, value.clone())?;
108 }
109 }
110 Operation::RemoveField { path } => match remove_path(&mut env.payload, path)? {
111 Some(removed) => {
112 if !removed && mode == MissingMode::Strict {
113 bail!("mutate: field '{}' does not exist", path);
114 }
115 }
116 None => {
117 if mode == MissingMode::Strict {
118 bail!("mutate: field '{}' does not exist", path);
119 }
120 }
121 },
122 Operation::RenameField { from, to } => match remove_path_value(&mut env.payload, from)? {
123 Some(Some(v)) => {
124 set_path(&mut env.payload, to, v, true)?;
125 }
126 Some(None) => {
127 if mode == MissingMode::Strict {
128 bail!("mutate: field '{}' does not exist", from);
129 }
130 }
131 None => {
132 if mode == MissingMode::Strict {
133 bail!("mutate: field '{}' does not exist", from);
134 }
135 }
136 },
137 Operation::Cast { path, to_type } => {
138 let current = get_path_mut(&mut env.payload, path)?;
139 match current {
140 Some(v) => {
141 *v = cast_value(v, *to_type)?;
142 }
143 None => {
144 if mode == MissingMode::Strict {
145 bail!("mutate: field '{}' does not exist", path);
146 }
147 }
148 }
149 }
150 }
151 Ok(())
152}
153
/// Splits a dot-separated field path into its segments.
///
/// An empty `path` yields no segments. `str::split` on an empty string would
/// otherwise produce a single empty segment, which made the callers'
/// `segments.is_empty()` guard ("mutate: empty path") unreachable and let an
/// empty path address the key `""`.
///
/// Empty segments inside a non-empty path (for example `"a..b"`) are kept
/// as they are.
fn split_path(path: &str) -> Vec<&str> {
    if path.is_empty() {
        return Vec::new();
    }
    path.split('.').collect()
}
158
159fn navigate_to_parent<'a>(
166 root: &'a mut Value,
167 path: &str,
168 create: bool,
169) -> Result<Option<(&'a mut serde_json::Map<String, Value>, String)>> {
170 let segments = split_path(path);
171 if segments.is_empty() {
172 bail!("mutate: empty path");
173 }
174
175 let mut current = root;
176 for segment in &segments[..segments.len() - 1] {
177 if create {
178 if !current.is_object() {
179 bail!("mutate: cannot create field inside non-object");
180 }
181 let map = current.as_object_mut().unwrap();
182 if !map.contains_key(*segment) {
183 map.insert(segment.to_string(), Value::Object(serde_json::Map::new()));
184 }
185 current = map.get_mut(*segment).unwrap();
186 } else {
187 match current {
188 Value::Object(map) => {
189 current = match map.get_mut(*segment) {
190 Some(v) => v,
191 None => return Ok(None),
192 };
193 }
194 _ => return Ok(None),
195 }
196 }
197 }
198
199 match current {
200 Value::Object(map) => Ok(Some((map, segments.last().unwrap().to_string()))),
201 _ => {
202 if create {
203 bail!("mutate: cannot create field inside non-object")
204 } else {
205 Ok(None)
206 }
207 }
208 }
209}
210
211fn set_path(root: &mut Value, path: &str, value: Value, create: bool) -> Result<()> {
212 let (map, key) = navigate_to_parent(root, path, create)?
213 .ok_or_else(|| anyhow::anyhow!("mutate: path '{}' does not exist", path))?;
214 map.insert(key, value);
215 Ok(())
216}
217
218fn set_path_lenient(root: &mut Value, path: &str, value: Value) -> Result<()> {
219 if let Some((map, key)) = navigate_to_parent_lenient_create(root, path)? {
220 map.insert(key, value);
221 }
222 Ok(())
223}
224
225fn navigate_to_parent_lenient_create<'a>(
226 root: &'a mut Value,
227 path: &str,
228) -> Result<Option<(&'a mut serde_json::Map<String, Value>, String)>> {
229 let segments = split_path(path);
230 if segments.is_empty() {
231 bail!("mutate: empty path");
232 }
233
234 let mut current = root;
235 for segment in &segments[..segments.len() - 1] {
236 if !current.is_object() {
237 return Ok(None);
238 }
239
240 let map = current.as_object_mut().unwrap();
241 if !map.contains_key(*segment) {
242 map.insert(segment.to_string(), Value::Object(serde_json::Map::new()));
243 }
244 current = map.get_mut(*segment).unwrap();
245 }
246
247 match current {
248 Value::Object(map) => Ok(Some((map, segments.last().unwrap().to_string()))),
249 _ => Ok(None),
250 }
251}
252
253fn remove_path(root: &mut Value, path: &str) -> Result<Option<bool>> {
254 Ok(navigate_to_parent(root, path, false)?.map(|(map, key)| map.remove(&key).is_some()))
255}
256
257fn remove_path_value(root: &mut Value, path: &str) -> Result<Option<Option<Value>>> {
258 Ok(navigate_to_parent(root, path, false)?.map(|(map, key)| map.remove(&key)))
259}
260
261fn get_path_mut<'a>(root: &'a mut Value, path: &str) -> Result<Option<&'a mut Value>> {
262 let segments = split_path(path);
263 if segments.is_empty() {
264 bail!("mutate: empty path");
265 }
266
267 let mut current = root;
268 for segment in &segments[..segments.len() - 1] {
269 match current {
270 Value::Object(map) => {
271 current = match map.get_mut(*segment) {
272 Some(v) => v,
273 None => return Ok(None),
274 };
275 }
276 _ => return Ok(None),
277 }
278 }
279
280 match current {
281 Value::Object(map) => {
282 let key = segments.last().unwrap();
283 Ok(map.get_mut(*key))
284 }
285 _ => Ok(None),
286 }
287}
288
289fn cast_value(value: &Value, to: CastType) -> Result<Value> {
290 match to {
291 CastType::String => Ok(Value::String(match value {
292 Value::String(s) => s.clone(),
293 other => other.to_string(),
294 })),
295 CastType::Int => {
296 let n = match value {
297 Value::Number(n) => n
298 .as_i64()
299 .or_else(|| n.as_f64().map(|f| f as i64))
300 .unwrap_or(0),
301 Value::String(s) => s.parse().unwrap_or(0),
302 Value::Bool(b) => i64::from(*b),
303 Value::Null => 0,
304 _ => bail!("mutate: cannot cast {} to int", value),
305 };
306 Ok(Value::Number(serde_json::Number::from(n)))
307 }
308 CastType::Float => {
309 let f = match value {
310 Value::Number(n) => n.as_f64().unwrap_or(0.0),
311 Value::String(s) => s.parse().unwrap_or(0.0),
312 Value::Bool(b) => f64::from(*b),
313 Value::Null => 0.0,
314 _ => bail!("mutate: cannot cast {} to float", value),
315 };
316 Ok(Value::Number(
317 serde_json::Number::from_f64(f).unwrap_or_else(|| serde_json::Number::from(0)),
318 ))
319 }
320 CastType::Bool => {
321 let b = match value {
322 Value::Bool(b) => *b,
323 Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
324 Value::String(s) => !s.is_empty() && s != "false" && s != "0",
325 Value::Null => false,
326 _ => bail!("mutate: cannot cast {} to bool", value),
327 };
328 Ok(Value::Bool(b))
329 }
330 CastType::Json => match value {
331 Value::String(s) => serde_json::from_str(s)
332 .map_err(|err| anyhow::anyhow!("mutate: cannot cast string to json: {err}")),
333 other => Ok(other.clone()),
334 },
335 }
336}
337
/// Configuration of the `mutate` transform as parsed by
/// `mutate_transform_factory`.
#[derive(Debug, Deserialize)]
struct MutateTransformConfig {
    /// Operations to apply, in order. Has no default, so it must be present.
    operations: Vec<Operation>,
    /// Missing-field policy; falls back to `MissingMode::default()` when the
    /// key is omitted.
    #[serde(default)]
    on_missing: MissingMode,
}
348
349pub fn mutate_transform_factory(
352 id: &str,
353 config: Value,
354 on_error: ErrorPolicy,
355) -> Result<Box<dyn Transform>> {
356 let config: MutateTransformConfig = parse_config("mutate", config)?;
357 Ok(Box::new(
358 BasicTransform::new(MutateTransform::new(
359 id,
360 config.operations,
361 config.on_missing,
362 ))
363 .with_error_policy(on_error),
364 ))
365}
366
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};
    use crate::envelope::Envelope;

    /// Builds an add-field operation.
    fn add(path: &str, value: serde_json::Value) -> Operation {
        Operation::AddField {
            path: path.into(),
            value,
        }
    }

    /// Builds a remove-field operation.
    fn remove(path: &str) -> Operation {
        Operation::RemoveField { path: path.into() }
    }

    /// Builds a rename-field operation.
    fn rename(from: &str, to: &str) -> Operation {
        Operation::RenameField {
            from: from.into(),
            to: to.into(),
        }
    }

    /// Builds a cast operation.
    fn cast(path: &str, to_type: CastType) -> Operation {
        Operation::Cast {
            path: path.into(),
            to_type,
        }
    }

    /// Runs `operations` over an envelope from source "src" holding `payload`
    /// and returns the raw result of the transform.
    async fn run(
        operations: Vec<Operation>,
        on_missing: MissingMode,
        payload: serde_json::Value,
    ) -> anyhow::Result<Option<Envelope>> {
        let transform = MutateTransform::new("t", operations, on_missing);
        transform.map(Envelope::new("src", payload)).await
    }

    /// Like `run`, but expects the transform to succeed and emit an envelope.
    async fn run_ok(
        operations: Vec<Operation>,
        on_missing: MissingMode,
        payload: serde_json::Value,
    ) -> Envelope {
        run(operations, on_missing, payload).await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn add_field_creates_value() {
        let ops = vec![add("user.name", json!("alice"))];
        let out = run_ok(ops, MissingMode::Strict, json!({})).await;
        assert_eq!(out.payload["user"]["name"], "alice");
    }

    #[tokio::test]
    async fn add_field_overwrites_existing() {
        let ops = vec![add("user.name", json!("bob"))];
        let payload = json!({ "user": { "name": "alice" } });
        let out = run_ok(ops, MissingMode::Strict, payload).await;
        assert_eq!(out.payload["user"]["name"], "bob");
    }

    #[tokio::test]
    async fn add_field_lenient_creates_missing_intermediate_path() {
        let ops = vec![add("user.name", json!("alice"))];
        let out = run_ok(ops, MissingMode::Lenient, json!({})).await;
        assert_eq!(out.payload, json!({ "user": { "name": "alice" } }));
    }

    #[tokio::test]
    async fn add_field_lenient_skips_non_object_parent() {
        let ops = vec![add("user.name", json!("alice"))];
        let out = run_ok(ops, MissingMode::Lenient, json!({ "user": 1 })).await;
        assert_eq!(out.payload, json!({ "user": 1 }));
    }

    #[tokio::test]
    async fn remove_field_deletes_value() {
        let ops = vec![remove("old_field")];
        let payload = json!({ "old_field": 1, "keep": 2 });
        let out = run_ok(ops, MissingMode::Strict, payload).await;
        assert!(!out.payload.as_object().unwrap().contains_key("old_field"));
        assert_eq!(out.payload["keep"], 2);
    }

    #[tokio::test]
    async fn remove_field_strict_errors_when_missing() {
        let result = run(vec![remove("missing")], MissingMode::Strict, json!({})).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn remove_field_lenient_ignores_missing() {
        let result = run(vec![remove("missing")], MissingMode::Lenient, json!({})).await;
        assert!(result.unwrap().is_some());
    }

    #[tokio::test]
    async fn rename_field_moves_value() {
        let ops = vec![rename("old", "new")];
        let out = run_ok(ops, MissingMode::Strict, json!({ "old": 42 })).await;
        assert!(!out.payload.as_object().unwrap().contains_key("old"));
        assert_eq!(out.payload["new"], 42);
    }

    #[tokio::test]
    async fn rename_field_lenient_ignores_missing() {
        let ops = vec![rename("missing", "new")];
        let result = run(ops, MissingMode::Lenient, json!({})).await;
        assert!(result.unwrap().is_some());
    }

    #[tokio::test]
    async fn cast_to_string() {
        let ops = vec![cast("value", CastType::String)];
        let out = run_ok(ops, MissingMode::Strict, json!({ "value": 42 })).await;
        assert_eq!(out.payload["value"], "42");
    }

    #[tokio::test]
    async fn cast_string_to_int() {
        let ops = vec![cast("value", CastType::Int)];
        let out = run_ok(ops, MissingMode::Strict, json!({ "value": "99" })).await;
        assert_eq!(out.payload["value"], 99);
    }

    #[tokio::test]
    async fn cast_to_bool() {
        let ops = vec![cast("value", CastType::Bool)];
        let out = run_ok(ops, MissingMode::Strict, json!({ "value": 1 })).await;
        assert_eq!(out.payload["value"], true);
    }

    #[tokio::test]
    async fn cast_string_to_json() {
        let ops = vec![cast("value", CastType::Json)];
        let payload = json!({ "value": "{\"nested\":true}" });
        let out = run_ok(ops, MissingMode::Strict, payload).await;
        assert_eq!(out.payload["value"], json!({ "nested": true }));
    }

    #[tokio::test]
    async fn cast_string_to_json_errors_when_invalid() {
        let ops = vec![cast("value", CastType::Json)];
        let err = run(ops, MissingMode::Strict, json!({ "value": "not json" }))
            .await
            .expect_err("expected invalid json error");
        assert!(
            err.to_string().contains("cannot cast string to json"),
            "{err}"
        );
    }

    #[tokio::test]
    async fn multiple_operations_applied_in_order() {
        let ops = vec![add("a", json!(1)), rename("a", "b")];
        let out = run_ok(ops, MissingMode::Strict, json!({})).await;
        assert_eq!(out.payload["b"], 1);
        assert!(!out.payload.as_object().unwrap().contains_key("a"));
    }

    #[test]
    fn factory_resolves_through_registry() {
        let spec = TransformSpec {
            kind: "mutate".into(),
            config: json!({
                "operations": [
                    { "type": "add_field", "path": "x", "value": 1 }
                ]
            }),
            on_error: Some(ErrorPolicyConfig::Drop),
        };
        let registry = Registry::with_builtins().unwrap();
        registry.build_transform("p/t0", spec).unwrap();
    }

    #[test]
    fn factory_reports_invalid_config() {
        let spec = TransformSpec {
            kind: "mutate".into(),
            config: json!({ "wrong_field": "x" }),
            on_error: None,
        };
        let registry = Registry::with_builtins().unwrap();
        let err = registry
            .build_transform("p/t0", spec)
            .err()
            .expect("expected invalid-config error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'mutate'"),
            "{msg}",
        );
    }

    #[tokio::test]
    async fn cast_string_to_string_preserves_value() {
        let ops = vec![cast("value", CastType::String)];
        let out = run_ok(ops, MissingMode::Strict, json!({ "value": "hello" })).await;
        assert_eq!(out.payload["value"], "hello");
    }

    #[tokio::test]
    async fn lenient_skips_missing_intermediate_path() {
        let ops = vec![
            remove("a.b.c"),
            rename("x.y.z", "w"),
            cast("m.n.o", CastType::String),
        ];
        let out = run_ok(ops, MissingMode::Lenient, json!({})).await;
        assert_eq!(out.payload, json!({}));
    }

    #[tokio::test]
    async fn lenient_skips_non_object_parent() {
        let ops = vec![remove("a.b"), rename("a.c", "d")];
        let out = run_ok(ops, MissingMode::Lenient, json!({ "a": 1 })).await;
        assert_eq!(out.payload, json!({ "a": 1 }));
    }
}