reifydb_sub_column/
subsystem.rs1use std::{
5 any::Any,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_column::registry::SnapshotRegistry;
14use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
15use reifydb_runtime::actor::mailbox::ActorRef;
16use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
17use reifydb_type::Result;
18use tracing::{debug, info};
19
20use crate::actor::{SeriesMessage, TableMessage};
21
/// Timing and bucketing parameters for the storage subsystem.
///
/// NOTE(review): nothing in this file reads these fields; they are presumably
/// consumed by the table/series actors — confirm against the actor code.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Tick interval associated with the table side (defaults to one second).
    pub table_tick_interval: Duration,
    /// Tick interval associated with the series side (defaults to one second).
    pub series_tick_interval: Duration,

    /// Width of a series bucket as a raw integer.
    ///
    /// NOTE(review): the default (`3_600 * 1_000_000_000`) looks like one hour
    /// expressed in nanoseconds — confirm the unit with the consumer.
    pub series_bucket_width: u64,

    /// Grace period applied to series handling (defaults to five seconds).
    pub series_grace: Duration,
}
31
32impl Default for StorageConfig {
33 fn default() -> Self {
34 Self {
35 table_tick_interval: Duration::from_secs(1),
36 series_tick_interval: Duration::from_secs(1),
37
38 series_bucket_width: 3_600 * 1_000_000_000,
39 series_grace: Duration::from_secs(5),
40 }
41 }
42}
43
44pub struct StorageSubsystem {
45 registry: SnapshotRegistry,
46 table_ref: ActorRef<TableMessage>,
47 series_ref: ActorRef<SeriesMessage>,
48 running: Arc<AtomicBool>,
49}
50
51impl StorageSubsystem {
52 pub fn new(
53 registry: SnapshotRegistry,
54 table_ref: ActorRef<TableMessage>,
55 series_ref: ActorRef<SeriesMessage>,
56 ) -> Self {
57 Self {
58 registry,
59 table_ref,
60 series_ref,
61 running: Arc::new(AtomicBool::new(false)),
62 }
63 }
64
65 pub fn registry(&self) -> SnapshotRegistry {
66 self.registry.clone()
67 }
68}
69
70impl HasVersion for StorageSubsystem {
71 fn version(&self) -> SystemVersion {
72 SystemVersion {
73 name: "sub-column".to_string(),
74 version: env!("CARGO_PKG_VERSION").to_string(),
75 description: "Columnar snapshot materialization subsystem".to_string(),
76 r#type: ComponentType::Subsystem,
77 }
78 }
79}
80
81impl Subsystem for StorageSubsystem {
82 fn name(&self) -> &'static str {
83 "Storage"
84 }
85
86 fn start(&mut self) -> Result<()> {
87 if self.running.swap(true, Ordering::SeqCst) {
88 return Ok(());
89 }
90 info!("Storage (columnar materialization) subsystem started");
91 Ok(())
92 }
93
94 fn shutdown(&mut self) -> Result<()> {
95 if !self.running.swap(false, Ordering::SeqCst) {
96 return Ok(());
97 }
98
99 let _ = self.table_ref.send(TableMessage::Shutdown);
100 let _ = self.series_ref.send(SeriesMessage::Shutdown);
101 debug!("Storage subsystem shutdown signalled");
102 Ok(())
103 }
104
105 fn is_running(&self) -> bool {
106 self.running.load(Ordering::SeqCst)
107 }
108
109 fn health_status(&self) -> HealthStatus {
110 if self.running.load(Ordering::SeqCst) {
111 HealthStatus::Healthy
112 } else {
113 HealthStatus::Failed {
114 description: "Storage subsystem not running".to_string(),
115 }
116 }
117 }
118
119 fn as_any(&self) -> &dyn Any {
120 self
121 }
122
123 fn as_any_mut(&mut self) -> &mut dyn Any {
124 self
125 }
126}