Skip to main content

reifydb_sub_column/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::Any,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_column::registry::SnapshotRegistry;
14use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
15use reifydb_runtime::actor::mailbox::ActorRef;
16use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
17use reifydb_type::Result;
18use tracing::{debug, info};
19
20use crate::actor::{SeriesMessage, TableMessage};
21
22#[derive(Clone, Debug)]
23pub struct StorageConfig {
24	pub table_tick_interval: Duration,
25	pub series_tick_interval: Duration,
26	// Default bucket width for series (in key units: ms/us/ns/s depending on
27	// the series' `TimestampPrecision`, or plain u64 for integer keys).
28	pub series_bucket_width: u64,
29	// Wall-clock grace before a DateTime series bucket is considered closed.
30	// Ignored for integer-keyed series.
31	pub series_grace: Duration,
32}
33
34impl Default for StorageConfig {
35	fn default() -> Self {
36		Self {
37			table_tick_interval: Duration::from_secs(1),
38			series_tick_interval: Duration::from_secs(1),
39			// 1 hour in nanoseconds - reasonable default for a DateTime series.
40			series_bucket_width: 3_600 * 1_000_000_000,
41			series_grace: Duration::from_secs(5),
42		}
43	}
44}
45
46// `StorageSubsystem` is a thin lifecycle marker: the factory spawns both actors
47// during `create()`, and this struct just holds clones of their refs so
48// `shutdown()` can deliver an explicit stop signal. Joining actually happens
49// via actor-system shutdown on `Database::stop()` - same pattern as
50// `MetricSubsystem`.
51pub struct StorageSubsystem {
52	registry: SnapshotRegistry,
53	table_ref: ActorRef<TableMessage>,
54	series_ref: ActorRef<SeriesMessage>,
55	running: Arc<AtomicBool>,
56}
57
58impl StorageSubsystem {
59	pub fn new(
60		registry: SnapshotRegistry,
61		table_ref: ActorRef<TableMessage>,
62		series_ref: ActorRef<SeriesMessage>,
63	) -> Self {
64		Self {
65			registry,
66			table_ref,
67			series_ref,
68			running: Arc::new(AtomicBool::new(false)),
69		}
70	}
71
72	pub fn registry(&self) -> SnapshotRegistry {
73		self.registry.clone()
74	}
75}
76
77impl HasVersion for StorageSubsystem {
78	fn version(&self) -> SystemVersion {
79		SystemVersion {
80			name: "sub-column".to_string(),
81			version: env!("CARGO_PKG_VERSION").to_string(),
82			description: "Columnar snapshot materialization subsystem".to_string(),
83			r#type: ComponentType::Subsystem,
84		}
85	}
86}
87
88impl Subsystem for StorageSubsystem {
89	fn name(&self) -> &'static str {
90		"Storage"
91	}
92
93	fn start(&mut self) -> Result<()> {
94		if self.running.swap(true, Ordering::SeqCst) {
95			return Ok(());
96		}
97		info!("Storage (columnar materialization) subsystem started");
98		Ok(())
99	}
100
101	fn shutdown(&mut self) -> Result<()> {
102		if !self.running.swap(false, Ordering::SeqCst) {
103			return Ok(());
104		}
105		// Best-effort stop messages; actor-system shutdown is the authoritative
106		// join point. A closed mailbox (`SendError::Closed`) means the actor has
107		// already stopped, which is fine.
108		let _ = self.table_ref.send(TableMessage::Shutdown);
109		let _ = self.series_ref.send(SeriesMessage::Shutdown);
110		debug!("Storage subsystem shutdown signalled");
111		Ok(())
112	}
113
114	fn is_running(&self) -> bool {
115		self.running.load(Ordering::SeqCst)
116	}
117
118	fn health_status(&self) -> HealthStatus {
119		if self.running.load(Ordering::SeqCst) {
120			HealthStatus::Healthy
121		} else {
122			HealthStatus::Failed {
123				description: "Storage subsystem not running".to_string(),
124			}
125		}
126	}
127
128	fn as_any(&self) -> &dyn Any {
129		self
130	}
131
132	fn as_any_mut(&mut self) -> &mut dyn Any {
133		self
134	}
135}