reifydb_sub_flow/subsystem/
factory.rs1use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
5use reifydb_engine::StandardCommandTransaction;
6use reifydb_sub_api::{Subsystem, SubsystemFactory};
7
8use super::{FlowSubsystem, intercept::TransactionalFlowInterceptor};
9use crate::builder::FlowBuilder;
10
11pub type FlowConfigurator = Box<dyn FnOnce(FlowBuilder) -> FlowBuilder + Send>;
13
14pub struct FlowSubsystemFactory {
16 configurator: Option<FlowConfigurator>,
17}
18
19impl FlowSubsystemFactory {
20 pub fn new() -> Self {
21 Self {
22 configurator: None,
23 }
24 }
25
26 pub fn with_configurator<F>(configurator: F) -> Self
27 where
28 F: FnOnce(FlowBuilder) -> FlowBuilder + Send + 'static,
29 {
30 Self {
31 configurator: Some(Box::new(configurator)),
32 }
33 }
34}
35
36impl Default for FlowSubsystemFactory {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl SubsystemFactory<StandardCommandTransaction> for FlowSubsystemFactory {
43 fn provide_interceptors(
44 &self,
45 builder: StandardInterceptorBuilder<StandardCommandTransaction>,
46 ioc: &IocContainer,
47 ) -> StandardInterceptorBuilder<StandardCommandTransaction> {
48 let ioc = ioc.clone();
49 builder.add_factory(move |interceptors| {
50 interceptors.register(TransactionalFlowInterceptor::new(ioc.clone()));
51 })
52 }
53
54 fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
55 let builder = if let Some(configurator) = self.configurator {
56 configurator(FlowBuilder::new())
57 } else {
58 FlowBuilder::default()
59 };
60 let config = builder.build_config();
61 Ok(Box::new(FlowSubsystem::new(config, ioc)?))
62 }
63}