1use crate::config::events::FanOutConfig;
16use crate::events::context::FlowContext;
17use crate::events::operators::{OpResult, PipelineOperator};
18use anyhow::{Result, anyhow};
19use async_trait::async_trait;
20use serde_json::Value;
21use uuid::Uuid;
22
23#[derive(Debug, Clone)]
25pub struct FanOutOp {
26 pub from: String,
28
29 pub via: String,
31
32 pub direction: String,
34
35 pub output_var: String,
37}
38
39impl FanOutOp {
40 pub fn from_config(config: &FanOutConfig) -> Self {
42 Self {
43 from: config.from.clone(),
44 via: config.via.clone(),
45 direction: config.direction.clone(),
46 output_var: config.output_var.clone(),
47 }
48 }
49}
50
51#[async_trait]
52impl PipelineOperator for FanOutOp {
53 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
54 let from_value = ctx
56 .get_var(&self.from)
57 .ok_or_else(|| anyhow!("fan_out: variable '{}' not found in context", self.from))?
58 .clone();
59
60 let from_id = parse_uuid(&from_value, &self.from)?;
61
62 let links = match self.direction.as_str() {
64 "forward" => {
65 ctx.link_service
66 .find_by_source(&from_id, Some(&self.via), None)
67 .await?
68 }
69 "reverse" => {
70 ctx.link_service
71 .find_by_target(&from_id, Some(&self.via), None)
72 .await?
73 }
74 other => {
75 return Err(anyhow!(
76 "fan_out: invalid direction '{}', expected 'forward' or 'reverse'",
77 other
78 ));
79 }
80 };
81
82 if links.is_empty() {
83 return Ok(OpResult::Drop);
84 }
85
86 let mut contexts = Vec::with_capacity(links.len());
88 for link in &links {
89 let mut new_ctx = ctx.clone();
90 let entity_id = match self.direction.as_str() {
91 "forward" => link.target_id,
92 _ => link.source_id,
93 };
94
95 let entity_value = fetch_entity(&new_ctx, &entity_id).await;
97 match entity_value {
98 Some(data) => {
99 new_ctx.set_var(&self.output_var, data);
100 }
101 None => {
102 new_ctx.set_var(&self.output_var, Value::String(entity_id.to_string()));
104 }
105 }
106
107 new_ctx.set_var(
109 format!("{}_id", self.output_var),
110 Value::String(entity_id.to_string()),
111 );
112
113 contexts.push(new_ctx);
114 }
115
116 Ok(OpResult::FanOut(contexts))
117 }
118
119 fn name(&self) -> &str {
120 "fan_out"
121 }
122}
123
124async fn fetch_entity(ctx: &FlowContext, id: &Uuid) -> Option<Value> {
126 for fetcher in ctx.entity_fetchers.values() {
127 if let Ok(entity) = fetcher.fetch_as_json(id).await {
128 return Some(entity);
129 }
130 }
131 None
132}
133
134fn parse_uuid(value: &Value, field_name: &str) -> Result<Uuid> {
136 match value {
137 Value::String(s) => Uuid::parse_str(s)
138 .map_err(|e| anyhow!("fan_out: '{}' is not a valid UUID: {}", field_name, e)),
139 _ => Err(anyhow!(
140 "fan_out: '{}' expected a string UUID, got {:?}",
141 field_name,
142 value
143 )),
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150 use crate::config::events::FanOutConfig;
151 use crate::core::events::{FrameworkEvent, LinkEvent};
152 use crate::core::link::LinkEntity;
153 use crate::core::module::EntityFetcher;
154 use crate::core::service::LinkService;
155 use serde_json::json;
156 use std::collections::HashMap;
157 use std::sync::Arc;
158
159 struct MockLinkService {
162 links: Vec<LinkEntity>,
163 }
164
165 #[async_trait]
166 impl LinkService for MockLinkService {
167 async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
168 unimplemented!()
169 }
170 async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
171 unimplemented!()
172 }
173 async fn list(&self) -> Result<Vec<LinkEntity>> {
174 Ok(self.links.clone())
175 }
176 async fn find_by_source(
177 &self,
178 source_id: &Uuid,
179 link_type: Option<&str>,
180 _target_type: Option<&str>,
181 ) -> Result<Vec<LinkEntity>> {
182 Ok(self
183 .links
184 .iter()
185 .filter(|l| {
186 l.source_id == *source_id && link_type.is_none_or(|lt| l.link_type == lt)
187 })
188 .cloned()
189 .collect())
190 }
191 async fn find_by_target(
192 &self,
193 target_id: &Uuid,
194 link_type: Option<&str>,
195 _source_type: Option<&str>,
196 ) -> Result<Vec<LinkEntity>> {
197 Ok(self
198 .links
199 .iter()
200 .filter(|l| {
201 l.target_id == *target_id && link_type.is_none_or(|lt| l.link_type == lt)
202 })
203 .cloned()
204 .collect())
205 }
206 async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
207 unimplemented!()
208 }
209 async fn delete(&self, _id: &Uuid) -> Result<()> {
210 unimplemented!()
211 }
212 async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
213 unimplemented!()
214 }
215 }
216
217 struct MockEntityFetcher {
218 entities: HashMap<Uuid, Value>,
219 }
220
221 #[async_trait]
222 impl EntityFetcher for MockEntityFetcher {
223 async fn fetch_as_json(&self, entity_id: &Uuid) -> Result<Value> {
224 self.entities
225 .get(entity_id)
226 .cloned()
227 .ok_or_else(|| anyhow!("not found"))
228 }
229 }
230
231 fn make_link(source_id: Uuid, target_id: Uuid) -> LinkEntity {
232 LinkEntity {
233 id: Uuid::new_v4(),
234 entity_type: "link".to_string(),
235 created_at: chrono::Utc::now(),
236 updated_at: chrono::Utc::now(),
237 deleted_at: None,
238 status: "active".to_string(),
239 tenant_id: None,
240 link_type: "follows".to_string(),
241 source_id,
242 target_id,
243 metadata: None,
244 }
245 }
246
247 fn make_context(
248 target_id: Uuid,
249 link_service: Arc<dyn LinkService>,
250 entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
251 ) -> FlowContext {
252 let event = FrameworkEvent::Link(LinkEvent::Created {
253 link_type: "follows".to_string(),
254 link_id: Uuid::new_v4(),
255 source_id: Uuid::new_v4(),
256 target_id,
257 metadata: None,
258 });
259 FlowContext::new(event, link_service, entity_fetchers)
260 }
261
262 #[tokio::test]
265 async fn test_fan_out_zero_followers_drops() {
266 let target_id = Uuid::new_v4();
267 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
268 let mut ctx = make_context(target_id, link_service, HashMap::new());
269
270 let op = FanOutOp::from_config(&FanOutConfig {
271 from: "target_id".to_string(),
272 via: "follows".to_string(),
273 direction: "reverse".to_string(),
274 output_var: "follower".to_string(),
275 });
276
277 let result = op.execute(&mut ctx).await.unwrap();
278 assert!(matches!(result, OpResult::Drop));
279 }
280
281 #[tokio::test]
282 async fn test_fan_out_one_follower() {
283 let target_id = Uuid::new_v4();
284 let follower_id = Uuid::new_v4();
285
286 let links = vec![make_link(follower_id, target_id)];
287 let link_service = Arc::new(MockLinkService { links }) as Arc<dyn LinkService>;
288
289 let mut entities = HashMap::new();
290 entities.insert(follower_id, json!({"name": "Alice"}));
291 let fetcher = Arc::new(MockEntityFetcher { entities }) as Arc<dyn EntityFetcher>;
292 let mut fetchers = HashMap::new();
293 fetchers.insert("user".to_string(), fetcher);
294
295 let mut ctx = make_context(target_id, link_service, fetchers);
296
297 let op = FanOutOp::from_config(&FanOutConfig {
298 from: "target_id".to_string(),
299 via: "follows".to_string(),
300 direction: "reverse".to_string(),
301 output_var: "follower".to_string(),
302 });
303
304 let result = op.execute(&mut ctx).await.unwrap();
305 match result {
306 OpResult::FanOut(contexts) => {
307 assert_eq!(contexts.len(), 1);
308 assert_eq!(
309 contexts[0].get_var("follower"),
310 Some(&json!({"name": "Alice"}))
311 );
312 assert!(contexts[0].get_var("follower_id").is_some());
313 }
314 other => panic!("expected FanOut, got {:?}", other),
315 }
316 }
317
318 #[tokio::test]
319 async fn test_fan_out_five_followers() {
320 let target_id = Uuid::new_v4();
321 let follower_ids: Vec<Uuid> = (0..5).map(|_| Uuid::new_v4()).collect();
322
323 let links: Vec<LinkEntity> = follower_ids
324 .iter()
325 .map(|fid| make_link(*fid, target_id))
326 .collect();
327 let link_service = Arc::new(MockLinkService { links }) as Arc<dyn LinkService>;
328
329 let mut ctx = make_context(target_id, link_service, HashMap::new());
331
332 let op = FanOutOp::from_config(&FanOutConfig {
333 from: "target_id".to_string(),
334 via: "follows".to_string(),
335 direction: "reverse".to_string(),
336 output_var: "follower".to_string(),
337 });
338
339 let result = op.execute(&mut ctx).await.unwrap();
340 match result {
341 OpResult::FanOut(contexts) => {
342 assert_eq!(contexts.len(), 5);
343 for fctx in &contexts {
345 assert!(fctx.get_var("follower_id").is_some());
346 assert!(fctx.get_var("follower").is_some());
348 }
349 }
350 other => panic!("expected FanOut, got {:?}", other),
351 }
352 }
353
354 #[tokio::test]
355 async fn test_fan_out_forward_direction() {
356 let source_id = Uuid::new_v4();
357 let target_ids: Vec<Uuid> = (0..3).map(|_| Uuid::new_v4()).collect();
358
359 let links: Vec<LinkEntity> = target_ids
360 .iter()
361 .map(|tid| make_link(source_id, *tid))
362 .collect();
363 let link_service = Arc::new(MockLinkService { links }) as Arc<dyn LinkService>;
364
365 let event = FrameworkEvent::Link(LinkEvent::Created {
366 link_type: "follows".to_string(),
367 link_id: Uuid::new_v4(),
368 source_id,
369 target_id: target_ids[0],
370 metadata: None,
371 });
372 let mut ctx = FlowContext::new(event, link_service, HashMap::new());
373
374 let op = FanOutOp::from_config(&FanOutConfig {
375 from: "source_id".to_string(),
376 via: "follows".to_string(),
377 direction: "forward".to_string(),
378 output_var: "followed".to_string(),
379 });
380
381 let result = op.execute(&mut ctx).await.unwrap();
382 match result {
383 OpResult::FanOut(contexts) => {
384 assert_eq!(contexts.len(), 3);
385 }
386 other => panic!("expected FanOut, got {:?}", other),
387 }
388 }
389
390 #[tokio::test]
391 async fn test_fan_out_missing_variable() {
392 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
393 let mut ctx = make_context(Uuid::new_v4(), link_service, HashMap::new());
394
395 let op = FanOutOp::from_config(&FanOutConfig {
396 from: "nonexistent".to_string(),
397 via: "follows".to_string(),
398 direction: "reverse".to_string(),
399 output_var: "follower".to_string(),
400 });
401
402 let result = op.execute(&mut ctx).await;
403 assert!(result.is_err());
404 }
405}