Skip to main content

this/events/
context.rs

1//! FlowContext — the context bag passed through the pipeline
2//!
3//! The FlowContext carries the original event, resolved variables, and
4//! access to services (LinkService, EntityFetchers) needed by operators.
5
6use crate::core::events::FrameworkEvent;
7use crate::core::module::EntityFetcher;
8use crate::core::service::LinkService;
9use crate::events::sinks::SinkRegistry;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14/// Context passed through each operator in the pipeline
15///
16/// Accumulates variables as operators resolve entities and fan out.
17/// Each operator can read/write variables via `get_var`/`set_var`.
18///
19/// # Variables
20///
21/// Variables are stored as `serde_json::Value` and named by the `as` field
22/// of operators. For example, `resolve(from: source_id, as: follower)` stores
23/// the resolved entity as `follower` in the context.
24///
25/// Special variables set from the trigger event:
26/// - `source_id` — Source entity ID (for link events)
27/// - `target_id` — Target entity ID (for link events)
28/// - `link_type` — Link type (for link events)
29/// - `entity_type` — Entity type (for entity events)
30/// - `entity_id` — Entity ID (for entity events)
31/// - `metadata` — Link metadata (for link events)
32/// - `data` — Entity data (for entity events)
33#[derive(Clone)]
34pub struct FlowContext {
35    /// The original framework event that triggered this flow
36    pub event: FrameworkEvent,
37
38    /// Accumulated variables from pipeline operators
39    pub variables: HashMap<String, Value>,
40
41    /// Access to the link service for resolve/fan_out operators
42    pub link_service: Arc<dyn LinkService>,
43
44    /// Access to entity fetchers, keyed by entity type
45    pub entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
46
47    /// Access to the sink registry for deliver operators
48    pub sink_registry: Option<Arc<SinkRegistry>>,
49}
50
51impl std::fmt::Debug for FlowContext {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("FlowContext")
54            .field("event", &self.event)
55            .field("variables", &self.variables)
56            .field(
57                "entity_fetchers",
58                &self.entity_fetchers.keys().collect::<Vec<_>>(),
59            )
60            .finish()
61    }
62}
63
64impl FlowContext {
65    /// Create a new FlowContext from a framework event
66    ///
67    /// Automatically extracts event fields into variables.
68    pub fn new(
69        event: FrameworkEvent,
70        link_service: Arc<dyn LinkService>,
71        entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
72    ) -> Self {
73        let mut variables = HashMap::new();
74
75        // Extract event-specific variables
76        match &event {
77            FrameworkEvent::Entity(entity_event) => {
78                use crate::core::events::EntityEvent;
79                match entity_event {
80                    EntityEvent::Created {
81                        entity_type,
82                        entity_id,
83                        data,
84                    } => {
85                        variables.insert(
86                            "entity_type".to_string(),
87                            Value::String(entity_type.clone()),
88                        );
89                        variables.insert(
90                            "entity_id".to_string(),
91                            Value::String(entity_id.to_string()),
92                        );
93                        variables.insert("data".to_string(), data.clone());
94                    }
95                    EntityEvent::Updated {
96                        entity_type,
97                        entity_id,
98                        data,
99                    } => {
100                        variables.insert(
101                            "entity_type".to_string(),
102                            Value::String(entity_type.clone()),
103                        );
104                        variables.insert(
105                            "entity_id".to_string(),
106                            Value::String(entity_id.to_string()),
107                        );
108                        variables.insert("data".to_string(), data.clone());
109                    }
110                    EntityEvent::Deleted {
111                        entity_type,
112                        entity_id,
113                    } => {
114                        variables.insert(
115                            "entity_type".to_string(),
116                            Value::String(entity_type.clone()),
117                        );
118                        variables.insert(
119                            "entity_id".to_string(),
120                            Value::String(entity_id.to_string()),
121                        );
122                    }
123                }
124            }
125            FrameworkEvent::Link(link_event) => {
126                use crate::core::events::LinkEvent;
127                match link_event {
128                    LinkEvent::Created {
129                        link_type,
130                        link_id,
131                        source_id,
132                        target_id,
133                        metadata,
134                    } => {
135                        variables.insert("link_type".to_string(), Value::String(link_type.clone()));
136                        variables.insert("link_id".to_string(), Value::String(link_id.to_string()));
137                        variables.insert(
138                            "source_id".to_string(),
139                            Value::String(source_id.to_string()),
140                        );
141                        variables.insert(
142                            "target_id".to_string(),
143                            Value::String(target_id.to_string()),
144                        );
145                        if let Some(meta) = metadata {
146                            variables.insert("metadata".to_string(), meta.clone());
147                        }
148                    }
149                    LinkEvent::Deleted {
150                        link_type,
151                        link_id,
152                        source_id,
153                        target_id,
154                    } => {
155                        variables.insert("link_type".to_string(), Value::String(link_type.clone()));
156                        variables.insert("link_id".to_string(), Value::String(link_id.to_string()));
157                        variables.insert(
158                            "source_id".to_string(),
159                            Value::String(source_id.to_string()),
160                        );
161                        variables.insert(
162                            "target_id".to_string(),
163                            Value::String(target_id.to_string()),
164                        );
165                    }
166                }
167            }
168        }
169
170        Self {
171            event,
172            variables,
173            link_service,
174            entity_fetchers,
175            sink_registry: None,
176        }
177    }
178
179    /// Set a variable in the context
180    pub fn set_var(&mut self, name: impl Into<String>, value: Value) {
181        self.variables.insert(name.into(), value);
182    }
183
184    /// Get a variable from the context
185    pub fn get_var(&self, name: &str) -> Option<&Value> {
186        // Support dotted access: "owner.id" -> variables["owner"]["id"]
187        if let Some(dot_pos) = name.find('.') {
188            let (root, rest) = name.split_at(dot_pos);
189            let rest = &rest[1..]; // skip the dot
190            if let Some(root_val) = self.variables.get(root) {
191                return get_nested(root_val, rest);
192            }
193            return None;
194        }
195        self.variables.get(name)
196    }
197
198    /// Set the sink registry for deliver operators
199    pub fn with_sink_registry(mut self, registry: Arc<SinkRegistry>) -> Self {
200        self.sink_registry = Some(registry);
201        self
202    }
203
204    /// Get a variable as a string (convenience)
205    pub fn get_var_str(&self, name: &str) -> Option<&str> {
206        self.get_var(name).and_then(|v| v.as_str())
207    }
208}
209
210/// Navigate into nested JSON values via dotted path
211fn get_nested<'a>(value: &'a Value, path: &str) -> Option<&'a Value> {
212    if let Some(dot_pos) = path.find('.') {
213        let (key, rest) = path.split_at(dot_pos);
214        let rest = &rest[1..];
215        match value {
216            Value::Object(map) => map.get(key).and_then(|v| get_nested(v, rest)),
217            _ => None,
218        }
219    } else {
220        match value {
221            Value::Object(map) => map.get(path),
222            _ => None,
223        }
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::{EntityEvent, LinkEvent};
    use crate::core::link::LinkEntity;
    use serde_json::json;
    use uuid::Uuid;

    /// Link service stand-in. These tests never call into it, so every method
    /// is left unimplemented.
    struct StubLinkService;

    #[async_trait::async_trait]
    impl LinkService for StubLinkService {
        async fn create(&self, _link: LinkEntity) -> anyhow::Result<LinkEntity> {
            unimplemented!()
        }

        async fn get(&self, _id: &Uuid) -> anyhow::Result<Option<LinkEntity>> {
            unimplemented!()
        }

        async fn list(&self) -> anyhow::Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_source(
            &self,
            _source_id: &Uuid,
            _link_type: Option<&str>,
            _target_type: Option<&str>,
        ) -> anyhow::Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn find_by_target(
            &self,
            _target_id: &Uuid,
            _link_type: Option<&str>,
            _source_type: Option<&str>,
        ) -> anyhow::Result<Vec<LinkEntity>> {
            unimplemented!()
        }

        async fn update(&self, _id: &Uuid, _link: LinkEntity) -> anyhow::Result<LinkEntity> {
            unimplemented!()
        }

        async fn delete(&self, _id: &Uuid) -> anyhow::Result<()> {
            unimplemented!()
        }

        async fn delete_by_entity(&self, _entity_id: &Uuid) -> anyhow::Result<()> {
            unimplemented!()
        }
    }

    /// Build a context for `event` with the stub link service and no fetchers.
    fn context_for(event: FrameworkEvent) -> FlowContext {
        FlowContext::new(event, Arc::new(StubLinkService), HashMap::new())
    }

    /// Build a "user created" entity event carrying `data`.
    fn user_created(entity_id: Uuid, data: Value) -> FrameworkEvent {
        FrameworkEvent::Entity(EntityEvent::Created {
            entity_type: "user".to_string(),
            entity_id,
            data,
        })
    }

    #[test]
    fn test_context_from_link_created() {
        let source_id = Uuid::new_v4();
        let target_id = Uuid::new_v4();
        let ctx = context_for(FrameworkEvent::Link(LinkEvent::Created {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id,
            target_id,
            metadata: Some(json!({"note": "hello"})),
        }));

        let expected_source = source_id.to_string();
        let expected_target = target_id.to_string();
        let expected_metadata = json!({"note": "hello"});

        assert_eq!(ctx.get_var_str("link_type"), Some("follows"));
        assert_eq!(ctx.get_var_str("source_id"), Some(expected_source.as_str()));
        assert_eq!(ctx.get_var_str("target_id"), Some(expected_target.as_str()));
        assert_eq!(ctx.get_var("metadata"), Some(&expected_metadata));
    }

    #[test]
    fn test_context_from_entity_created() {
        let entity_id = Uuid::new_v4();
        let ctx = context_for(user_created(entity_id, json!({"name": "Alice"})));

        let expected_id = entity_id.to_string();
        let expected_data = json!({"name": "Alice"});

        assert_eq!(ctx.get_var_str("entity_type"), Some("user"));
        assert_eq!(ctx.get_var_str("entity_id"), Some(expected_id.as_str()));
        assert_eq!(ctx.get_var("data"), Some(&expected_data));
    }

    #[test]
    fn test_set_and_get_var() {
        let mut ctx = context_for(user_created(Uuid::new_v4(), json!({})));
        ctx.set_var("owner", json!({"id": "abc", "name": "Bob"}));

        let expected_owner = json!({"id": "abc", "name": "Bob"});
        assert_eq!(ctx.get_var("owner"), Some(&expected_owner));
    }

    #[test]
    fn test_dotted_access() {
        let mut ctx = context_for(user_created(Uuid::new_v4(), json!({})));
        ctx.set_var(
            "owner",
            json!({"id": "abc", "profile": {"name": "Bob", "age": 30}}),
        );

        // Paths that resolve, at one and two levels of nesting
        assert_eq!(ctx.get_var_str("owner.id"), Some("abc"));
        assert_eq!(ctx.get_var_str("owner.profile.name"), Some("Bob"));
        assert_eq!(ctx.get_var("owner.profile.age"), Some(&json!(30)));

        // Missing leaf key, then missing root variable
        assert_eq!(ctx.get_var("owner.nonexistent"), None);
        assert_eq!(ctx.get_var("nonexistent.field"), None);
    }

    #[test]
    fn test_link_deleted_context() {
        let ctx = context_for(FrameworkEvent::Link(LinkEvent::Deleted {
            link_type: "follows".to_string(),
            link_id: Uuid::new_v4(),
            source_id: Uuid::new_v4(),
            target_id: Uuid::new_v4(),
        }));

        assert_eq!(ctx.get_var_str("link_type"), Some("follows"));
        assert_eq!(ctx.get_var("metadata"), None); // Deleted has no metadata
    }
}