Skip to main content

reifydb_sub_column/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::Any,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_column::registry::SnapshotRegistry;
14use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
15use reifydb_runtime::actor::mailbox::ActorRef;
16use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
17use reifydb_type::Result;
18use tracing::{debug, info};
19
20use crate::actor::{SeriesMessage, TableMessage};
21
22#[derive(Clone, Debug)]
23pub struct StorageConfig {
24	pub table_tick_interval: Duration,
25	pub series_tick_interval: Duration,
26
27	pub series_bucket_width: u64,
28
29	pub series_grace: Duration,
30}
31
32impl Default for StorageConfig {
33	fn default() -> Self {
34		Self {
35			table_tick_interval: Duration::from_secs(1),
36			series_tick_interval: Duration::from_secs(1),
37
38			series_bucket_width: 3_600 * 1_000_000_000,
39			series_grace: Duration::from_secs(5),
40		}
41	}
42}
43
44pub struct StorageSubsystem {
45	registry: SnapshotRegistry,
46	table_ref: ActorRef<TableMessage>,
47	series_ref: ActorRef<SeriesMessage>,
48	running: Arc<AtomicBool>,
49}
50
51impl StorageSubsystem {
52	pub fn new(
53		registry: SnapshotRegistry,
54		table_ref: ActorRef<TableMessage>,
55		series_ref: ActorRef<SeriesMessage>,
56	) -> Self {
57		Self {
58			registry,
59			table_ref,
60			series_ref,
61			running: Arc::new(AtomicBool::new(false)),
62		}
63	}
64
65	pub fn registry(&self) -> SnapshotRegistry {
66		self.registry.clone()
67	}
68}
69
70impl HasVersion for StorageSubsystem {
71	fn version(&self) -> SystemVersion {
72		SystemVersion {
73			name: "sub-column".to_string(),
74			version: env!("CARGO_PKG_VERSION").to_string(),
75			description: "Columnar snapshot materialization subsystem".to_string(),
76			r#type: ComponentType::Subsystem,
77		}
78	}
79}
80
81impl Subsystem for StorageSubsystem {
82	fn name(&self) -> &'static str {
83		"Storage"
84	}
85
86	fn start(&mut self) -> Result<()> {
87		if self.running.swap(true, Ordering::SeqCst) {
88			return Ok(());
89		}
90		info!("Storage (columnar materialization) subsystem started");
91		Ok(())
92	}
93
94	fn shutdown(&mut self) -> Result<()> {
95		if !self.running.swap(false, Ordering::SeqCst) {
96			return Ok(());
97		}
98
99		let _ = self.table_ref.send(TableMessage::Shutdown);
100		let _ = self.series_ref.send(SeriesMessage::Shutdown);
101		debug!("Storage subsystem shutdown signalled");
102		Ok(())
103	}
104
105	fn is_running(&self) -> bool {
106		self.running.load(Ordering::SeqCst)
107	}
108
109	fn health_status(&self) -> HealthStatus {
110		if self.running.load(Ordering::SeqCst) {
111			HealthStatus::Healthy
112		} else {
113			HealthStatus::Failed {
114				description: "Storage subsystem not running".to_string(),
115			}
116		}
117	}
118
119	fn as_any(&self) -> &dyn Any {
120		self
121	}
122
123	fn as_any_mut(&mut self) -> &mut dyn Any {
124		self
125	}
126}