this/events/operators/
deliver.rs1use crate::config::events::DeliverConfig;
25use crate::events::context::FlowContext;
26use crate::events::operators::{OpResult, PipelineOperator};
27use anyhow::{Result, anyhow};
28use async_trait::async_trait;
29
/// Pipeline operator that hands the context's `_payload` variable to one or
/// more named sinks.
#[derive(Clone, Debug)]
pub struct DeliverOp {
    /// Names of the sinks this operator dispatches to, in dispatch order.
    ///
    /// [`DeliverOp::from_config`] guarantees this list is non-empty.
    pub sink_names: Vec<String>,
}
36
37impl DeliverOp {
38 pub fn from_config(config: &DeliverConfig) -> Result<Self> {
40 let names: Vec<String> = config.sink_names().iter().map(|s| s.to_string()).collect();
41 if names.is_empty() {
42 return Err(anyhow!(
43 "deliver: at least one sink must be specified (use 'sink' or 'sinks')"
44 ));
45 }
46 Ok(Self { sink_names: names })
47 }
48}
49
50#[async_trait]
51impl PipelineOperator for DeliverOp {
52 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
53 let payload = ctx
55 .get_var("_payload")
56 .ok_or_else(|| {
57 anyhow!("deliver: no '_payload' variable in context. Did you forget a 'map' step before 'deliver'?")
58 })?
59 .clone();
60
61 let sinks_json: serde_json::Value = self.sink_names.clone().into();
63 ctx.set_var("_delivered_to", sinks_json);
64
65 let recipient_id = ctx
67 .get_var_str("recipient_id")
68 .or_else(|| ctx.get_var_str("target_id"))
69 .map(|s| s.to_string());
70
71 let context_vars = ctx
73 .variables
74 .iter()
75 .map(|(k, v)| (k.clone(), v.clone()))
76 .collect::<std::collections::HashMap<String, serde_json::Value>>();
77
78 if let Some(registry) = &ctx.sink_registry {
80 for sink_name in &self.sink_names {
81 tracing::debug!(
82 sink = %sink_name,
83 recipient = ?recipient_id,
84 "deliver: dispatching to sink"
85 );
86
87 if let Err(e) = registry
88 .deliver(
89 sink_name,
90 payload.clone(),
91 recipient_id.as_deref(),
92 &context_vars,
93 )
94 .await
95 {
96 tracing::warn!(
97 sink = %sink_name,
98 error = %e,
99 "deliver: sink delivery failed"
100 );
101 }
102 }
103 } else {
104 tracing::debug!(
105 sinks = ?self.sink_names,
106 "deliver: no SinkRegistry available, skipping actual dispatch"
107 );
108 }
109
110 Ok(OpResult::Continue)
111 }
112
113 fn name(&self) -> &str {
114 "deliver"
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::events::DeliverConfig;
    use crate::core::events::{EntityEvent, FrameworkEvent};
    use crate::core::link::LinkEntity;
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;
    use uuid::Uuid;

    /// Placeholder link service: only needed to construct a `FlowContext`;
    /// every method panics if it is actually called.
    struct MockLinkService;

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
            unimplemented!()
        }

        async fn list(&self) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_source(
            &self,
            _source_id: &Uuid,
            _link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_target(
            &self,
            _target_id: &Uuid,
            _link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn delete(&self, _id: &Uuid) -> Result<()> {
            unimplemented!()
        }

        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    /// Creates a context for a freshly created `user` entity with no variables.
    fn make_context() -> FlowContext {
        let link_service: Arc<dyn LinkService> = Arc::new(MockLinkService);
        let created = EntityEvent::Created {
            entity_type: String::from("user"),
            entity_id: Uuid::new_v4(),
            data: json!({}),
        };
        FlowContext::new(FrameworkEvent::Entity(created), link_service, HashMap::new())
    }

    /// Runs `DeliverOp::from_config` on a config built from the given fields.
    fn build_op(sink: Option<&str>, sinks: Option<Vec<&str>>) -> Result<DeliverOp> {
        DeliverOp::from_config(&DeliverConfig {
            sink: sink.map(str::to_owned),
            sinks: sinks.map(|names| names.into_iter().map(str::to_owned).collect()),
        })
    }

    #[tokio::test]
    async fn test_deliver_single_sink() {
        let op = build_op(Some("in_app"), None).unwrap();
        let mut ctx = make_context();
        ctx.set_var("_payload", json!({"title": "Hello"}));

        let outcome = op.execute(&mut ctx).await.unwrap();

        assert!(matches!(outcome, OpResult::Continue));
        let expected = json!(["in_app"]);
        assert_eq!(ctx.get_var("_delivered_to"), Some(&expected));
    }

    #[tokio::test]
    async fn test_deliver_multiple_sinks() {
        let op = build_op(None, Some(vec!["in_app", "push"])).unwrap();
        let mut ctx = make_context();
        ctx.set_var("_payload", json!({"title": "Hello"}));

        let outcome = op.execute(&mut ctx).await.unwrap();

        assert!(matches!(outcome, OpResult::Continue));
        let expected = json!(["in_app", "push"]);
        assert_eq!(ctx.get_var("_delivered_to"), Some(&expected));
    }

    #[tokio::test]
    async fn test_deliver_no_payload_error() {
        // No `_payload` is set on the context, so execution must fail.
        let op = build_op(Some("in_app"), None).unwrap();
        let mut ctx = make_context();

        let message = op.execute(&mut ctx).await.err().map(|err| err.to_string());

        assert!(message.is_some());
        assert!(message.unwrap().contains("_payload"));
    }

    #[test]
    fn test_deliver_no_sink_error() {
        assert!(build_op(None, None).is_err());
    }

    #[test]
    fn test_deliver_empty_sinks_error() {
        assert!(build_op(None, Some(vec![])).is_err());
    }
}