reifydb_sub_column/
subsystem.rs1use std::{
5 any::Any,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_column::registry::SnapshotRegistry;
14use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
15use reifydb_runtime::actor::mailbox::ActorRef;
16use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
17use reifydb_type::Result;
18use tracing::{debug, info};
19
20use crate::actor::{SeriesMessage, TableMessage};
21
/// Tunable settings for the storage (columnar materialization) subsystem.
///
/// NOTE(review): the exact meaning of each knob is decided by whatever
/// consumes this config (presumably the table and series actors); confirm
/// against those call sites.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Tick interval on the table side; defaults to one second.
    pub table_tick_interval: Duration,
    /// Tick interval on the series side; defaults to one second.
    pub series_tick_interval: Duration,
    /// Width of a series bucket as a raw integer. The unit is not stated in
    /// this file; the default equals one hour if expressed in nanoseconds
    /// (TODO confirm).
    pub series_bucket_width: u64,
    /// Grace duration applied on the series side; defaults to five seconds.
    pub series_grace: Duration,
}

impl Default for StorageConfig {
    /// Builds the configuration with the stock values listed on each field.
    fn default() -> Self {
        // Both tick intervals share the same stock value.
        const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
        const DEFAULT_SERIES_BUCKET_WIDTH: u64 = 3_600 * 1_000_000_000;
        const DEFAULT_SERIES_GRACE: Duration = Duration::from_secs(5);

        StorageConfig {
            table_tick_interval: DEFAULT_TICK_INTERVAL,
            series_tick_interval: DEFAULT_TICK_INTERVAL,
            series_bucket_width: DEFAULT_SERIES_BUCKET_WIDTH,
            series_grace: DEFAULT_SERIES_GRACE,
        }
    }
}
45
46pub struct StorageSubsystem {
52 registry: SnapshotRegistry,
53 table_ref: ActorRef<TableMessage>,
54 series_ref: ActorRef<SeriesMessage>,
55 running: Arc<AtomicBool>,
56}
57
58impl StorageSubsystem {
59 pub fn new(
60 registry: SnapshotRegistry,
61 table_ref: ActorRef<TableMessage>,
62 series_ref: ActorRef<SeriesMessage>,
63 ) -> Self {
64 Self {
65 registry,
66 table_ref,
67 series_ref,
68 running: Arc::new(AtomicBool::new(false)),
69 }
70 }
71
72 pub fn registry(&self) -> SnapshotRegistry {
73 self.registry.clone()
74 }
75}
76
77impl HasVersion for StorageSubsystem {
78 fn version(&self) -> SystemVersion {
79 SystemVersion {
80 name: "sub-column".to_string(),
81 version: env!("CARGO_PKG_VERSION").to_string(),
82 description: "Columnar snapshot materialization subsystem".to_string(),
83 r#type: ComponentType::Subsystem,
84 }
85 }
86}
87
88impl Subsystem for StorageSubsystem {
89 fn name(&self) -> &'static str {
90 "Storage"
91 }
92
93 fn start(&mut self) -> Result<()> {
94 if self.running.swap(true, Ordering::SeqCst) {
95 return Ok(());
96 }
97 info!("Storage (columnar materialization) subsystem started");
98 Ok(())
99 }
100
101 fn shutdown(&mut self) -> Result<()> {
102 if !self.running.swap(false, Ordering::SeqCst) {
103 return Ok(());
104 }
105 let _ = self.table_ref.send(TableMessage::Shutdown);
109 let _ = self.series_ref.send(SeriesMessage::Shutdown);
110 debug!("Storage subsystem shutdown signalled");
111 Ok(())
112 }
113
114 fn is_running(&self) -> bool {
115 self.running.load(Ordering::SeqCst)
116 }
117
118 fn health_status(&self) -> HealthStatus {
119 if self.running.load(Ordering::SeqCst) {
120 HealthStatus::Healthy
121 } else {
122 HealthStatus::Failed {
123 description: "Storage subsystem not running".to_string(),
124 }
125 }
126 }
127
128 fn as_any(&self) -> &dyn Any {
129 self
130 }
131
132 fn as_any_mut(&mut self) -> &mut dyn Any {
133 self
134 }
135}