Skip to main content

this/events/
runtime.rs

1//! FlowRuntime — subscribes to EventLog and dispatches events to compiled flows
2//!
3//! The FlowRuntime is the main execution engine for declarative event flows.
4//! It runs as a background task that:
5//!
6//! 1. Subscribes to the EventLog (from a configurable seek position)
7//! 2. For each incoming event, finds matching flows via EventMatcher
8//! 3. Executes the pipeline operators sequentially
9//! 4. Handles fan-out by recursively processing sub-pipelines
10//! 5. Logs errors without propagating them (one flow's error doesn't affect others)
11//!
12//! # Usage
13//!
14//! ```ignore
15//! let runtime = FlowRuntime::new(compiled_flows, event_log, link_service, fetchers);
16//! let handle = runtime.run(SeekPosition::Latest);
17//! // handle is a JoinHandle that can be used to monitor/cancel the runtime
18//! ```
19
20use crate::core::module::EntityFetcher;
21use crate::core::service::LinkService;
22use crate::events::compiler::CompiledFlow;
23use crate::events::context::FlowContext;
24use crate::events::log::EventLog;
25use crate::events::operators::{OpResult, PipelineOperator};
26use crate::events::sinks::SinkRegistry;
27use crate::events::types::SeekPosition;
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::task::JoinHandle;
31use tokio_stream::StreamExt;
32
33/// The main flow execution runtime
34pub struct FlowRuntime {
35    /// Compiled flows to evaluate for each event
36    flows: Vec<CompiledFlow>,
37
38    /// Event log to subscribe to
39    event_log: Arc<dyn EventLog>,
40
41    /// Shared link service for resolve/fan_out operators
42    link_service: Arc<dyn LinkService>,
43
44    /// Entity fetchers keyed by entity type
45    entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
46
47    /// Sink registry for deliver operators
48    sink_registry: Option<Arc<SinkRegistry>>,
49
50    /// Consumer name for tracking position
51    consumer_name: String,
52}
53
54impl std::fmt::Debug for FlowRuntime {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("FlowRuntime")
57            .field("flows", &self.flows.len())
58            .field("consumer_name", &self.consumer_name)
59            .finish()
60    }
61}
62
63impl FlowRuntime {
64    /// Create a new FlowRuntime
65    pub fn new(
66        flows: Vec<CompiledFlow>,
67        event_log: Arc<dyn EventLog>,
68        link_service: Arc<dyn LinkService>,
69        entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
70    ) -> Self {
71        Self {
72            flows,
73            event_log,
74            link_service,
75            entity_fetchers,
76            sink_registry: None,
77            consumer_name: "flow-runtime".to_string(),
78        }
79    }
80
81    /// Set a custom consumer name (for multi-consumer setups)
82    pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
83        self.consumer_name = name.into();
84        self
85    }
86
87    /// Set the sink registry for deliver operators
88    ///
89    /// Without a sink registry, the `deliver` operator will log but not
90    /// actually dispatch to any sink.
91    pub fn with_sink_registry(mut self, registry: Arc<SinkRegistry>) -> Self {
92        self.sink_registry = Some(registry);
93        self
94    }
95
96    /// Start the runtime as a background task
97    ///
98    /// Returns a JoinHandle that resolves when the runtime stops.
99    /// The runtime runs indefinitely, processing events as they arrive.
100    pub fn run(self, position: SeekPosition) -> JoinHandle<()> {
101        tokio::spawn(async move {
102            if let Err(e) = self.run_inner(position).await {
103                tracing::error!(error = %e, "flow runtime stopped with error");
104            }
105        })
106    }
107
108    /// Internal run loop
109    async fn run_inner(self, position: SeekPosition) -> anyhow::Result<()> {
110        tracing::info!(
111            flows = self.flows.len(),
112            consumer = %self.consumer_name,
113            "flow runtime starting"
114        );
115
116        let mut stream = self
117            .event_log
118            .subscribe(&self.consumer_name, position)
119            .await?;
120
121        while let Some(envelope) = stream.next().await {
122            let event = &envelope.event;
123
124            // Find matching flows
125            for flow in &self.flows {
126                if flow.matcher.matches(event) {
127                    tracing::debug!(
128                        flow = %flow.name,
129                        event_kind = %event.event_kind(),
130                        "flow matched, executing pipeline"
131                    );
132
133                    // Create a FlowContext for this execution
134                    let mut ctx = FlowContext::new(
135                        event.clone(),
136                        self.link_service.clone(),
137                        self.entity_fetchers.clone(),
138                    );
139
140                    // Attach sink registry if available
141                    if let Some(ref registry) = self.sink_registry {
142                        ctx.sink_registry = Some(registry.clone());
143                    }
144
145                    // Execute the pipeline
146                    if let Err(e) = execute_pipeline(&flow.operators, &mut ctx).await {
147                        tracing::warn!(
148                            flow = %flow.name,
149                            error = %e,
150                            "pipeline execution failed"
151                        );
152                    }
153                }
154            }
155
156            // Ack the exact event we just processed
157            if let Some(seq) = envelope.seq_no
158                && let Err(e) = self.event_log.ack(&self.consumer_name, seq).await
159            {
160                tracing::warn!(error = %e, "failed to ack event");
161            }
162        }
163
164        tracing::info!("flow runtime stream ended");
165        Ok(())
166    }
167}
168
169/// Execute a pipeline of operators on a FlowContext
170///
171/// Handles fan-out by recursively executing remaining operators on each sub-context.
172/// Uses `Box::pin` for async recursion since fan-out creates sub-pipelines.
173fn execute_pipeline<'a>(
174    operators: &'a [Box<dyn PipelineOperator>],
175    ctx: &'a mut FlowContext,
176) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
177    Box::pin(async move {
178        for (i, op) in operators.iter().enumerate() {
179            match op.execute(ctx).await? {
180                OpResult::Continue => {
181                    // Continue to next operator
182                }
183                OpResult::Drop => {
184                    tracing::debug!(operator = %op.name(), "event dropped by operator");
185                    return Ok(());
186                }
187                OpResult::FanOut(contexts) => {
188                    tracing::debug!(
189                        operator = %op.name(),
190                        count = contexts.len(),
191                        "fan-out: processing remaining pipeline for each context"
192                    );
193
194                    // Execute remaining operators for each fanned-out context
195                    let remaining = &operators[i + 1..];
196                    for mut sub_ctx in contexts {
197                        if let Err(e) = execute_pipeline(remaining, &mut sub_ctx).await {
198                            tracing::warn!(
199                                operator = %op.name(),
200                                error = %e,
201                                "fan-out sub-pipeline failed"
202                            );
203                        }
204                    }
205                    return Ok(());
206                }
207            }
208        }
209
210        Ok(())
211    })
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use crate::config::events::*;
218    use crate::core::events::{EntityEvent, EventEnvelope, FrameworkEvent, LinkEvent};
219    use crate::events::compiler::compile_flow;
220    use crate::events::memory::InMemoryEventLog;
221    use serde_json::json;
222    use std::sync::Arc;
223    use uuid::Uuid;
224
225    // ── Mock LinkService ─────────────────────────────────────────────
226
227    struct MockLinkService;
228
229    #[async_trait::async_trait]
230    impl LinkService for MockLinkService {
231        async fn create(
232            &self,
233            _: crate::core::link::LinkEntity,
234        ) -> anyhow::Result<crate::core::link::LinkEntity> {
235            unimplemented!()
236        }
237        async fn get(&self, _: &Uuid) -> anyhow::Result<Option<crate::core::link::LinkEntity>> {
238            unimplemented!()
239        }
240        async fn list(&self) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
241            unimplemented!()
242        }
243        async fn find_by_source(
244            &self,
245            _: &Uuid,
246            _: Option<&str>,
247            _: Option<&str>,
248        ) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
249            Ok(vec![])
250        }
251        async fn find_by_target(
252            &self,
253            _: &Uuid,
254            _: Option<&str>,
255            _: Option<&str>,
256        ) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
257            Ok(vec![])
258        }
259        async fn update(
260            &self,
261            _: &Uuid,
262            _: crate::core::link::LinkEntity,
263        ) -> anyhow::Result<crate::core::link::LinkEntity> {
264            unimplemented!()
265        }
266        async fn delete(&self, _: &Uuid) -> anyhow::Result<()> {
267            unimplemented!()
268        }
269        async fn delete_by_entity(&self, _: &Uuid) -> anyhow::Result<()> {
270            unimplemented!()
271        }
272    }
273
274    // ── Mock EntityFetcher ───────────────────────────────────────────
275
276    struct MockEntityFetcher;
277
278    #[async_trait::async_trait]
279    impl EntityFetcher for MockEntityFetcher {
280        async fn fetch_as_json(&self, id: &Uuid) -> anyhow::Result<serde_json::Value> {
281            Ok(json!({"id": id.to_string(), "name": "TestUser"}))
282        }
283    }
284
285    // ── Helpers ──────────────────────────────────────────────────────
286
287    fn make_link_event(link_type: &str) -> FrameworkEvent {
288        FrameworkEvent::Link(LinkEvent::Created {
289            link_type: link_type.to_string(),
290            link_id: Uuid::new_v4(),
291            source_id: Uuid::new_v4(),
292            target_id: Uuid::new_v4(),
293            metadata: None,
294        })
295    }
296
297    fn make_entity_event(entity_type: &str) -> FrameworkEvent {
298        FrameworkEvent::Entity(EntityEvent::Created {
299            entity_type: entity_type.to_string(),
300            entity_id: Uuid::new_v4(),
301            data: json!({"name": "test"}),
302        })
303    }
304
305    // ── Tests ────────────────────────────────────────────────────────
306
307    #[tokio::test]
308    async fn test_runtime_dispatches_matching_event() {
309        let event_log = Arc::new(InMemoryEventLog::new());
310
311        // Compile a simple flow: link.created/follows → map → deliver
312        let flow = compile_flow(&FlowConfig {
313            name: "follow_notif".to_string(),
314            description: None,
315            trigger: TriggerConfig {
316                kind: "link.created".to_string(),
317                link_type: Some("follows".to_string()),
318                entity_type: None,
319            },
320            pipeline: vec![
321                PipelineStep::Map(MapConfig {
322                    template: json!({"title": "New follower!"}),
323                }),
324                PipelineStep::Deliver(DeliverConfig {
325                    sink: Some("in_app".to_string()),
326                    sinks: None,
327                }),
328            ],
329        })
330        .unwrap();
331
332        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
333        let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
334
335        let handle = runtime.run(SeekPosition::Latest);
336
337        // Publish a matching event
338        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
339        event_log
340            .append(EventEnvelope::new(make_link_event("follows")))
341            .await
342            .unwrap();
343
344        // Give the runtime time to process
345        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
346
347        handle.abort();
348        // If we got here without panics, the runtime processed the event correctly
349    }
350
351    #[tokio::test]
352    async fn test_runtime_ignores_non_matching_event() {
353        let event_log = Arc::new(InMemoryEventLog::new());
354
355        let flow = compile_flow(&FlowConfig {
356            name: "follow_notif".to_string(),
357            description: None,
358            trigger: TriggerConfig {
359                kind: "link.created".to_string(),
360                link_type: Some("follows".to_string()),
361                entity_type: None,
362            },
363            pipeline: vec![PipelineStep::Map(MapConfig {
364                template: json!({"title": "New follower!"}),
365            })],
366        })
367        .unwrap();
368
369        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
370        let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
371
372        let handle = runtime.run(SeekPosition::Latest);
373
374        // Publish a NON-matching event (likes, not follows)
375        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
376        event_log
377            .append(EventEnvelope::new(make_link_event("likes")))
378            .await
379            .unwrap();
380
381        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
382        handle.abort();
383        // Should process without errors — the flow simply doesn't match
384    }
385
386    #[tokio::test]
387    async fn test_runtime_multiple_flows() {
388        let event_log = Arc::new(InMemoryEventLog::new());
389
390        let flow1 = compile_flow(&FlowConfig {
391            name: "follow_flow".to_string(),
392            description: None,
393            trigger: TriggerConfig {
394                kind: "link.created".to_string(),
395                link_type: Some("follows".to_string()),
396                entity_type: None,
397            },
398            pipeline: vec![PipelineStep::Map(MapConfig {
399                template: json!({"type": "follow"}),
400            })],
401        })
402        .unwrap();
403
404        let flow2 = compile_flow(&FlowConfig {
405            name: "entity_flow".to_string(),
406            description: None,
407            trigger: TriggerConfig {
408                kind: "entity.created".to_string(),
409                link_type: None,
410                entity_type: Some("user".to_string()),
411            },
412            pipeline: vec![PipelineStep::Map(MapConfig {
413                template: json!({"type": "user_created"}),
414            })],
415        })
416        .unwrap();
417
418        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
419        let runtime = FlowRuntime::new(
420            vec![flow1, flow2],
421            event_log.clone(),
422            link_service,
423            HashMap::new(),
424        );
425
426        let handle = runtime.run(SeekPosition::Latest);
427
428        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
429
430        // Publish events that match different flows
431        event_log
432            .append(EventEnvelope::new(make_link_event("follows")))
433            .await
434            .unwrap();
435        event_log
436            .append(EventEnvelope::new(make_entity_event("user")))
437            .await
438            .unwrap();
439        event_log
440            .append(EventEnvelope::new(make_link_event("likes"))) // matches nothing
441            .await
442            .unwrap();
443
444        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
445        handle.abort();
446    }
447
448    #[tokio::test]
449    async fn test_runtime_filter_drops_event() {
450        let event_log = Arc::new(InMemoryEventLog::new());
451
452        let flow = compile_flow(&FlowConfig {
453            name: "filtered_flow".to_string(),
454            description: None,
455            trigger: TriggerConfig {
456                kind: "entity.created".to_string(),
457                link_type: None,
458                entity_type: None,
459            },
460            pipeline: vec![
461                // This filter will drop the event
462                PipelineStep::Filter(FilterConfig {
463                    condition: "entity_type == \"admin\"".to_string(),
464                }),
465                PipelineStep::Map(MapConfig {
466                    template: json!({"title": "should not reach here"}),
467                }),
468            ],
469        })
470        .unwrap();
471
472        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
473        let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
474
475        let handle = runtime.run(SeekPosition::Latest);
476
477        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
478        event_log
479            .append(EventEnvelope::new(make_entity_event("user"))) // type is "user", filter expects "admin"
480            .await
481            .unwrap();
482
483        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
484        handle.abort();
485    }
486
487    #[tokio::test]
488    async fn test_runtime_resolve_and_map() {
489        let event_log = Arc::new(InMemoryEventLog::new());
490
491        let flow = compile_flow(&FlowConfig {
492            name: "resolve_map_flow".to_string(),
493            description: None,
494            trigger: TriggerConfig {
495                kind: "link.created".to_string(),
496                link_type: Some("follows".to_string()),
497                entity_type: None,
498            },
499            pipeline: vec![
500                PipelineStep::Resolve(ResolveConfig {
501                    from: "source_id".to_string(),
502                    via: None,
503                    direction: "forward".to_string(),
504                    output_var: "source".to_string(),
505                }),
506                PipelineStep::Map(MapConfig {
507                    template: json!({
508                        "title": "{{ source.name }} followed you",
509                        "source_id": "{{ source_id }}"
510                    }),
511                }),
512                PipelineStep::Deliver(DeliverConfig {
513                    sink: Some("in_app".to_string()),
514                    sinks: None,
515                }),
516            ],
517        })
518        .unwrap();
519
520        let fetcher = Arc::new(MockEntityFetcher) as Arc<dyn EntityFetcher>;
521        let mut fetchers = HashMap::new();
522        fetchers.insert("user".to_string(), fetcher);
523
524        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
525        let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, fetchers);
526
527        let handle = runtime.run(SeekPosition::Latest);
528
529        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
530        event_log
531            .append(EventEnvelope::new(make_link_event("follows")))
532            .await
533            .unwrap();
534
535        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
536        handle.abort();
537    }
538
539    // ── Unit test for execute_pipeline ────────────────────────────────
540
541    #[tokio::test]
542    async fn test_execute_pipeline_end_to_end() {
543        let ops: Vec<Box<dyn PipelineOperator>> = vec![
544            Box::new(
545                crate::events::operators::FilterOp::from_config(&FilterConfig {
546                    condition: "entity_type == \"user\"".to_string(),
547                })
548                .unwrap(),
549            ),
550            Box::new(crate::events::operators::MapOp::from_config(&MapConfig {
551                template: json!({"msg": "Hello {{ entity_type }}"}),
552            })),
553        ];
554
555        let event = FrameworkEvent::Entity(EntityEvent::Created {
556            entity_type: "user".to_string(),
557            entity_id: Uuid::new_v4(),
558            data: json!({}),
559        });
560
561        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
562        let mut ctx = FlowContext::new(event, link_service, HashMap::new());
563
564        execute_pipeline(&ops, &mut ctx).await.unwrap();
565
566        // Map should have set _payload
567        let payload = ctx.get_var("_payload").unwrap();
568        assert_eq!(payload["msg"], "Hello user");
569    }
570
571    #[tokio::test]
572    async fn test_execute_pipeline_filter_drops() {
573        let ops: Vec<Box<dyn PipelineOperator>> = vec![
574            Box::new(
575                crate::events::operators::FilterOp::from_config(&FilterConfig {
576                    condition: "entity_type == \"admin\"".to_string(),
577                })
578                .unwrap(),
579            ),
580            Box::new(crate::events::operators::MapOp::from_config(&MapConfig {
581                template: json!({"msg": "should not reach"}),
582            })),
583        ];
584
585        let event = FrameworkEvent::Entity(EntityEvent::Created {
586            entity_type: "user".to_string(),
587            entity_id: Uuid::new_v4(),
588            data: json!({}),
589        });
590
591        let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
592        let mut ctx = FlowContext::new(event, link_service, HashMap::new());
593
594        execute_pipeline(&ops, &mut ctx).await.unwrap();
595
596        // Map should NOT have been executed (filter dropped)
597        assert!(ctx.get_var("_payload").is_none());
598    }
599}