reifydb_sub_flow/subsystem/
factory.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_sub_api::{Subsystem, SubsystemFactory};
8
9use super::{FlowSubsystem, intercept::TransactionalFlowInterceptor};
10use crate::builder::FlowBuilder;
11
12/// Configuration function for the flow subsystem
13pub type FlowConfigurator = Box<dyn FnOnce(FlowBuilder) -> FlowBuilder + Send>;
14
15/// Factory for creating FlowSubsystem with proper interceptor registration
16pub struct FlowSubsystemFactory {
17	configurator: Option<FlowConfigurator>,
18}
19
20impl FlowSubsystemFactory {
21	pub fn new() -> Self {
22		Self {
23			configurator: None,
24		}
25	}
26
27	pub fn with_configurator<F>(configurator: F) -> Self
28	where
29		F: FnOnce(FlowBuilder) -> FlowBuilder + Send + 'static,
30	{
31		Self {
32			configurator: Some(Box::new(configurator)),
33		}
34	}
35}
36
37impl Default for FlowSubsystemFactory {
38	fn default() -> Self {
39		Self::new()
40	}
41}
42
43#[async_trait]
44impl SubsystemFactory<StandardCommandTransaction> for FlowSubsystemFactory {
45	fn provide_interceptors(
46		&self,
47		builder: StandardInterceptorBuilder<StandardCommandTransaction>,
48		ioc: &IocContainer,
49	) -> StandardInterceptorBuilder<StandardCommandTransaction> {
50		let ioc = ioc.clone();
51		builder.add_factory(move |interceptors| {
52			interceptors.register(TransactionalFlowInterceptor::new(ioc.clone()));
53		})
54	}
55
56	async fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
57		let builder = if let Some(configurator) = self.configurator {
58			configurator(FlowBuilder::new())
59		} else {
60			FlowBuilder::default()
61		};
62		let config = builder.build_config();
63		Ok(Box::new(FlowSubsystem::new(config, ioc).await?))
64	}
65}