Skip to main content

this/events/operators/
resolve.rs

//! Resolve operator — resolves an entity by ID or by following a link.
//!
//! # Direct resolution (no `via`)
//!
//! Reads the entity ID from the context variable named by `from`, fetches
//! the entity by trying each registered `EntityFetcher` until one succeeds,
//! and stores the result in `output_var`.
//!
//! # Link resolution (with `via`)
//!
//! Reads the entity ID from `from`, looks up links of type `via` in the
//! specified `direction` (`forward` or `reverse`) through `LinkService`,
//! takes the first link returned, fetches the entity at its other end, and
//! stores that entity in `output_var`.
//!
//! ```yaml
//! - resolve:
//!     from: source_id
//!     via: follows
//!     direction: reverse
//!     as: follower
//! ```
21
22use crate::config::events::ResolveConfig;
23use crate::events::context::FlowContext;
24use crate::events::operators::{OpResult, PipelineOperator};
25use anyhow::{Result, anyhow};
26use async_trait::async_trait;
27use serde_json::Value;
28use uuid::Uuid;
29
/// A `resolve` pipeline step in its compiled, ready-to-run form.
///
/// Built from a `ResolveConfig` through `ResolveOp::from_config`.
#[derive(Clone, Debug)]
pub struct ResolveOp {
    /// Name of the context variable that holds the starting entity ID.
    pub from: String,

    /// Link type to traverse; `None` means the ID read from `from` is
    /// resolved directly, without following any link.
    pub via: Option<String>,

    /// Traversal direction used for link resolution: `"forward"` or `"reverse"`.
    pub direction: String,

    /// Name of the context variable that receives the resolved entity.
    pub output_var: String,
}
45
46impl ResolveOp {
47    /// Create a ResolveOp from a ResolveConfig
48    pub fn from_config(config: &ResolveConfig) -> Self {
49        Self {
50            from: config.from.clone(),
51            via: config.via.clone(),
52            direction: config.direction.clone(),
53            output_var: config.output_var.clone(),
54        }
55    }
56}
57
58#[async_trait]
59impl PipelineOperator for ResolveOp {
60    async fn execute(&self, ctx: &mut FlowContext) -> Result<OpResult> {
61        // Read the source ID from context
62        let from_value = ctx
63            .get_var(&self.from)
64            .ok_or_else(|| anyhow!("resolve: variable '{}' not found in context", self.from))?
65            .clone();
66
67        let from_id = parse_uuid(&from_value, &self.from)?;
68
69        match &self.via {
70            None => {
71                // Direct resolution: fetch entity by ID
72                // We need to know the entity type to pick the right fetcher.
73                // Convention: if `from` ends with `_id`, the entity type is
74                // derived from the prefix (e.g., "source_id" → look at entity_type var).
75                // For now, we try each fetcher until one succeeds.
76                let entity = self.fetch_entity_by_id(ctx, &from_id).await?;
77                ctx.set_var(&self.output_var, entity);
78            }
79            Some(link_type) => {
80                // Link resolution: follow the link, then fetch the entity
81                let target_id = self.follow_link(ctx, &from_id, link_type).await?;
82                let entity = self.fetch_entity_by_id(ctx, &target_id).await?;
83                ctx.set_var(&self.output_var, entity);
84            }
85        }
86
87        Ok(OpResult::Continue)
88    }
89
90    fn name(&self) -> &str {
91        "resolve"
92    }
93}
94
95impl ResolveOp {
96    /// Follow a link from `from_id` via `link_type` and return the target entity ID
97    async fn follow_link(
98        &self,
99        ctx: &FlowContext,
100        from_id: &Uuid,
101        link_type: &str,
102    ) -> Result<Uuid> {
103        let links = match self.direction.as_str() {
104            "forward" => {
105                ctx.link_service
106                    .find_by_source(from_id, Some(link_type), None)
107                    .await?
108            }
109            "reverse" => {
110                ctx.link_service
111                    .find_by_target(from_id, Some(link_type), None)
112                    .await?
113            }
114            other => {
115                return Err(anyhow!(
116                    "resolve: invalid direction '{}', expected 'forward' or 'reverse'",
117                    other
118                ));
119            }
120        };
121
122        let link = links.first().ok_or_else(|| {
123            anyhow!(
124                "resolve: no '{}' link found from {} (direction: {})",
125                link_type,
126                from_id,
127                self.direction
128            )
129        })?;
130
131        // Return the other end of the link
132        match self.direction.as_str() {
133            "forward" => Ok(link.target_id),
134            _ => Ok(link.source_id),
135        }
136    }
137
138    /// Fetch an entity by ID using available entity fetchers
139    ///
140    /// Tries each fetcher until one returns a result.
141    async fn fetch_entity_by_id(&self, ctx: &FlowContext, id: &Uuid) -> Result<Value> {
142        for fetcher in ctx.entity_fetchers.values() {
143            if let Ok(entity) = fetcher.fetch_as_json(id).await {
144                return Ok(entity);
145            }
146        }
147
148        Err(anyhow!(
149            "resolve: entity {} not found in any registered fetcher",
150            id
151        ))
152    }
153}
154
155/// Parse a UUID from a serde_json::Value
156fn parse_uuid(value: &Value, field_name: &str) -> Result<Uuid> {
157    match value {
158        Value::String(s) => Uuid::parse_str(s)
159            .map_err(|e| anyhow!("resolve: '{}' is not a valid UUID: {}", field_name, e)),
160        _ => Err(anyhow!(
161            "resolve: '{}' expected a string UUID, got {:?}",
162            field_name,
163            value
164        )),
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::events::ResolveConfig;
    use crate::core::events::{FrameworkEvent, LinkEvent};
    use crate::core::link::LinkEntity;
    use crate::core::module::EntityFetcher;
    use crate::core::service::LinkService;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;

    // ── Mock LinkService ─────────────────────────────────────────────

    /// In-memory `LinkService` backed by a fixed list of links.
    ///
    /// Only the read paths are implemented; the write methods (and `get`)
    /// panic through `unimplemented!()` if a test ever reaches them.
    struct MockLinkService {
        links: Vec<LinkEntity>,
    }

    #[async_trait]
    impl LinkService for MockLinkService {
        async fn create(&self, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn get(&self, _id: &Uuid) -> Result<Option<LinkEntity>> {
            unimplemented!()
        }

        async fn list(&self) -> Result<Vec<LinkEntity>> {
            Ok(self.links.clone())
        }

        /// Returns the stored links whose source matches, optionally
        /// narrowed by link type. The target-type filter is ignored.
        async fn find_by_source(
            &self,
            source_id: &Uuid,
            link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            let matches: Vec<LinkEntity> = self
                .links
                .iter()
                .filter(|link| link.source_id == *source_id)
                .filter(|link| link_type.is_none_or(|wanted| link.link_type == wanted))
                .cloned()
                .collect();
            Ok(matches)
        }

        /// Returns the stored links whose target matches, optionally
        /// narrowed by link type. The source-type filter is ignored.
        async fn find_by_target(
            &self,
            target_id: &Uuid,
            link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> Result<Vec<LinkEntity>> {
            let matches: Vec<LinkEntity> = self
                .links
                .iter()
                .filter(|link| link.target_id == *target_id)
                .filter(|link| link_type.is_none_or(|wanted| link.link_type == wanted))
                .cloned()
                .collect();
            Ok(matches)
        }

        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> Result<LinkEntity> {
            unimplemented!()
        }

        async fn delete(&self, _id: &Uuid) -> Result<()> {
            unimplemented!()
        }

        async fn delete_by_entity(&self, _entity_id: &Uuid) -> Result<()> {
            unimplemented!()
        }
    }

    // ── Mock EntityFetcher ───────────────────────────────────────────

    /// In-memory `EntityFetcher` that serves entities from a map and
    /// reports an error for any ID it does not hold.
    struct MockEntityFetcher {
        entities: HashMap<Uuid, Value>,
    }

    #[async_trait]
    impl EntityFetcher for MockEntityFetcher {
        async fn fetch_as_json(&self, entity_id: &Uuid) -> Result<Value> {
            match self.entities.get(entity_id) {
                Some(entity) => Ok(entity.clone()),
                None => Err(anyhow!("entity not found")),
            }
        }
    }

    // ── Helpers ──────────────────────────────────────────────────────

    /// Builds a `FlowContext` around a `follows` link-created event between
    /// `source_id` and `target_id`.
    fn make_context(
        source_id: Uuid,
        target_id: Uuid,
        link_service: Arc<dyn LinkService>,
        entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
    ) -> FlowContext {
        let created = LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id,
            target_id,
            metadata: None,
        };
        FlowContext::new(FrameworkEvent::Link(created), link_service, entity_fetchers)
    }

    /// Builds an active `follows` link from `source_id` to `target_id`.
    fn follows_link(source_id: Uuid, target_id: Uuid) -> LinkEntity {
        LinkEntity {
            id: Uuid::new_v4(),
            entity_type: "link".to_string(),
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            deleted_at: None,
            status: "active".to_string(),
            tenant_id: None,
            link_type: "follows".to_string(),
            source_id,
            target_id,
            metadata: None,
        }
    }

    /// Wraps the given links in a mock link service.
    fn link_service_with(links: Vec<LinkEntity>) -> Arc<dyn LinkService> {
        Arc::new(MockLinkService { links })
    }

    /// Registers a single mock fetcher under the `"user"` key.
    fn user_fetchers(entities: HashMap<Uuid, Value>) -> HashMap<String, Arc<dyn EntityFetcher>> {
        let fetcher: Arc<dyn EntityFetcher> = Arc::new(MockEntityFetcher { entities });
        HashMap::from([("user".to_string(), fetcher)])
    }

    /// Compiles a `ResolveOp` from the given settings via `from_config`.
    fn resolve_op(from: &str, via: Option<&str>, direction: &str, output_var: &str) -> ResolveOp {
        ResolveOp::from_config(&ResolveConfig {
            from: from.to_string(),
            via: via.map(str::to_string),
            direction: direction.to_string(),
            output_var: output_var.to_string(),
        })
    }

    // ── Tests ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_resolve_direct_by_id() {
        let entity_id = Uuid::new_v4();
        let entity_data = json!({"name": "Alice", "email": "alice@example.com"});

        let fetchers = user_fetchers(HashMap::from([(entity_id, entity_data.clone())]));
        let mut ctx = make_context(
            entity_id,
            Uuid::new_v4(),
            link_service_with(vec![]),
            fetchers,
        );

        let op = resolve_op("source_id", None, "forward", "owner");

        let outcome = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(outcome, OpResult::Continue));
        assert_eq!(ctx.get_var("owner"), Some(&entity_data));
    }

    #[tokio::test]
    async fn test_resolve_via_link_forward() {
        let source_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();
        let target_data = json!({"name": "Bob"});

        // Only the link target is fetchable, so success proves the forward hop.
        let fetchers = user_fetchers(HashMap::from([(target_id, target_data.clone())]));
        let links = link_service_with(vec![follows_link(source_id, target_id)]);
        let mut ctx = make_context(source_id, target_id, links, fetchers);

        let op = resolve_op("source_id", Some("follows"), "forward", "followed_user");

        let outcome = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(outcome, OpResult::Continue));
        assert_eq!(ctx.get_var("followed_user"), Some(&target_data));
    }

    #[tokio::test]
    async fn test_resolve_via_link_reverse() {
        let source_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();
        let source_data = json!({"name": "Alice"});

        // Only the link source is fetchable, so success proves the reverse hop.
        let fetchers = user_fetchers(HashMap::from([(source_id, source_data.clone())]));
        let links = link_service_with(vec![follows_link(source_id, target_id)]);
        let mut ctx = make_context(source_id, target_id, links, fetchers);

        let op = resolve_op("target_id", Some("follows"), "reverse", "follower");

        let outcome = op.execute(&mut ctx).await.unwrap();
        assert!(matches!(outcome, OpResult::Continue));
        assert_eq!(ctx.get_var("follower"), Some(&source_data));
    }

    #[tokio::test]
    async fn test_resolve_missing_variable() {
        let mut ctx = make_context(
            Uuid::new_v4(),
            Uuid::new_v4(),
            link_service_with(vec![]),
            HashMap::new(),
        );

        let op = resolve_op("nonexistent_var", None, "forward", "result");

        let outcome = op.execute(&mut ctx).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("nonexistent_var"));
    }

    #[tokio::test]
    async fn test_resolve_no_link_found() {
        let source_id = Uuid::new_v4();

        let mut ctx = make_context(
            source_id,
            Uuid::new_v4(),
            link_service_with(vec![]),
            HashMap::new(),
        );

        let op = resolve_op("source_id", Some("follows"), "forward", "result");

        let outcome = op.execute(&mut ctx).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("no 'follows' link found"));
    }

    #[tokio::test]
    async fn test_resolve_entity_not_found() {
        let entity_id = Uuid::new_v4();

        // Empty fetcher — entity won't be found
        let fetchers = user_fetchers(HashMap::new());
        let mut ctx = make_context(
            entity_id,
            Uuid::new_v4(),
            link_service_with(vec![]),
            fetchers,
        );

        let op = resolve_op("source_id", None, "forward", "owner");

        let outcome = op.execute(&mut ctx).await;
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not found"));
    }
}