reifydb_sub_worker/
factory.rs1use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
5use reifydb_engine::{StandardCommandTransaction, StandardEngine};
6use reifydb_sub_api::{Subsystem, SubsystemFactory};
7
8use super::{WorkerBuilder, WorkerConfig, WorkerSubsystem};
9
10pub type WorkerPoolConfigurator = Box<dyn FnOnce(WorkerBuilder) -> WorkerBuilder + Send>;
12
13pub struct WorkerSubsystemFactory {
15 configurator: Option<WorkerPoolConfigurator>,
16}
17
18impl WorkerSubsystemFactory {
19 pub fn new() -> Self {
21 Self {
22 configurator: None,
23 }
24 }
25
26 pub fn with_configurator<F>(configurator: F) -> Self
28 where
29 F: FnOnce(WorkerBuilder) -> WorkerBuilder + Send + 'static,
30 {
31 Self {
32 configurator: Some(Box::new(configurator)),
33 }
34 }
35
36 pub fn with_config(config: WorkerConfig) -> Self {
38 Self::with_configurator(move |_| {
39 WorkerBuilder::new()
40 .num_workers(config.num_workers)
41 .max_queue_size(config.max_queue_size)
42 .scheduler_interval(config.scheduler_interval)
43 .task_timeout_warning(config.task_timeout_warning)
44 })
45 }
46}
47
48impl Default for WorkerSubsystemFactory {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl SubsystemFactory<StandardCommandTransaction> for WorkerSubsystemFactory {
55 fn provide_interceptors(
56 &self,
57 builder: StandardInterceptorBuilder<StandardCommandTransaction>,
58 _ioc: &IocContainer,
59 ) -> StandardInterceptorBuilder<StandardCommandTransaction> {
60 builder
62 }
63
64 fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
65 let builder = if let Some(configurator) = self.configurator {
67 configurator(WorkerBuilder::new())
68 } else {
69 WorkerBuilder::default()
70 };
71
72 let engine = ioc.resolve::<StandardEngine>()?;
74
75 let config = builder.build();
77 let subsystem = WorkerSubsystem::new(config, engine);
78
79 Ok(Box::new(subsystem))
80 }
81}