Skip to main content

this/events/operators/
fan_out.rs

1//! Fan-out operator — multiplies events across linked entities
2//!
3//! For each link found (e.g., all followers of a user), the fan-out operator
4//! clones the FlowContext and injects the linked entity into a named variable.
5//! This turns 1 event into N events (one per linked entity).
6//!
7//! ```yaml
8//! - fan_out:
9//!     from: target_id
10//!     via: follows
11//!     direction: reverse
12//!     as: follower
13//! ```
14
15use crate::config::events::FanOutConfig;
16use crate::events::context::FlowContext;
17use crate::events::operators::{OpResult, PipelineOperator};
18use anyhow::{Result, anyhow};
19use async_trait::async_trait;
20use serde_json::Value;
21use uuid::Uuid;
22
/// Runtime form of a `fan_out` pipeline step.
///
/// Holds the four settings copied from a `FanOutConfig`; all of them are
/// interpreted when the operator is executed, not when it is built.
#[derive(Debug, Clone)]
pub struct FanOutOp {
    /// Name of the context variable holding the starting entity ID.
    /// The variable must contain a UUID encoded as a JSON string.
    pub from: String,

    /// Link type used to filter the links that are followed.
    pub via: String,

    /// Traversal direction, either `"forward"` or `"reverse"`.
    ///
    /// `"forward"` looks links up by source and yields each link's target;
    /// `"reverse"` looks links up by target and yields each link's source.
    /// Any other value is rejected when the operator is executed.
    pub direction: String,

    /// Name of the context variable that receives each linked entity.
    /// A companion variable named `<output_var>_id` receives the entity ID.
    pub output_var: String,
}
38
39impl FanOutOp {
40    /// Create a FanOutOp from a FanOutConfig
41    pub fn from_config(config: &FanOutConfig) -> Self {
42        Self {
43            from: config.from.clone(),
44            via: config.via.clone(),
45            direction: config.direction.clone(),
46            output_var: config.output_var.clone(),
47        }
48    }
49}
50
51#[async_trait]
52impl PipelineOperator for FanOutOp {
53    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
54        // Read the source ID from context
55        let from_value = ctx
56            .get_var(&self.from)
57            .ok_or_else(|| anyhow!("fan_out: variable '{}' not found in context", self.from))?
58            .clone();
59
60        let from_id = parse_uuid(&from_value, &self.from)?;
61
62        // Find all links
63        let links = match self.direction.as_str() {
64            "forward" => {
65                ctx.link_service
66                    .find_by_source(&from_id, Some(&self.via), None)
67                    .await?
68            }
69            "reverse" => {
70                ctx.link_service
71                    .find_by_target(&from_id, Some(&self.via), None)
72                    .await?
73            }
74            other => {
75                return Err(anyhow!(
76                    "fan_out: invalid direction '{}', expected 'forward' or 'reverse'",
77                    other
78                ));
79            }
80        };
81
82        if links.is_empty() {
83            return Ok(OpResult::Drop);
84        }
85
86        // For each link, clone the context and inject the linked entity ID
87        let mut contexts = Vec::with_capacity(links.len());
88        for link in &links {
89            let mut new_ctx = ctx.clone();
90            let entity_id = match self.direction.as_str() {
91                "forward" => link.target_id,
92                _ => link.source_id,
93            };
94
95            // Try to fetch the entity data via entity fetchers
96            let entity_value = fetch_entity(&new_ctx, &entity_id).await;
97            match entity_value {
98                Some(data) => {
99                    new_ctx.set_var(&self.output_var, data);
100                }
101                None => {
102                    // If no fetcher found, store just the ID
103                    new_ctx.set_var(&self.output_var, Value::String(entity_id.to_string()));
104                }
105            }
106
107            // Always set the entity ID as a sub-variable for convenience
108            new_ctx.set_var(
109                format!("{}_id", self.output_var),
110                Value::String(entity_id.to_string()),
111            );
112
113            contexts.push(new_ctx);
114        }
115
116        Ok(OpResult::FanOut(contexts))
117    }
118
119    fn name(&self) -> &str {
120        "fan_out"
121    }
122}
123
124/// Try to fetch an entity by ID from any registered fetcher
125async fn fetch_entity(ctx: &FlowContext, id: &Uuid) -> Option<Value> {
126    for fetcher in ctx.entity_fetchers.values() {
127        if let Ok(entity) = fetcher.fetch_as_json(id).await {
128            return Some(entity);
129        }
130    }
131    None
132}
133
134/// Parse a UUID from a serde_json::Value
135fn parse_uuid(value: &Value, field_name: &str) -> Result<Uuid> {
136    match value {
137        Value::String(s) => Uuid::parse_str(s)
138            .map_err(|e| anyhow!("fan_out: '{}' is not a valid UUID: {}", field_name, e)),
139        _ => Err(anyhow!(
140            "fan_out: '{}' expected a string UUID, got {:?}",
141            field_name,
142            value
143        )),
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::events::FanOutConfig;
    use crate::core::events::{FrameworkEvent, LinkEvent};
    use crate::core::link::LinkEntity;
    use crate::core::module::EntityFetcher;
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    // ── Mocks ────────────────────────────────────────────────────────

    /// In-memory link service: only the lookups used by the operator are
    /// implemented, every other method panics if called.
    struct MockLinkService {
        links: Vec<LinkEntity>,
    }

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }
        async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
            unimplemented!()
        }
        async fn list(&self) -> Result<Vec<LinkEntity>> {
            Ok(self.links.clone())
        }
        async fn find_by_source(
            &self,
            source_id: &Uuid,
            link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            Ok(self
                .links
                .iter()
                .filter(|l| {
                    l.source_id == *source_id && link_type.is_none_or(|lt| l.link_type == lt)
                })
                .cloned()
                .collect())
        }
        async fn find_by_target(
            &self,
            target_id: &Uuid,
            link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            Ok(self
                .links
                .iter()
                .filter(|l| {
                    l.target_id == *target_id && link_type.is_none_or(|lt| l.link_type == lt)
                })
                .cloned()
                .collect())
        }
        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }
        async fn delete(&self, _id: &Uuid) -> Result<()> {
            unimplemented!()
        }
        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    /// In-memory fetcher: returns the stored JSON or a "not found" error.
    struct MockEntityFetcher {
        entities: HashMap<Uuid, Value>,
    }

    #[async_trait]
    impl EntityFetcher for MockEntityFetcher {
        async fn fetch_as_json(&self, entity_id: &Uuid) -> Result<Value> {
            self.entities
                .get(entity_id)
                .cloned()
                .ok_or_else(|| anyhow!("not found"))
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    /// Builds an active `follows` link from `source_id` to `target_id`.
    fn make_link(source_id: Uuid, target_id: Uuid) -> LinkEntity {
        LinkEntity {
            id: Uuid::new_v4(),
            entity_type: "link".to_string(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            deleted_at: None,
            status: "active".to_string(),
            tenant_id: None,
            link_type: "follows".to_string(),
            source_id,
            target_id,
            metadata: None,
        }
    }

    /// Builds a context for a "follows link created" event aimed at
    /// `target_id`, with a random source.
    fn make_context(
        target_id: Uuid,
        link_service: Arc<dyn LinkService>,
        entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
    ) -> FlowContext {
        let event = FrameworkEvent::Link(LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id,
            metadata: None,
        });
        FlowContext::new(event, link_service, entity_fetchers)
    }

    /// Builds a fan-out operator over `follows` links.
    fn follows_op(from: &str, direction: &str, output_var: &str) -> FanOutOp {
        FanOutOp::from_config(&FanOutConfig {
            from: from.to_string(),
            via: "follows".to_string(),
            direction: direction.to_string(),
            output_var: output_var.to_string(),
        })
    }

    /// Wraps a list of links in a shareable mock link service.
    fn link_service_with(links: Vec<LinkEntity>) -> Arc<dyn LinkService> {
        Arc::new(MockLinkService { links }) as Arc<dyn LinkService>
    }

    /// Collects the string stored under `var` in every context, panicking if
    /// the variable is missing or is not a JSON string.
    fn collect_ids(contexts: &[FlowContext], var: &str) -> HashSet<String> {
        contexts
            .iter()
            .map(|c| match c.get_var(var).and_then(Value::as_str) {
                Some(id) => id.to_string(),
                None => panic!("variable '{}' missing or not a string", var),
            })
            .collect()
    }

    /// Renders a list of UUIDs as the set of their string forms.
    fn id_strings(ids: &[Uuid]) -> HashSet<String> {
        ids.iter().map(Uuid::to_string).collect()
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_fan_out_zero_followers_drops() {
        let target_id = Uuid::new_v4();
        let mut ctx = make_context(target_id, link_service_with(vec![]), HashMap::new());

        let op = follows_op("target_id", "reverse", "follower");

        let result = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(result, OpResult::Drop));
    }

    #[tokio::test]
    async fn test_fan_out_one_follower() {
        let target_id = Uuid::new_v4();
        let follower_id = Uuid::new_v4();

        let link_service = link_service_with(vec![make_link(follower_id, target_id)]);

        let mut entities = HashMap::new();
        entities.insert(follower_id, json!({"name": "Alice"}));
        let fetcher = Arc::new(MockEntityFetcher { entities }) as Arc<dyn EntityFetcher>;
        let mut fetchers = HashMap::new();
        fetchers.insert("user".to_string(), fetcher);

        let mut ctx = make_context(target_id, link_service, fetchers);

        let op = follows_op("target_id", "reverse", "follower");

        let result = op.execute(&mut ctx).await.unwrap();
        match result {
            OpResult::FanOut(contexts) => {
                assert_eq!(contexts.len(), 1);
                assert_eq!(
                    contexts[0].get_var("follower"),
                    Some(&json!({"name": "Alice"}))
                );
                // The ID variable must carry the follower's ID, not merely exist.
                assert_eq!(
                    contexts[0].get_var("follower_id"),
                    Some(&json!(follower_id.to_string()))
                );
            }
            other => panic!("expected FanOut, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_fan_out_five_followers() {
        let target_id = Uuid::new_v4();
        let follower_ids: Vec<Uuid> = (0..5).map(|_| Uuid::new_v4()).collect();

        let links: Vec<LinkEntity> = follower_ids
            .iter()
            .map(|fid| make_link(*fid, target_id))
            .collect();

        // No entity fetchers — should still work with just IDs
        let mut ctx = make_context(target_id, link_service_with(links), HashMap::new());

        let op = follows_op("target_id", "reverse", "follower");

        let result = op.execute(&mut ctx).await.unwrap();
        match result {
            OpResult::FanOut(contexts) => {
                assert_eq!(contexts.len(), 5);
                // Every follower appears exactly once across the contexts.
                assert_eq!(
                    collect_ids(&contexts, "follower_id"),
                    id_strings(&follower_ids)
                );
                // Without fetcher, follower is stored as the same ID string.
                for fctx in &contexts {
                    assert_eq!(fctx.get_var("follower"), fctx.get_var("follower_id"));
                }
            }
            other => panic!("expected FanOut, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_fan_out_forward_direction() {
        let source_id = Uuid::new_v4();
        let target_ids: Vec<Uuid> = (0..3).map(|_| Uuid::new_v4()).collect();

        let links: Vec<LinkEntity> = target_ids
            .iter()
            .map(|tid| make_link(source_id, *tid))
            .collect();

        let event = FrameworkEvent::Link(LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id,
            target_id: target_ids[0],
            metadata: None,
        });
        let mut ctx = FlowContext::new(event, link_service_with(links), HashMap::new());

        let op = follows_op("source_id", "forward", "followed");

        let result = op.execute(&mut ctx).await.unwrap();
        match result {
            OpResult::FanOut(contexts) => {
                assert_eq!(contexts.len(), 3);
                // Forward traversal must yield link targets, not the source.
                assert_eq!(
                    collect_ids(&contexts, "followed_id"),
                    id_strings(&target_ids)
                );
            }
            other => panic!("expected FanOut, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_fan_out_fetcher_miss_falls_back_to_id() {
        let target_id = Uuid::new_v4();
        let follower_id = Uuid::new_v4();

        let link_service = link_service_with(vec![make_link(follower_id, target_id)]);

        // A fetcher is registered but does not know the follower.
        let fetcher = Arc::new(MockEntityFetcher {
            entities: HashMap::new(),
        }) as Arc<dyn EntityFetcher>;
        let mut fetchers = HashMap::new();
        fetchers.insert("user".to_string(), fetcher);

        let mut ctx = make_context(target_id, link_service, fetchers);

        let op = follows_op("target_id", "reverse", "follower");

        let result = op.execute(&mut ctx).await.unwrap();
        match result {
            OpResult::FanOut(contexts) => {
                assert_eq!(contexts.len(), 1);
                let expected = json!(follower_id.to_string());
                assert_eq!(contexts[0].get_var("follower"), Some(&expected));
                assert_eq!(contexts[0].get_var("follower_id"), Some(&expected));
            }
            other => panic!("expected FanOut, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_fan_out_missing_variable() {
        let mut ctx = make_context(Uuid::new_v4(), link_service_with(vec![]), HashMap::new());

        let op = follows_op("nonexistent", "reverse", "follower");

        let err = op.execute(&mut ctx).await.unwrap_err();
        assert!(err.to_string().contains("'nonexistent' not found in context"));
    }

    #[tokio::test]
    async fn test_fan_out_invalid_direction() {
        let mut ctx = make_context(Uuid::new_v4(), link_service_with(vec![]), HashMap::new());

        let op = follows_op("target_id", "sideways", "follower");

        let err = op.execute(&mut ctx).await.unwrap_err();
        assert!(err.to_string().contains("invalid direction 'sideways'"));
    }

    #[tokio::test]
    async fn test_fan_out_malformed_uuid() {
        let mut ctx = make_context(Uuid::new_v4(), link_service_with(vec![]), HashMap::new());
        ctx.set_var("bad_id".to_string(), json!("not-a-uuid"));

        let op = follows_op("bad_id", "reverse", "follower");

        let err = op.execute(&mut ctx).await.unwrap_err();
        assert!(err.to_string().contains("'bad_id' is not a valid UUID"));
    }

    #[tokio::test]
    async fn test_fan_out_non_string_id() {
        let mut ctx = make_context(Uuid::new_v4(), link_service_with(vec![]), HashMap::new());
        ctx.set_var("numeric_id".to_string(), json!(42));

        let op = follows_op("numeric_id", "reverse", "follower");

        let err = op.execute(&mut ctx).await.unwrap_err();
        assert!(err.to_string().contains("'numeric_id' expected a string UUID"));
    }
}