1use crate::core::module::EntityFetcher;
21use crate::core::service::LinkService;
22use crate::events::compiler::CompiledFlow;
23use crate::events::context::FlowContext;
24use crate::events::log::EventLog;
25use crate::events::operators::{OpResult, PipelineOperator};
26use crate::events::sinks::SinkRegistry;
27use crate::events::types::SeekPosition;
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::task::JoinHandle;
31use tokio_stream::StreamExt;
32
33pub struct FlowRuntime {
35 flows: Vec<CompiledFlow>,
37
38 event_log: Arc<dyn EventLog>,
40
41 link_service: Arc<dyn LinkService>,
43
44 entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
46
47 sink_registry: Option<Arc<SinkRegistry>>,
49
50 consumer_name: String,
52}
53
54impl std::fmt::Debug for FlowRuntime {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("FlowRuntime")
57 .field("flows", &self.flows.len())
58 .field("consumer_name", &self.consumer_name)
59 .finish()
60 }
61}
62
63impl FlowRuntime {
64 pub fn new(
66 flows: Vec<CompiledFlow>,
67 event_log: Arc<dyn EventLog>,
68 link_service: Arc<dyn LinkService>,
69 entity_fetchers: HashMap<String, Arc<dyn EntityFetcher>>,
70 ) -> Self {
71 Self {
72 flows,
73 event_log,
74 link_service,
75 entity_fetchers,
76 sink_registry: None,
77 consumer_name: "flow-runtime".to_string(),
78 }
79 }
80
81 pub fn with_consumer_name(mut self, name: impl Into<String>) -> Self {
83 self.consumer_name = name.into();
84 self
85 }
86
87 pub fn with_sink_registry(mut self, registry: Arc<SinkRegistry>) -> Self {
92 self.sink_registry = Some(registry);
93 self
94 }
95
96 pub fn run(self, position: SeekPosition) -> JoinHandle<()> {
101 tokio::spawn(async move {
102 if let Err(e) = self.run_inner(position).await {
103 tracing::error!(error = %e, "flow runtime stopped with error");
104 }
105 })
106 }
107
108 async fn run_inner(self, position: SeekPosition) -> anyhow::Result<()> {
110 tracing::info!(
111 flows = self.flows.len(),
112 consumer = %self.consumer_name,
113 "flow runtime starting"
114 );
115
116 let mut stream = self
117 .event_log
118 .subscribe(&self.consumer_name, position)
119 .await?;
120
121 while let Some(envelope) = stream.next().await {
122 let event = &envelope.event;
123
124 for flow in &self.flows {
126 if flow.matcher.matches(event) {
127 tracing::debug!(
128 flow = %flow.name,
129 event_kind = %event.event_kind(),
130 "flow matched, executing pipeline"
131 );
132
133 let mut ctx = FlowContext::new(
135 event.clone(),
136 self.link_service.clone(),
137 self.entity_fetchers.clone(),
138 );
139
140 if let Some(ref registry) = self.sink_registry {
142 ctx.sink_registry = Some(registry.clone());
143 }
144
145 if let Err(e) = execute_pipeline(&flow.operators, &mut ctx).await {
147 tracing::warn!(
148 flow = %flow.name,
149 error = %e,
150 "pipeline execution failed"
151 );
152 }
153 }
154 }
155
156 if let Some(seq) = envelope.seq_no
158 && let Err(e) = self.event_log.ack(&self.consumer_name, seq).await
159 {
160 tracing::warn!(error = %e, "failed to ack event");
161 }
162 }
163
164 tracing::info!("flow runtime stream ended");
165 Ok(())
166 }
167}
168
169fn execute_pipeline<'a>(
174 operators: &'a [Box<dyn PipelineOperator>],
175 ctx: &'a mut FlowContext,
176) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + 'a>> {
177 Box::pin(async move {
178 for (i, op) in operators.iter().enumerate() {
179 match op.execute(ctx).await? {
180 OpResult::Continue => {
181 }
183 OpResult::Drop => {
184 tracing::debug!(operator = %op.name(), "event dropped by operator");
185 return Ok(());
186 }
187 OpResult::FanOut(contexts) => {
188 tracing::debug!(
189 operator = %op.name(),
190 count = contexts.len(),
191 "fan-out: processing remaining pipeline for each context"
192 );
193
194 let remaining = &operators[i + 1..];
196 for mut sub_ctx in contexts {
197 if let Err(e) = execute_pipeline(remaining, &mut sub_ctx).await {
198 tracing::warn!(
199 operator = %op.name(),
200 error = %e,
201 "fan-out sub-pipeline failed"
202 );
203 }
204 }
205 return Ok(());
206 }
207 }
208 }
209
210 Ok(())
211 })
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::config::events::*;
218 use crate::core::events::{EntityEvent, EventEnvelope, FrameworkEvent, LinkEvent};
219 use crate::events::compiler::compile_flow;
220 use crate::events::memory::InMemoryEventLog;
221 use serde_json::json;
222 use std::sync::Arc;
223 use uuid::Uuid;
224
225 struct MockLinkService;
228
229 #[async_trait::async_trait]
230 impl LinkService for MockLinkService {
231 async fn create(
232 &self,
233 _: crate::core::link::LinkEntity,
234 ) -> anyhow::Result<crate::core::link::LinkEntity> {
235 unimplemented!()
236 }
237 async fn get(&self, _: &Uuid) -> anyhow::Result<Option<crate::core::link::LinkEntity>> {
238 unimplemented!()
239 }
240 async fn list(&self) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
241 unimplemented!()
242 }
243 async fn find_by_source(
244 &self,
245 _: &Uuid,
246 _: Option<&str>,
247 _: Option<&str>,
248 ) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
249 Ok(vec![])
250 }
251 async fn find_by_target(
252 &self,
253 _: &Uuid,
254 _: Option<&str>,
255 _: Option<&str>,
256 ) -> anyhow::Result<Vec<crate::core::link::LinkEntity>> {
257 Ok(vec![])
258 }
259 async fn update(
260 &self,
261 _: &Uuid,
262 _: crate::core::link::LinkEntity,
263 ) -> anyhow::Result<crate::core::link::LinkEntity> {
264 unimplemented!()
265 }
266 async fn delete(&self, _: &Uuid) -> anyhow::Result<()> {
267 unimplemented!()
268 }
269 async fn delete_by_entity(&self, _: &Uuid) -> anyhow::Result<()> {
270 unimplemented!()
271 }
272 }
273
274 struct MockEntityFetcher;
277
278 #[async_trait::async_trait]
279 impl EntityFetcher for MockEntityFetcher {
280 async fn fetch_as_json(&self, id: &Uuid) -> anyhow::Result<serde_json::Value> {
281 Ok(json!({"id": id.to_string(), "name": "TestUser"}))
282 }
283 }
284
285 fn make_link_event(link_type: &str) -> FrameworkEvent {
288 FrameworkEvent::Link(LinkEvent::Created {
289 link_type: link_type.to_string(),
290 link_id: Uuid::new_v4(),
291 source_id: Uuid::new_v4(),
292 target_id: Uuid::new_v4(),
293 metadata: None,
294 })
295 }
296
297 fn make_entity_event(entity_type: &str) -> FrameworkEvent {
298 FrameworkEvent::Entity(EntityEvent::Created {
299 entity_type: entity_type.to_string(),
300 entity_id: Uuid::new_v4(),
301 data: json!({"name": "test"}),
302 })
303 }
304
305 #[tokio::test]
308 async fn test_runtime_dispatches_matching_event() {
309 let event_log = Arc::new(InMemoryEventLog::new());
310
311 let flow = compile_flow(&FlowConfig {
313 name: "follow_notif".to_string(),
314 description: None,
315 trigger: TriggerConfig {
316 kind: "link.created".to_string(),
317 link_type: Some("follows".to_string()),
318 entity_type: None,
319 },
320 pipeline: vec![
321 PipelineStep::Map(MapConfig {
322 template: json!({"title": "New follower!"}),
323 }),
324 PipelineStep::Deliver(DeliverConfig {
325 sink: Some("in_app".to_string()),
326 sinks: None,
327 }),
328 ],
329 })
330 .unwrap();
331
332 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
333 let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
334
335 let handle = runtime.run(SeekPosition::Latest);
336
337 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
339 event_log
340 .append(EventEnvelope::new(make_link_event("follows")))
341 .await
342 .unwrap();
343
344 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
346
347 handle.abort();
348 }
350
351 #[tokio::test]
352 async fn test_runtime_ignores_non_matching_event() {
353 let event_log = Arc::new(InMemoryEventLog::new());
354
355 let flow = compile_flow(&FlowConfig {
356 name: "follow_notif".to_string(),
357 description: None,
358 trigger: TriggerConfig {
359 kind: "link.created".to_string(),
360 link_type: Some("follows".to_string()),
361 entity_type: None,
362 },
363 pipeline: vec![PipelineStep::Map(MapConfig {
364 template: json!({"title": "New follower!"}),
365 })],
366 })
367 .unwrap();
368
369 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
370 let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
371
372 let handle = runtime.run(SeekPosition::Latest);
373
374 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
376 event_log
377 .append(EventEnvelope::new(make_link_event("likes")))
378 .await
379 .unwrap();
380
381 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
382 handle.abort();
383 }
385
386 #[tokio::test]
387 async fn test_runtime_multiple_flows() {
388 let event_log = Arc::new(InMemoryEventLog::new());
389
390 let flow1 = compile_flow(&FlowConfig {
391 name: "follow_flow".to_string(),
392 description: None,
393 trigger: TriggerConfig {
394 kind: "link.created".to_string(),
395 link_type: Some("follows".to_string()),
396 entity_type: None,
397 },
398 pipeline: vec![PipelineStep::Map(MapConfig {
399 template: json!({"type": "follow"}),
400 })],
401 })
402 .unwrap();
403
404 let flow2 = compile_flow(&FlowConfig {
405 name: "entity_flow".to_string(),
406 description: None,
407 trigger: TriggerConfig {
408 kind: "entity.created".to_string(),
409 link_type: None,
410 entity_type: Some("user".to_string()),
411 },
412 pipeline: vec![PipelineStep::Map(MapConfig {
413 template: json!({"type": "user_created"}),
414 })],
415 })
416 .unwrap();
417
418 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
419 let runtime = FlowRuntime::new(
420 vec![flow1, flow2],
421 event_log.clone(),
422 link_service,
423 HashMap::new(),
424 );
425
426 let handle = runtime.run(SeekPosition::Latest);
427
428 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
429
430 event_log
432 .append(EventEnvelope::new(make_link_event("follows")))
433 .await
434 .unwrap();
435 event_log
436 .append(EventEnvelope::new(make_entity_event("user")))
437 .await
438 .unwrap();
439 event_log
440 .append(EventEnvelope::new(make_link_event("likes"))) .await
442 .unwrap();
443
444 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
445 handle.abort();
446 }
447
448 #[tokio::test]
449 async fn test_runtime_filter_drops_event() {
450 let event_log = Arc::new(InMemoryEventLog::new());
451
452 let flow = compile_flow(&FlowConfig {
453 name: "filtered_flow".to_string(),
454 description: None,
455 trigger: TriggerConfig {
456 kind: "entity.created".to_string(),
457 link_type: None,
458 entity_type: None,
459 },
460 pipeline: vec![
461 PipelineStep::Filter(FilterConfig {
463 condition: "entity_type == \"admin\"".to_string(),
464 }),
465 PipelineStep::Map(MapConfig {
466 template: json!({"title": "should not reach here"}),
467 }),
468 ],
469 })
470 .unwrap();
471
472 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
473 let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, HashMap::new());
474
475 let handle = runtime.run(SeekPosition::Latest);
476
477 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
478 event_log
479 .append(EventEnvelope::new(make_entity_event("user"))) .await
481 .unwrap();
482
483 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
484 handle.abort();
485 }
486
487 #[tokio::test]
488 async fn test_runtime_resolve_and_map() {
489 let event_log = Arc::new(InMemoryEventLog::new());
490
491 let flow = compile_flow(&FlowConfig {
492 name: "resolve_map_flow".to_string(),
493 description: None,
494 trigger: TriggerConfig {
495 kind: "link.created".to_string(),
496 link_type: Some("follows".to_string()),
497 entity_type: None,
498 },
499 pipeline: vec![
500 PipelineStep::Resolve(ResolveConfig {
501 from: "source_id".to_string(),
502 via: None,
503 direction: "forward".to_string(),
504 output_var: "source".to_string(),
505 }),
506 PipelineStep::Map(MapConfig {
507 template: json!({
508 "title": "{{ source.name }} followed you",
509 "source_id": "{{ source_id }}"
510 }),
511 }),
512 PipelineStep::Deliver(DeliverConfig {
513 sink: Some("in_app".to_string()),
514 sinks: None,
515 }),
516 ],
517 })
518 .unwrap();
519
520 let fetcher = Arc::new(MockEntityFetcher) as Arc<dyn EntityFetcher>;
521 let mut fetchers = HashMap::new();
522 fetchers.insert("user".to_string(), fetcher);
523
524 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
525 let runtime = FlowRuntime::new(vec![flow], event_log.clone(), link_service, fetchers);
526
527 let handle = runtime.run(SeekPosition::Latest);
528
529 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
530 event_log
531 .append(EventEnvelope::new(make_link_event("follows")))
532 .await
533 .unwrap();
534
535 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
536 handle.abort();
537 }
538
539 #[tokio::test]
542 async fn test_execute_pipeline_end_to_end() {
543 let ops: Vec<Box<dyn PipelineOperator>> = vec![
544 Box::new(
545 crate::events::operators::FilterOp::from_config(&FilterConfig {
546 condition: "entity_type == \"user\"".to_string(),
547 })
548 .unwrap(),
549 ),
550 Box::new(crate::events::operators::MapOp::from_config(&MapConfig {
551 template: json!({"msg": "Hello {{ entity_type }}"}),
552 })),
553 ];
554
555 let event = FrameworkEvent::Entity(EntityEvent::Created {
556 entity_type: "user".to_string(),
557 entity_id: Uuid::new_v4(),
558 data: json!({}),
559 });
560
561 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
562 let mut ctx = FlowContext::new(event, link_service, HashMap::new());
563
564 execute_pipeline(&ops, &mut ctx).await.unwrap();
565
566 let payload = ctx.get_var("_payload").unwrap();
568 assert_eq!(payload["msg"], "Hello user");
569 }
570
571 #[tokio::test]
572 async fn test_execute_pipeline_filter_drops() {
573 let ops: Vec<Box<dyn PipelineOperator>> = vec![
574 Box::new(
575 crate::events::operators::FilterOp::from_config(&FilterConfig {
576 condition: "entity_type == \"admin\"".to_string(),
577 })
578 .unwrap(),
579 ),
580 Box::new(crate::events::operators::MapOp::from_config(&MapConfig {
581 template: json!({"msg": "should not reach"}),
582 })),
583 ];
584
585 let event = FrameworkEvent::Entity(EntityEvent::Created {
586 entity_type: "user".to_string(),
587 entity_id: Uuid::new_v4(),
588 data: json!({}),
589 });
590
591 let link_service = Arc::new(MockLinkService) as Arc<dyn LinkService>;
592 let mut ctx = FlowContext::new(event, link_service, HashMap::new());
593
594 execute_pipeline(&ops, &mut ctx).await.unwrap();
595
596 assert!(ctx.get_var("_payload").is_none());
598 }
599}