1use crate::config::events::ResolveConfig;
23use crate::events::context::FlowContext;
24use crate::events::operators::{OpResult, PipelineOperator};
25use anyhow::{Result, anyhow};
26use async_trait::async_trait;
27use serde_json::Value;
28use uuid::Uuid;
29
/// Pipeline operator that resolves an entity and stores it in the flow context.
///
/// The starting id is read from the context variable named by `from`. Without
/// `via` the entity with that id is fetched directly; with `via` a link of that
/// type is followed first and the entity at its other end is fetched.
#[derive(Clone, Debug)]
pub struct ResolveOp {
    /// Name of the context variable holding the starting id (a UUID string).
    pub from: String,

    /// Link type to follow from the starting id; `None` fetches the starting
    /// entity itself.
    pub via: Option<String>,

    /// Link traversal direction, `"forward"` or `"reverse"`; only consulted
    /// when `via` is set.
    pub direction: String,

    /// Name of the context variable that receives the resolved entity.
    pub output_var: String,
}
45
46impl ResolveOp {
47 pub fn from_config(config: &ResolveConfig) -> Self {
49 Self {
50 from: config.from.clone(),
51 via: config.via.clone(),
52 direction: config.direction.clone(),
53 output_var: config.output_var.clone(),
54 }
55 }
56}
57
58#[async_trait]
59impl PipelineOperator for ResolveOp {
60 async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
61 let from_value = ctx
63 .get_var(&self.from)
64 .ok_or_else(|| anyhow!("resolve: variable '{}' not found in context", self.from))?
65 .clone();
66
67 let from_id = parse_uuid(&from_value, &self.from)?;
68
69 match &self.via {
70 None => {
71 let entity = self.fetch_entity_by_id(ctx, &from_id).await?;
77 ctx.set_var(&self.output_var, entity);
78 }
79 Some(link_type) => {
80 let target_id = self.follow_link(ctx, &from_id, link_type).await?;
82 let entity = self.fetch_entity_by_id(ctx, &target_id).await?;
83 ctx.set_var(&self.output_var, entity);
84 }
85 }
86
87 Ok(OpResult::Continue)
88 }
89
90 fn name(&self) -> &str {
91 "resolve"
92 }
93}
94
95impl ResolveOp {
96 async fn follow_link(
98 &self,
99 ctx: &FlowContext,
100 from_id: &Uuid,
101 link_type: &str,
102 ) -> Result<Uuid> {
103 let links = match self.direction.as_str() {
104 "forward" => {
105 ctx.link_service
106 .find_by_source(from_id, Some(link_type), None)
107 .await?
108 }
109 "reverse" => {
110 ctx.link_service
111 .find_by_target(from_id, Some(link_type), None)
112 .await?
113 }
114 other => {
115 return Err(anyhow!(
116 "resolve: invalid direction '{}', expected 'forward' or 'reverse'",
117 other
118 ));
119 }
120 };
121
122 let link = links.first().ok_or_else(|| {
123 anyhow!(
124 "resolve: no '{}' link found from {} (direction: {})",
125 link_type,
126 from_id,
127 self.direction
128 )
129 })?;
130
131 match self.direction.as_str() {
133 "forward" => Ok(link.target_id),
134 _ => Ok(link.source_id),
135 }
136 }
137
138 async fn fetch_entity_by_id(&self, ctx: &FlowContext, id: &Uuid) -> Result<Value> {
142 for fetcher in ctx.entity_fetchers.values() {
143 if let Ok(entity) = fetcher.fetch_as_json(id).await {
144 return Ok(entity);
145 }
146 }
147
148 Err(anyhow!(
149 "resolve: entity {} not found in any registered fetcher",
150 id
151 ))
152 }
153}
154
155fn parse_uuid(value: &Value, field_name: &str) -> Result<Uuid> {
157 match value {
158 Value::String(s) => Uuid::parse_str(s)
159 .map_err(|e| anyhow!("resolve: '{}' is not a valid UUID: {}", field_name, e)),
160 _ => Err(anyhow!(
161 "resolve: '{}' expected a string UUID, got {:?}",
162 field_name,
163 value
164 )),
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171 use crate::config::events::ResolveConfig;
172 use crate::core::events::{FrameworkEvent, LinkEvent};
173 use crate::core::link::LinkEntity;
174 use crate::core::module::EntityFetcher;
175 use crate::core::service::LinkService;
176 use serde_json::json;
177 use std::collections::HashMap;
178 use std::sync::Arc;
179
180 struct MockLinkService {
183 links: Vec<LinkEntity>,
184 }
185
186 #[async_trait]
187 impl LinkService for MockLinkService {
188 async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
189 unimplemented!()
190 }
191 async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
192 unimplemented!()
193 }
194 async fn list(&self) -> Result<Vec<LinkEntity>> {
195 Ok(self.links.clone())
196 }
197 async fn find_by_source(
198 &self,
199 source_id: &Uuid,
200 link_type: Option<&str>,
201 _target_type: Option<&str>,
202 ) -> Result<Vec<LinkEntity>> {
203 Ok(self
204 .links
205 .iter()
206 .filter(|l| {
207 l.source_id == *source_id && link_type.is_none_or(|lt| l.link_type == lt)
208 })
209 .cloned()
210 .collect())
211 }
212 async fn find_by_target(
213 &self,
214 target_id: &Uuid,
215 link_type: Option<&str>,
216 _source_type: Option<&str>,
217 ) -> Result<Vec<LinkEntity>> {
218 Ok(self
219 .links
220 .iter()
221 .filter(|l| {
222 l.target_id == *target_id && link_type.is_none_or(|lt| l.link_type == lt)
223 })
224 .cloned()
225 .collect())
226 }
227 async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
228 unimplemented!()
229 }
230 async fn delete(&self, _id: &Uuid) -> Result<()> {
231 unimplemented!()
232 }
233 async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
234 unimplemented!()
235 }
236 }
237
238 struct MockEntityFetcher {
241 entities: HashMap<Uuid, Value>,
242 }
243
244 #[async_trait]
245 impl EntityFetcher for MockEntityFetcher {
246 async fn fetch_as_json(&self, entity_id: &Uuid) -> Result<Value> {
247 self.entities
248 .get(entity_id)
249 .cloned()
250 .ok_or_else(|| anyhow!("entity not found"))
251 }
252 }
253
254 fn make_context(
257 source_id: Uuid,
258 target_id: Uuid,
259 link_service: Arc<dyn LinkService>,
260 entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
261 ) -> FlowContext {
262 let event = FrameworkEvent::Link(LinkEvent::Created {
263 link_type: "follows".to_string(),
264 link_id: Uuid::new_v4(),
265 source_id,
266 target_id,
267 metadata: None,
268 });
269 FlowContext::new(event, link_service, entity_fetchers)
270 }
271
272 #[tokio::test]
275 async fn test_resolve_direct_by_id() {
276 let entity_id = Uuid::new_v4();
277 let entity_data = json!({"name": "Alice", "email": "alice@example.com"});
278
279 let mut entities = HashMap::new();
280 entities.insert(entity_id, entity_data.clone());
281
282 let fetcher = Arc::new(MockEntityFetcher { entities }) as Arc<dyn EntityFetcher>;
283 let mut fetchers = HashMap::new();
284 fetchers.insert("user".to_string(), fetcher);
285
286 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
287
288 let mut ctx = make_context(entity_id, Uuid::new_v4(), link_service, fetchers);
289
290 let op = ResolveOp::from_config(&ResolveConfig {
291 from: "source_id".to_string(),
292 via: None,
293 direction: "forward".to_string(),
294 output_var: "owner".to_string(),
295 });
296
297 let result = op.execute(&mut ctx).await.unwrap();
298 assert!(matches!(result, OpResult::Continue));
299 assert_eq!(ctx.get_var("owner"), Some(&entity_data));
300 }
301
302 #[tokio::test]
303 async fn test_resolve_via_link_forward() {
304 let source_id = Uuid::new_v4();
305 let target_id = Uuid::new_v4();
306 let target_data = json!({"name": "Bob"});
307
308 let link = LinkEntity {
309 id: Uuid::new_v4(),
310 entity_type: "link".to_string(),
311 created_at: chrono::Utc::now(),
312 updated_at: chrono::Utc::now(),
313 deleted_at: None,
314 status: "active".to_string(),
315 tenant_id: None,
316 link_type: "follows".to_string(),
317 source_id,
318 target_id,
319 metadata: None,
320 };
321
322 let mut entities = HashMap::new();
323 entities.insert(target_id, target_data.clone());
324
325 let fetcher = Arc::new(MockEntityFetcher { entities }) as Arc<dyn EntityFetcher>;
326 let mut fetchers = HashMap::new();
327 fetchers.insert("user".to_string(), fetcher);
328
329 let link_service = Arc::new(MockLinkService { links: vec![link] }) as Arc<dyn LinkService>;
330
331 let mut ctx = make_context(source_id, target_id, link_service, fetchers);
332
333 let op = ResolveOp::from_config(&ResolveConfig {
334 from: "source_id".to_string(),
335 via: Some("follows".to_string()),
336 direction: "forward".to_string(),
337 output_var: "followed_user".to_string(),
338 });
339
340 let result = op.execute(&mut ctx).await.unwrap();
341 assert!(matches!(result, OpResult::Continue));
342 assert_eq!(ctx.get_var("followed_user"), Some(&target_data));
343 }
344
345 #[tokio::test]
346 async fn test_resolve_via_link_reverse() {
347 let source_id = Uuid::new_v4();
348 let target_id = Uuid::new_v4();
349 let source_data = json!({"name": "Alice"});
350
351 let link = LinkEntity {
352 id: Uuid::new_v4(),
353 entity_type: "link".to_string(),
354 created_at: chrono::Utc::now(),
355 updated_at: chrono::Utc::now(),
356 deleted_at: None,
357 status: "active".to_string(),
358 tenant_id: None,
359 link_type: "follows".to_string(),
360 source_id,
361 target_id,
362 metadata: None,
363 };
364
365 let mut entities = HashMap::new();
366 entities.insert(source_id, source_data.clone());
367
368 let fetcher = Arc::new(MockEntityFetcher { entities }) as Arc<dyn EntityFetcher>;
369 let mut fetchers = HashMap::new();
370 fetchers.insert("user".to_string(), fetcher);
371
372 let link_service = Arc::new(MockLinkService { links: vec![link] }) as Arc<dyn LinkService>;
373
374 let mut ctx = make_context(source_id, target_id, link_service, fetchers);
375
376 let op = ResolveOp::from_config(&ResolveConfig {
377 from: "target_id".to_string(),
378 via: Some("follows".to_string()),
379 direction: "reverse".to_string(),
380 output_var: "follower".to_string(),
381 });
382
383 let result = op.execute(&mut ctx).await.unwrap();
384 assert!(matches!(result, OpResult::Continue));
385 assert_eq!(ctx.get_var("follower"), Some(&source_data));
386 }
387
388 #[tokio::test]
389 async fn test_resolve_missing_variable() {
390 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
391
392 let mut ctx = make_context(Uuid::new_v4(), Uuid::new_v4(), link_service, HashMap::new());
393
394 let op = ResolveOp::from_config(&ResolveConfig {
395 from: "nonexistent_var".to_string(),
396 via: None,
397 direction: "forward".to_string(),
398 output_var: "result".to_string(),
399 });
400
401 let result = op.execute(&mut ctx).await;
402 assert!(result.is_err());
403 assert!(result.unwrap_err().to_string().contains("nonexistent_var"));
404 }
405
406 #[tokio::test]
407 async fn test_resolve_no_link_found() {
408 let source_id = Uuid::new_v4();
409
410 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
411
412 let mut ctx = make_context(source_id, Uuid::new_v4(), link_service, HashMap::new());
413
414 let op = ResolveOp::from_config(&ResolveConfig {
415 from: "source_id".to_string(),
416 via: Some("follows".to_string()),
417 direction: "forward".to_string(),
418 output_var: "result".to_string(),
419 });
420
421 let result = op.execute(&mut ctx).await;
422 assert!(result.is_err());
423 assert!(
424 result
425 .unwrap_err()
426 .to_string()
427 .contains("no 'follows' link found")
428 );
429 }
430
431 #[tokio::test]
432 async fn test_resolve_entity_not_found() {
433 let entity_id = Uuid::new_v4();
434 let fetcher = Arc::new(MockEntityFetcher {
436 entities: HashMap::new(),
437 }) as Arc<dyn EntityFetcher>;
438 let mut fetchers = HashMap::new();
439 fetchers.insert("user".to_string(), fetcher);
440
441 let link_service = Arc::new(MockLinkService { links: vec![] }) as Arc<dyn LinkService>;
442
443 let mut ctx = make_context(entity_id, Uuid::new_v4(), link_service, fetchers);
444
445 let op = ResolveOp::from_config(&ResolveConfig {
446 from: "source_id".to_string(),
447 via: None,
448 direction: "forward".to_string(),
449 output_var: "owner".to_string(),
450 });
451
452 let result = op.execute(&mut ctx).await;
453 assert!(result.is_err());
454 assert!(result.unwrap_err().to_string().contains("not found"));
455 }
456}