reifydb_sub_worker/
factory.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{Result, interceptor::StandardInterceptorBuilder, util::ioc::IocContainer};
5use reifydb_engine::{StandardCommandTransaction, StandardEngine};
6use reifydb_sub_api::{Subsystem, SubsystemFactory};
7
8use super::{WorkerBuilder, WorkerConfig, WorkerSubsystem};
9
10/// Configuration function for the worker pool subsystem
11pub type WorkerPoolConfigurator = Box<dyn FnOnce(WorkerBuilder) -> WorkerBuilder + Send>;
12
13/// Factory for creating WorkerPoolSubsystem instances
14pub struct WorkerSubsystemFactory {
15	configurator: Option<WorkerPoolConfigurator>,
16}
17
18impl WorkerSubsystemFactory {
19	/// Create a new factory with default configuration
20	pub fn new() -> Self {
21		Self {
22			configurator: None,
23		}
24	}
25
26	/// Create a factory with a custom configurator
27	pub fn with_configurator<F>(configurator: F) -> Self
28	where
29		F: FnOnce(WorkerBuilder) -> WorkerBuilder + Send + 'static,
30	{
31		Self {
32			configurator: Some(Box::new(configurator)),
33		}
34	}
35
36	/// Create a new factory with custom configuration (legacy method)
37	pub fn with_config(config: WorkerConfig) -> Self {
38		Self::with_configurator(move |_| {
39			WorkerBuilder::new()
40				.num_workers(config.num_workers)
41				.max_queue_size(config.max_queue_size)
42				.scheduler_interval(config.scheduler_interval)
43				.task_timeout_warning(config.task_timeout_warning)
44		})
45	}
46}
47
48impl Default for WorkerSubsystemFactory {
49	fn default() -> Self {
50		Self::new()
51	}
52}
53
54impl SubsystemFactory<StandardCommandTransaction> for WorkerSubsystemFactory {
55	fn provide_interceptors(
56		&self,
57		builder: StandardInterceptorBuilder<StandardCommandTransaction>,
58		_ioc: &IocContainer,
59	) -> StandardInterceptorBuilder<StandardCommandTransaction> {
60		// WorkerPool doesn't need any interceptors
61		builder
62	}
63
64	fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
65		// Build WorkerSubsystem configuration
66		let builder = if let Some(configurator) = self.configurator {
67			configurator(WorkerBuilder::new())
68		} else {
69			WorkerBuilder::default()
70		};
71
72		// Get the StandardEngine from IoC
73		let engine = ioc.resolve::<StandardEngine>()?;
74
75		// Create subsystem
76		let config = builder.build();
77		let subsystem = WorkerSubsystem::new(config, engine);
78
79		Ok(Box::new(subsystem))
80	}
81}