1use crate::config::events::MapConfig;
22use crate::events::context::FlowContext;
23use crate::events::operators::{OpResult, PipelineOperator};
24use anyhow::{Result, anyhow};
25use async_trait::async_trait;
26use serde_json::Value;
27
28#[derive(Debug, Clone)]
30pub struct MapOp {
31 template: Value,
33}
34
35impl MapOp {
36 pub fn from_config(config: &MapConfig) -> Self {
38 Self {
39 template: config.template.clone(),
40 }
41 }
42}
43
44#[async_trait]
45impl PipelineOperator for MapOp {
46 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
47 let tera_ctx = build_tera_context(ctx);
48 let rendered = render_value(&self.template, &tera_ctx)?;
49 ctx.set_var("_payload", rendered);
50 Ok(OpResult::Continue)
51 }
52
53 fn name(&self) -> &str {
54 "map"
55 }
56}
57
58fn build_tera_context(ctx: &FlowContext) -> tera::Context {
60 let mut tera_ctx = tera::Context::new();
61 for (key, value) in &ctx.variables {
62 tera_ctx.insert(key, value);
63 }
64 tera_ctx
65}
66
67fn render_value(template: &Value, tera_ctx: &tera::Context) -> Result<Value> {
69 match template {
70 Value::String(s) => {
71 let rendered = tera::Tera::one_off(s, tera_ctx, false)
73 .map_err(|e| anyhow!("map: template rendering failed for '{}': {}", s, e))?;
74 Ok(Value::String(rendered))
75 }
76 Value::Object(map) => {
77 let mut result = serde_json::Map::new();
79 for (key, value) in map {
80 result.insert(key.clone(), render_value(value, tera_ctx)?);
81 }
82 Ok(Value::Object(result))
83 }
84 Value::Array(arr) => {
85 let result: Result<Vec<Value>> =
87 arr.iter().map(|v| render_value(v, tera_ctx)).collect();
88 Ok(Value::Array(result?))
89 }
90 other => Ok(other.clone()),
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::events::MapConfig;
    use crate::core::events::{EntityEvent, FrameworkEvent, LinkEvent};
    use crate::core::link::LinkEntity;
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;
    use uuid::Uuid;

    /// Link service stub: every method is `unimplemented!()`, so a test that
    /// ends up calling into it panics.
    struct MockLinkService;

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
            unimplemented!()
        }

        async fn list(&self) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_source(
            &self,
            _source_id: &Uuid,
            _link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_target(
            &self,
            _target_id: &Uuid,
            _link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn delete(&self, _id: &Uuid) -> Result<()> {
            unimplemented!()
        }

        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    /// Returns the stub service as a trait object.
    fn mock_link_service() -> Arc<dyn LinkService> {
        Arc::new(MockLinkService)
    }

    /// Wraps `event` in a `FlowContext` backed by the stub link service and an
    /// empty map.
    fn context_for(event: FrameworkEvent) -> FlowContext {
        FlowContext::new(event, mock_link_service(), HashMap::new())
    }

    /// Context for a freshly created `follows` link between two random ids.
    fn make_link_context() -> FlowContext {
        context_for(FrameworkEvent::Link(LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
            metadata: None,
        }))
    }

    /// Context for a freshly created `user` entity named Alice.
    fn make_entity_context() -> FlowContext {
        context_for(FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "user".to_string(),
            entity_id: Uuid::new_v4(),
            data: json!({"name": "Alice"}),
        }))
    }

    /// Runs a `MapOp` built from `template` on `ctx`, checks that the pipeline
    /// is told to continue, and returns a copy of the resulting `_payload`.
    async fn render_payload(ctx: &mut FlowContext, template: Value) -> Value {
        let op = MapOp::from_config(&MapConfig { template });
        let outcome = op.execute(ctx).await.unwrap();
        assert!(matches!(outcome, OpResult::Continue));
        ctx.get_var("_payload").unwrap().clone()
    }

    #[tokio::test]
    async fn test_map_simple_string() {
        let mut ctx = make_entity_context();
        ctx.set_var("owner", json!({"name": "Alice"}));

        let payload = render_payload(
            &mut ctx,
            json!({
                "title": "Hello {{ owner.name }}!",
                "body": "Welcome"
            }),
        )
        .await;

        assert_eq!(payload["title"], "Hello Alice!");
        assert_eq!(payload["body"], "Welcome");
    }

    #[tokio::test]
    async fn test_map_with_context_variables() {
        let mut ctx = make_link_context();
        ctx.set_var("owner", json!({"name": "Alice", "id": "abc-123"}));
        ctx.set_var("follower", json!({"name": "Bob", "id": "def-456"}));

        let payload = render_payload(
            &mut ctx,
            json!({
                "title": "{{ follower.name }} started following {{ owner.name }}",
                "icon": "follow",
                "data": {
                    "follower_id": "{{ follower.id }}",
                    "owner_id": "{{ owner.id }}"
                }
            }),
        )
        .await;

        assert_eq!(payload["title"], "Bob started following Alice");
        assert_eq!(payload["icon"], "follow");
        assert_eq!(payload["data"]["follower_id"], "def-456");
        assert_eq!(payload["data"]["owner_id"], "abc-123");
    }

    #[tokio::test]
    async fn test_map_preserves_non_string_values() {
        let mut ctx = make_entity_context();

        let payload = render_payload(
            &mut ctx,
            json!({
                "count": 42,
                "active": true,
                "tags": ["a", "b"],
                "title": "Static title"
            }),
        )
        .await;

        assert_eq!(payload["count"], 42);
        assert_eq!(payload["active"], true);
        assert_eq!(payload["tags"][0], "a");
        assert_eq!(payload["title"], "Static title");
    }

    #[tokio::test]
    async fn test_map_with_tera_conditionals() {
        let mut ctx = make_entity_context();
        ctx.set_var("owner", json!({"name": "Alice", "vip": true}));

        let payload = render_payload(
            &mut ctx,
            json!({
                "title": "{% if owner.vip %}VIP: {% endif %}{{ owner.name }}"
            }),
        )
        .await;

        assert_eq!(payload["title"], "VIP: Alice");
    }

    #[tokio::test]
    async fn test_map_with_array_template() {
        let mut ctx = make_entity_context();
        ctx.set_var("user", json!({"name": "Alice"}));

        let payload = render_payload(&mut ctx, json!(["Hello {{ user.name }}", "static"])).await;

        assert_eq!(payload[0], "Hello Alice");
        assert_eq!(payload[1], "static");
    }

    #[tokio::test]
    async fn test_map_invalid_template() {
        let mut ctx = make_entity_context();

        // An unterminated expression must surface as an error, not a panic.
        let op = MapOp::from_config(&MapConfig {
            template: json!({
                "title": "{{ unclosed"
            }),
        });

        let outcome = op.execute(&mut ctx).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_map_uses_event_variables() {
        let mut ctx = make_entity_context();

        let payload = render_payload(
            &mut ctx,
            json!({
                "message": "New {{ entity_type }} created"
            }),
        )
        .await;

        assert_eq!(payload["message"], "New user created");
    }
}