Skip to main content

this/events/operators/
deliver.rs

1//! Deliver operator — sends the payload to configured sinks
2//!
3//! The deliver operator is the terminal step of a pipeline. It reads the
4//! `_payload` variable from the FlowContext (set by the `map` operator)
5//! and dispatches it to one or more sinks.
6//!
7//! ```yaml
8//! - deliver:
9//!     sink: in_app
10//! # or
11//! - deliver:
12//!     sinks: [in_app, push]
13//! ```
14//!
15//! # Sink Registry
16//!
17//! Actual sink implementations are registered at runtime via the
18//! `SinkRegistry` (see Plan 3). The deliver operator looks up sinks
19//! by name and calls their `send()` method.
20//!
21//! For now, this is a structural placeholder that validates the
22//! configuration and records which sinks should be called.
23
24use crate::config::events::DeliverConfig;
25use crate::events::context::FlowContext;
26use crate::events::operators::{OpResult, PipelineOperator};
27use anyhow::{Result, anyhow};
28use async_trait::async_trait;
29
30/// Compiled deliver operator
31#[derive(Debug, Clone)]
32pub struct DeliverOp {
33    /// Sink names to deliver to
34    pub sink_names: Vec<String>,
35}
36
37impl DeliverOp {
38    /// Create a DeliverOp from a DeliverConfig
39    pub fn from_config(config: &DeliverConfig) -> Result<Self> {
40        let names: Vec<String> = config.sink_names().iter().map(|s| s.to_string()).collect();
41        if names.is_empty() {
42            return Err(anyhow!(
43                "deliver: at least one sink must be specified (use 'sink' or 'sinks')"
44            ));
45        }
46        Ok(Self { sink_names: names })
47    }
48}
49
50#[async_trait]
51impl PipelineOperator for DeliverOp {
52    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
53        // Verify that _payload exists
54        let payload = ctx
55            .get_var("_payload")
56            .ok_or_else(|| {
57                anyhow!("deliver: no '_payload' variable in context. Did you forget a 'map' step before 'deliver'?")
58            })?
59            .clone();
60
61        // Record which sinks were targeted (for debugging/tracing)
62        let sinks_json: serde_json::Value = self.sink_names.clone().into();
63        ctx.set_var("_delivered_to", sinks_json);
64
65        // Extract recipient_id from context variables (set by resolve/fan_out)
66        let recipient_id = ctx
67            .get_var_str("recipient_id")
68            .or_else(|| ctx.get_var_str("target_id"))
69            .map(|s| s.to_string());
70
71        // Build context vars for sinks (all pipeline variables)
72        let context_vars = ctx
73            .variables
74            .iter()
75            .map(|(k, v)| (k.clone(), v.clone()))
76            .collect::<std::collections::HashMap<String, serde_json::Value>>();
77
78        // Dispatch to sinks via SinkRegistry (if available)
79        if let Some(registry) = &ctx.sink_registry {
80            for sink_name in &self.sink_names {
81                tracing::debug!(
82                    sink = %sink_name,
83                    recipient = ?recipient_id,
84                    "deliver: dispatching to sink"
85                );
86
87                if let Err(e) = registry
88                    .deliver(
89                        sink_name,
90                        payload.clone(),
91                        recipient_id.as_deref(),
92                        &context_vars,
93                    )
94                    .await
95                {
96                    tracing::warn!(
97                        sink = %sink_name,
98                        error = %e,
99                        "deliver: sink delivery failed"
100                    );
101                }
102            }
103        } else {
104            tracing::debug!(
105                sinks = ?self.sink_names,
106                "deliver: no SinkRegistry available, skipping actual dispatch"
107            );
108        }
109
110        Ok(OpResult::Continue)
111    }
112
113    fn name(&self) -> &str {
114        "deliver"
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121    use crate::config::events::DeliverConfig;
122    use crate::core::events::{EntityEvent, FrameworkEvent};
123    use crate::core::service::LinkService;
124    use serde_json::json;
125    use std::collections::HashMap;
126    use std::sync::Arc;
127    use uuid::Uuid;
128
129    struct MockLinkService;
130
131    #[async_trait]
132    impl LinkService for MockLinkService {
133        async fn create(
134            &self,
135            _link: crate::core::link::LinkEntity,
136        ) -> Result<crate::core::link::LinkEntity> {
137            unimplemented!()
138        }
139        async fn get(&self, _id: &Uuid) -> Result<Option<crate::core::link::LinkEntity>> {
140            unimplemented!()
141        }
142        async fn list(&self) -> Result<Vec<crate::core::link::LinkEntity>> {
143            unimplemented!()
144        }
145        async fn find_by_source(
146            &self,
147            _source_id: &Uuid,
148            _link_type: Option<&str>,
149            _target_type: Option<&str>,
150        ) -> Result<Vec<crate::core::link::LinkEntity>> {
151            unimplemented!()
152        }
153        async fn find_by_target(
154            &self,
155            _target_id: &Uuid,
156            _link_type: Option<&str>,
157            _source_type: Option<&str>,
158        ) -> Result<Vec<crate::core::link::LinkEntity>> {
159            unimplemented!()
160        }
161        async fn update(
162            &self,
163            _id: &Uuid,
164            _link: crate::core::link::LinkEntity,
165        ) -> Result<crate::core::link::LinkEntity> {
166            unimplemented!()
167        }
168        async fn delete(&self, _id: &Uuid) -> Result<()> {
169            unimplemented!()
170        }
171        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
172            unimplemented!()
173        }
174    }
175
176    fn make_context() -> FlowContext {
177        let event = FrameworkEvent::Entity(EntityEvent::Created {
178            entity_type: "user".to_string(),
179            entity_id: Uuid::new_v4(),
180            data: json!({}),
181        });
182        FlowContext::new(
183            event,
184            Arc::new(MockLinkService) as Arc<dyn LinkService>,
185            HashMap::new(),
186        )
187    }
188
189    #[tokio::test]
190    async fn test_deliver_single_sink() {
191        let mut ctx = make_context();
192        ctx.set_var("_payload", json!({"title": "Hello"}));
193
194        let op = DeliverOp::from_config(&DeliverConfig {
195            sink: Some("in_app".to_string()),
196            sinks: None,
197        })
198        .unwrap();
199
200        let result = op.execute(&mut ctx).await.unwrap();
201        assert!(matches!(result, OpResult::Continue));
202        assert_eq!(ctx.get_var("_delivered_to"), Some(&json!(["in_app"])));
203    }
204
205    #[tokio::test]
206    async fn test_deliver_multiple_sinks() {
207        let mut ctx = make_context();
208        ctx.set_var("_payload", json!({"title": "Hello"}));
209
210        let op = DeliverOp::from_config(&DeliverConfig {
211            sink: None,
212            sinks: Some(vec!["in_app".to_string(), "push".to_string()]),
213        })
214        .unwrap();
215
216        let result = op.execute(&mut ctx).await.unwrap();
217        assert!(matches!(result, OpResult::Continue));
218        assert_eq!(
219            ctx.get_var("_delivered_to"),
220            Some(&json!(["in_app", "push"]))
221        );
222    }
223
224    #[tokio::test]
225    async fn test_deliver_no_payload_error() {
226        let mut ctx = make_context();
227        // No _payload set
228
229        let op = DeliverOp::from_config(&DeliverConfig {
230            sink: Some("in_app".to_string()),
231            sinks: None,
232        })
233        .unwrap();
234
235        let result = op.execute(&mut ctx).await;
236        assert!(result.is_err());
237        assert!(result.unwrap_err().to_string().contains("_payload"));
238    }
239
240    #[test]
241    fn test_deliver_no_sink_error() {
242        let result = DeliverOp::from_config(&DeliverConfig {
243            sink: None,
244            sinks: None,
245        });
246        assert!(result.is_err());
247    }
248
249    #[test]
250    fn test_deliver_empty_sinks_error() {
251        let result = DeliverOp::from_config(&DeliverConfig {
252            sink: None,
253            sinks: Some(vec![]),
254        });
255        assert!(result.is_err());
256    }
257}