courier/transforms/
set_key.rs1use anyhow::Result;
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
11pub struct SetKeyTransform {
15 id: String,
16 from_field: String,
17}
18
19impl SetKeyTransform {
20 pub fn new(id: impl Into<String>, from_field: impl Into<String>) -> Self {
21 Self {
22 id: id.into(),
23 from_field: from_field.into(),
24 }
25 }
26}
27
28#[async_trait]
29impl MapOne for SetKeyTransform {
30 fn id(&self) -> &str {
31 &self.id
32 }
33
34 async fn map(&self, mut env: Envelope) -> Result<Option<Envelope>> {
35 if let Some(v) = env.payload.get(&self.from_field) {
36 env.meta.key = Some(match v {
37 Value::String(s) => s.clone(),
38 other => other.to_string(),
39 });
40 }
41 Ok(Some(env))
42 }
43}
44
45#[derive(Debug, Deserialize)]
46struct SetKeyTransformConfig {
47 from_field: String,
48}
49
50pub fn set_key_transform_factory(
53 id: &str,
54 config: Value,
55 on_error: ErrorPolicy,
56) -> Result<Box<dyn Transform>> {
57 let config: SetKeyTransformConfig = parse_config("set_key", config)?;
58 Ok(Box::new(
59 BasicTransform::new(SetKeyTransform::new(id, config.from_field))
60 .with_error_policy(on_error),
61 ))
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67 use serde_json::json;
68
69 use crate::Registry;
70 use crate::config::{ErrorPolicyConfig, TransformSpec};
71
72 #[tokio::test]
73 async fn sets_key_from_string_field() {
74 let t = SetKeyTransform::new("t", "user_id");
75 let env = Envelope::new("src", json!({ "user_id": "abc" }));
76 let out = t.map(env).await.unwrap().unwrap();
77 assert_eq!(out.meta.key.as_deref(), Some("abc"));
78 }
79
80 #[tokio::test]
81 async fn stringifies_non_string_field() {
82 let t = SetKeyTransform::new("t", "id");
83 let env = Envelope::new("src", json!({ "id": 42 }));
84 let out = t.map(env).await.unwrap().unwrap();
85 assert_eq!(out.meta.key.as_deref(), Some("42"));
86 }
87
88 #[tokio::test]
89 async fn leaves_key_unchanged_when_missing() {
90 let t = SetKeyTransform::new("t", "missing");
91 let env = Envelope::new("src", json!({ "other": 1 }));
92 let out = t.map(env).await.unwrap().unwrap();
93 assert!(out.meta.key.is_none());
94 }
95
96 #[test]
97 fn factory_resolves_through_registry() {
98 let registry = Registry::with_builtins().unwrap();
100 registry
101 .build_transform(
102 "p/t0",
103 TransformSpec {
104 kind: "set_key".into(),
105 config: json!({ "from_field": "user_id" }),
106 on_error: Some(ErrorPolicyConfig::Drop),
107 },
108 )
109 .unwrap();
110 }
111
112 #[test]
113 fn factory_reports_invalid_config() {
114 let registry = Registry::with_builtins().unwrap();
115 let err = registry
116 .build_transform(
117 "p/t0",
118 TransformSpec {
119 kind: "set_key".into(),
120 config: json!({ "wrong_field": "x" }),
121 on_error: None,
122 },
123 )
124 .err()
125 .expect("expected invalid-config error");
126 let msg = format!("{err:#}");
127 assert!(
128 msg.contains("invalid config for component type 'set_key'"),
129 "{msg}",
130 );
131 }
132}