reifydb_sub_flow/subsystem/
factory.rs1use async_trait::async_trait;
5use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
6use reifydb_engine::StandardCommandTransaction;
7use reifydb_sub_api::{Subsystem, SubsystemFactory};
8
9use super::{FlowSubsystem, intercept::TransactionalFlowInterceptor};
10use crate::builder::FlowBuilder;
11
12pub type FlowConfigurator = Box<dyn FnOnce(FlowBuilder) -> FlowBuilder + Send>;
14
15pub struct FlowSubsystemFactory {
17 configurator: Option<FlowConfigurator>,
18}
19
20impl FlowSubsystemFactory {
21 pub fn new() -> Self {
22 Self {
23 configurator: None,
24 }
25 }
26
27 pub fn with_configurator<F>(configurator: F) -> Self
28 where
29 F: FnOnce(FlowBuilder) -> FlowBuilder + Send + 'static,
30 {
31 Self {
32 configurator: Some(Box::new(configurator)),
33 }
34 }
35}
36
37impl Default for FlowSubsystemFactory {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43#[async_trait]
44impl SubsystemFactory<StandardCommandTransaction> for FlowSubsystemFactory {
45 fn provide_interceptors(
46 &self,
47 builder: StandardInterceptorBuilder<StandardCommandTransaction>,
48 ioc: &IocContainer,
49 ) -> StandardInterceptorBuilder<StandardCommandTransaction> {
50 let ioc = ioc.clone();
51 builder.add_factory(move |interceptors| {
52 interceptors.register(TransactionalFlowInterceptor::new(ioc.clone()));
53 })
54 }
55
56 async fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
57 let builder = if let Some(configurator) = self.configurator {
58 configurator(FlowBuilder::new())
59 } else {
60 FlowBuilder::default()
61 };
62 let config = builder.build_config();
63 Ok(Box::new(FlowSubsystem::new(config, ioc).await?))
64 }
65}