Skip to main content

reifydb_sub_column/
factory.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_column::{
5	compress::{CompressConfig, Compressor},
6	registry::SnapshotRegistry,
7};
8use reifydb_core::util::ioc::IocContainer;
9use reifydb_engine::engine::StandardEngine;
10use reifydb_runtime::SharedRuntime;
11use reifydb_sub_api::subsystem::{Subsystem, SubsystemFactory};
12use reifydb_type::Result;
13
14use crate::{
15	actor::{series::SeriesMaterializationActor, table::TableMaterializationActor},
16	subsystem::{StorageConfig, StorageSubsystem},
17};
18
19pub struct StorageSubsystemFactory {
20	config: StorageConfig,
21}
22
23impl StorageSubsystemFactory {
24	pub fn new(config: StorageConfig) -> Self {
25		Self {
26			config,
27		}
28	}
29}
30
31impl Default for StorageSubsystemFactory {
32	fn default() -> Self {
33		Self::new(StorageConfig::default())
34	}
35}
36
37impl SubsystemFactory for StorageSubsystemFactory {
38	// Resolves `SharedRuntime` + `StandardEngine` from the IoC container and
39	// spawns both materialization actors on the runtime's actor system. The
40	// returned `StorageSubsystem` holds the ref clones needed for an explicit
41	// shutdown signal; actor-system shutdown on `Database::stop()` performs
42	// the actual join.
43	fn create(self: Box<Self>, ioc: &IocContainer) -> Result<Box<dyn Subsystem>> {
44		let runtime = ioc.resolve::<SharedRuntime>()?;
45		let engine = ioc.resolve::<StandardEngine>()?;
46		let actor_system = runtime.actor_system();
47		let registry = SnapshotRegistry::new();
48
49		let table_actor = TableMaterializationActor::new(
50			engine.clone(),
51			registry.clone(),
52			Compressor::new(CompressConfig::default()),
53			self.config.table_tick_interval,
54		);
55		let table_handle = actor_system.spawn("storage-materialize-table", table_actor);
56		let table_ref = table_handle.actor_ref().clone();
57
58		let series_actor = SeriesMaterializationActor::new(
59			engine,
60			registry.clone(),
61			Compressor::new(CompressConfig::default()),
62			self.config.series_tick_interval,
63			self.config.series_bucket_width,
64			self.config.series_grace,
65		);
66		let series_handle = actor_system.spawn("storage-materialize-series", series_actor);
67		let series_ref = series_handle.actor_ref().clone();
68
69		Ok(Box::new(StorageSubsystem::new(registry, table_ref, series_ref)))
70	}
71}