Skip to main content

this/events/operators/
map.rs

1//! Map operator — transforms the payload via Tera templates
2//!
3//! The map operator takes a JSON template where string values can contain
4//! Tera expressions (e.g., `{{ owner.name }}`). At execution time, the
5//! FlowContext variables are injected into the Tera context and each
6//! string value is rendered as a template.
7//!
8//! The rendered result is stored as the `_payload` variable in the context,
9//! which is then used by the `deliver` operator to send to sinks.
10//!
11//! ```yaml
12//! - map:
13//!     template:
14//!       title: "{{ owner.name }} started following you"
15//!       body: "You have a new follower!"
16//!       icon: "follow"
17//!       data:
18//!         follower_id: "{{ source_id }}"
19//! ```
20
21use crate::config::events::MapConfig;
22use crate::events::context::FlowContext;
23use crate::events::operators::{OpResult, PipelineOperator};
24use anyhow::{Result, anyhow};
25use async_trait::async_trait;
26use serde_json::Value;
27
28/// Compiled map operator
29#[derive(Debug, Clone)]
30pub struct MapOp {
31    /// The JSON template to render
32    template: Value,
33}
34
35impl MapOp {
36    /// Create a MapOp from a MapConfig
37    pub fn from_config(config: &MapConfig) -> Self {
38        Self {
39            template: config.template.clone(),
40        }
41    }
42}
43
44#[async_trait]
45impl PipelineOperator for MapOp {
46    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
47        let tera_ctx = build_tera_context(ctx);
48        let rendered = render_value(&self.template, &tera_ctx)?;
49        ctx.set_var("_payload", rendered);
50        Ok(OpResult::Continue)
51    }
52
53    fn name(&self) -> &str {
54        "map"
55    }
56}
57
58/// Build a Tera context from FlowContext variables
59fn build_tera_context(ctx: &FlowContext) -> tera::Context {
60    let mut tera_ctx = tera::Context::new();
61    for (key, value) in &ctx.variables {
62        tera_ctx.insert(key, value);
63    }
64    tera_ctx
65}
66
67/// Recursively render a JSON value, treating string values as Tera templates
68fn render_value(template: &Value, tera_ctx: &tera::Context) -> Result<Value> {
69    match template {
70        Value::String(s) => {
71            // Render the string as a Tera template
72            let rendered = tera::Tera::one_off(s, tera_ctx, false)
73                .map_err(|e| anyhow!("map: template rendering failed for '{}': {}", s, e))?;
74            Ok(Value::String(rendered))
75        }
76        Value::Object(map) => {
77            // Recursively render each value in the object
78            let mut result = serde_json::Map::new();
79            for (key, value) in map {
80                result.insert(key.clone(), render_value(value, tera_ctx)?);
81            }
82            Ok(Value::Object(result))
83        }
84        Value::Array(arr) => {
85            // Recursively render each element in the array
86            let result: Result<Vec<Value>> =
87                arr.iter().map(|v| render_value(v, tera_ctx)).collect();
88            Ok(Value::Array(result?))
89        }
90        // Numbers, booleans, null — pass through unchanged
91        other => Ok(other.clone()),
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use crate::config::events::MapConfig;
99    use crate::core::events::{EntityEvent, FrameworkEvent, LinkEvent};
100    use crate::core::service::LinkService;
101    use serde_json::json;
102    use std::collections::HashMap;
103    use std::sync::Arc;
104    use uuid::Uuid;
105
106    // ── Mock LinkService ─────────────────────────────────────────────
107
108    struct MockLinkService;
109
110    #[async_trait]
111    impl LinkService for MockLinkService {
112        async fn create(
113            &self,
114            _link: crate::core::link::LinkEntity,
115        ) -> Result<crate::core::link::LinkEntity> {
116            unimplemented!()
117        }
118        async fn get(&self, _id: &Uuid) -> Result<Option<crate::core::link::LinkEntity>> {
119            unimplemented!()
120        }
121        async fn list(&self) -> Result<Vec<crate::core::link::LinkEntity>> {
122            unimplemented!()
123        }
124        async fn find_by_source(
125            &self,
126            _source_id: &Uuid,
127            _link_type: Option<&str>,
128            _target_type: Option<&str>,
129        ) -> Result<Vec<crate::core::link::LinkEntity>> {
130            unimplemented!()
131        }
132        async fn find_by_target(
133            &self,
134            _target_id: &Uuid,
135            _link_type: Option<&str>,
136            _source_type: Option<&str>,
137        ) -> Result<Vec<crate::core::link::LinkEntity>> {
138            unimplemented!()
139        }
140        async fn update(
141            &self,
142            _id: &Uuid,
143            _link: crate::core::link::LinkEntity,
144        ) -> Result<crate::core::link::LinkEntity> {
145            unimplemented!()
146        }
147        async fn delete(&self, _id: &Uuid) -> Result<()> {
148            unimplemented!()
149        }
150        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
151            unimplemented!()
152        }
153    }
154
155    fn mock_link_service() -> Arc<dyn LinkService> {
156        Arc::new(MockLinkService)
157    }
158
159    fn make_link_context() -> FlowContext {
160        let source_id = Uuid::new_v4();
161        let target_id = Uuid::new_v4();
162        let event = FrameworkEvent::Link(LinkEvent::Created {
163            link_type: "follows".to_string(),
164            link_id: Uuid::new_v4(),
165            source_id,
166            target_id,
167            metadata: None,
168        });
169        FlowContext::new(event, mock_link_service(), HashMap::new())
170    }
171
172    fn make_entity_context() -> FlowContext {
173        let event = FrameworkEvent::Entity(EntityEvent::Created {
174            entity_type: "user".to_string(),
175            entity_id: Uuid::new_v4(),
176            data: json!({"name": "Alice"}),
177        });
178        FlowContext::new(event, mock_link_service(), HashMap::new())
179    }
180
181    // ── Tests ────────────────────────────────────────────────────────
182
183    #[tokio::test]
184    async fn test_map_simple_string() {
185        let mut ctx = make_entity_context();
186        ctx.set_var("owner", json!({"name": "Alice"}));
187
188        let op = MapOp::from_config(&MapConfig {
189            template: json!({
190                "title": "Hello {{ owner.name }}!",
191                "body": "Welcome"
192            }),
193        });
194
195        let result = op.execute(&mut ctx).await.unwrap();
196        assert!(matches!(result, OpResult::Continue));
197
198        let payload = ctx.get_var("_payload").unwrap();
199        assert_eq!(payload["title"], "Hello Alice!");
200        assert_eq!(payload["body"], "Welcome");
201    }
202
203    #[tokio::test]
204    async fn test_map_with_context_variables() {
205        let mut ctx = make_link_context();
206        ctx.set_var("owner", json!({"name": "Alice", "id": "abc-123"}));
207        ctx.set_var("follower", json!({"name": "Bob", "id": "def-456"}));
208
209        let op = MapOp::from_config(&MapConfig {
210            template: json!({
211                "title": "{{ follower.name }} started following {{ owner.name }}",
212                "icon": "follow",
213                "data": {
214                    "follower_id": "{{ follower.id }}",
215                    "owner_id": "{{ owner.id }}"
216                }
217            }),
218        });
219
220        let result = op.execute(&mut ctx).await.unwrap();
221        assert!(matches!(result, OpResult::Continue));
222
223        let payload = ctx.get_var("_payload").unwrap();
224        assert_eq!(payload["title"], "Bob started following Alice");
225        assert_eq!(payload["icon"], "follow");
226        assert_eq!(payload["data"]["follower_id"], "def-456");
227        assert_eq!(payload["data"]["owner_id"], "abc-123");
228    }
229
230    #[tokio::test]
231    async fn test_map_preserves_non_string_values() {
232        let mut ctx = make_entity_context();
233
234        let op = MapOp::from_config(&MapConfig {
235            template: json!({
236                "count": 42,
237                "active": true,
238                "tags": ["a", "b"],
239                "title": "Static title"
240            }),
241        });
242
243        let result = op.execute(&mut ctx).await.unwrap();
244        assert!(matches!(result, OpResult::Continue));
245
246        let payload = ctx.get_var("_payload").unwrap();
247        assert_eq!(payload["count"], 42);
248        assert_eq!(payload["active"], true);
249        assert_eq!(payload["tags"][0], "a");
250        assert_eq!(payload["title"], "Static title");
251    }
252
253    #[tokio::test]
254    async fn test_map_with_tera_conditionals() {
255        let mut ctx = make_entity_context();
256        ctx.set_var("owner", json!({"name": "Alice", "vip": true}));
257
258        let op = MapOp::from_config(&MapConfig {
259            template: json!({
260                "title": "{% if owner.vip %}VIP: {% endif %}{{ owner.name }}"
261            }),
262        });
263
264        let result = op.execute(&mut ctx).await.unwrap();
265        assert!(matches!(result, OpResult::Continue));
266
267        let payload = ctx.get_var("_payload").unwrap();
268        assert_eq!(payload["title"], "VIP: Alice");
269    }
270
271    #[tokio::test]
272    async fn test_map_with_array_template() {
273        let mut ctx = make_entity_context();
274        ctx.set_var("user", json!({"name": "Alice"}));
275
276        let op = MapOp::from_config(&MapConfig {
277            template: json!(["Hello {{ user.name }}", "static"]),
278        });
279
280        let result = op.execute(&mut ctx).await.unwrap();
281        assert!(matches!(result, OpResult::Continue));
282
283        let payload = ctx.get_var("_payload").unwrap();
284        assert_eq!(payload[0], "Hello Alice");
285        assert_eq!(payload[1], "static");
286    }
287
288    #[tokio::test]
289    async fn test_map_invalid_template() {
290        let mut ctx = make_entity_context();
291
292        let op = MapOp::from_config(&MapConfig {
293            template: json!({
294                "title": "{{ unclosed"
295            }),
296        });
297
298        let result = op.execute(&mut ctx).await;
299        assert!(result.is_err());
300    }
301
302    #[tokio::test]
303    async fn test_map_uses_event_variables() {
304        let mut ctx = make_entity_context();
305
306        // entity_type and entity_id are auto-extracted by FlowContext
307        let op = MapOp::from_config(&MapConfig {
308            template: json!({
309                "message": "New {{ entity_type }} created"
310            }),
311        });
312
313        let result = op.execute(&mut ctx).await.unwrap();
314        assert!(matches!(result, OpResult::Continue));
315
316        let payload = ctx.get_var("_payload").unwrap();
317        assert_eq!(payload["message"], "New user created");
318    }
319}