Skip to main content

courier/transforms/
set_key.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
/// Populates `meta.key` from a top-level field of the payload. String
/// values are used as-is; other JSON types are stringified via `to_string`.
/// Missing fields leave `meta.key` unchanged.
///
/// Derives `Debug` and `Clone` so the transform can be logged and duplicated
/// like any other plain-data public type; both fields are owned `String`s.
#[derive(Debug, Clone)]
pub struct SetKeyTransform {
    /// Identifier given at construction and reported by `MapOne::id`.
    id: String,
    /// Name of the payload field whose value becomes `meta.key`.
    from_field: String,
}
18
19impl SetKeyTransform {
20    pub fn new(id: impl Into<String>, from_field: impl Into<String>) -> Self {
21        Self {
22            id: id.into(),
23            from_field: from_field.into(),
24        }
25    }
26}
27
28#[async_trait]
29impl MapOne for SetKeyTransform {
30    fn id(&self) -> &str {
31        &self.id
32    }
33
34    async fn map(&self, mut env: Envelope) -> Result<Option<Envelope>> {
35        if let Some(v) = env.payload.get(&self.from_field) {
36            env.meta.key = Some(match v {
37                Value::String(s) => s.clone(),
38                other => other.to_string(),
39            });
40        }
41        Ok(Some(env))
42    }
43}
44
/// Deserialized form of the `config` value accepted by
/// [`set_key_transform_factory`].
#[derive(Debug, Deserialize)]
struct SetKeyTransformConfig {
    /// Name of the payload field to read the key from; passed straight to
    /// [`SetKeyTransform::new`]. No serde default is declared for it.
    from_field: String,
}
49
50/// Registry factory for [`SetKeyTransform`]. Registered by
51/// `courier::registry::register_builtin` under kind `"set_key"`.
52pub fn set_key_transform_factory(
53    id: &str,
54    config: Value,
55    on_error: ErrorPolicy,
56) -> Result<Box<dyn Transform>> {
57    let config: SetKeyTransformConfig = parse_config("set_key", config)?;
58    Ok(Box::new(
59        BasicTransform::new(SetKeyTransform::new(id, config.from_field))
60            .with_error_policy(on_error),
61    ))
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};

    /// Runs a fresh transform (id `"t"`) reading `from_field` over an
    /// envelope from source `"src"` carrying `payload`, and returns the
    /// envelope it emits.
    async fn run(from_field: &str, payload: Value) -> Envelope {
        let transform = SetKeyTransform::new("t", from_field);
        let input = Envelope::new("src", payload);
        let emitted = transform.map(input).await.unwrap().unwrap();
        emitted
    }

    /// Builds a `set_key` transform spec with the given config and policy.
    fn set_key_spec(config: Value, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: "set_key".into(),
            config,
            on_error,
        }
    }

    #[tokio::test]
    async fn sets_key_from_string_field() {
        let output = run("user_id", json!({ "user_id": "abc" })).await;
        assert_eq!(output.meta.key.as_deref(), Some("abc"));
    }

    #[tokio::test]
    async fn stringifies_non_string_field() {
        let output = run("id", json!({ "id": 42 })).await;
        assert_eq!(output.meta.key.as_deref(), Some("42"));
    }

    #[tokio::test]
    async fn leaves_key_unchanged_when_missing() {
        let output = run("missing", json!({ "other": 1 })).await;
        assert!(output.meta.key.is_none());
    }

    #[test]
    fn factory_resolves_through_registry() {
        // Happy path for the factory wired into `with_builtins`.
        let registry = Registry::with_builtins().unwrap();
        let spec = set_key_spec(
            json!({ "from_field": "user_id" }),
            Some(ErrorPolicyConfig::Drop),
        );
        registry.build_transform("p/t0", spec).unwrap();
    }

    #[test]
    fn factory_reports_invalid_config() {
        // The config lacks `from_field`, so building must fail with the
        // invalid-config message for this component kind.
        let registry = Registry::with_builtins().unwrap();
        let spec = set_key_spec(json!({ "wrong_field": "x" }), None);
        let failure = registry
            .build_transform("p/t0", spec)
            .err()
            .expect("expected invalid-config error");
        let msg = format!("{failure:#}");
        assert!(
            msg.contains("invalid config for component type 'set_key'"),
            "{msg}",
        );
    }
}