reifydb_sub_flow/subsystem/
factory.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_sub_api::{Subsystem, SubsystemFactory};
7
8use super::{FlowSubsystem, intercept::TransactionalFlowInterceptor};
9use crate::builder::FlowBuilder;
10
11/// Configuration function for the flow subsystem
12pub type FlowConfigurator = Box<dyn FnOnce(FlowBuilder) -> FlowBuilder + Send>;
13
14/// Factory for creating FlowSubsystem with proper interceptor registration
15pub struct FlowSubsystemFactory {
16	configurator: Option<FlowConfigurator>,
17}
18
19impl FlowSubsystemFactory {
20	pub fn new() -> Self {
21		Self {
22			configurator: None,
23		}
24	}
25
26	pub fn with_configurator<F>(configurator: F) -> Self
27	where
28		F: FnOnce(FlowBuilder) -> FlowBuilder + Send + 'static,
29	{
30		Self {
31			configurator: Some(Box::new(configurator)),
32		}
33	}
34}
35
36impl Default for FlowSubsystemFactory {
37	fn default() -> Self {
38		Self::new()
39	}
40}
41
42impl SubsystemFactory<StandardCommandTransaction> for FlowSubsystemFactory {
43	fn provide_interceptors(
44		&self,
45		builder: StandardInterceptorBuilder<StandardCommandTransaction>,
46		ioc: &IocContainer,
47	) -> StandardInterceptorBuilder<StandardCommandTransaction> {
48		let ioc = ioc.clone();
49		builder.add_factory(move |interceptors| {
50			interceptors.register(TransactionalFlowInterceptor::new(ioc.clone()));
51		})
52	}
53
54	fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
55		let builder = if let Some(configurator) = self.configurator {
56			configurator(FlowBuilder::new())
57		} else {
58			FlowBuilder::default()
59		};
60		let config = builder.build_config();
61		Ok(Box::new(FlowSubsystem::new(config, ioc)?))
62	}
63}