use std::{
any::Any,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use reifydb_column::registry::SnapshotRegistry;
use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
use reifydb_runtime::actor::mailbox::ActorRef;
use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
use reifydb_type::Result;
use tracing::{debug, info};
use crate::actor::{SeriesMessage, TableMessage};
/// Tunable settings for the storage subsystem's table and series processing.
///
/// NOTE(review): none of these fields are read in this file, so the per-field
/// descriptions below are inferred from names and default values only —
/// confirm against the code that consumes the config.
#[derive(Clone, Debug)]
pub struct StorageConfig {
    /// Presumably the period between ticks of the table actor. Default: 1 second.
    pub table_tick_interval: Duration,
    /// Presumably the period between ticks of the series actor. Default: 1 second.
    pub series_tick_interval: Duration,
    /// Width of a series bucket as a raw integer. The default of
    /// `3_600 * 1_000_000_000` would be one hour if the unit is nanoseconds —
    /// TODO confirm the unit.
    pub series_bucket_width: u64,
    /// Presumably a grace period applied to series data before it is
    /// finalised — TODO confirm. Default: 5 seconds.
    pub series_grace: Duration,
}

impl Default for StorageConfig {
    /// Returns the stock configuration: both tick intervals at one second, a
    /// bucket width of `3_600 * 1_000_000_000`, and a five second grace period.
    fn default() -> Self {
        // Both actors share the same default tick period.
        let tick = Duration::from_secs(1);
        let grace = Duration::from_secs(5);
        // Same product as the literal `3_600 * 1_000_000_000`; fits in a u64.
        let bucket_width: u64 = 1_000_000_000 * 3_600;

        Self {
            table_tick_interval: tick,
            series_tick_interval: tick,
            series_bucket_width: bucket_width,
            series_grace: grace,
        }
    }
}
/// Subsystem wrapper around the storage actors.
///
/// Holds a snapshot registry, mailbox references to the table and series
/// actors, and a shared flag recording whether the subsystem has been started.
pub struct StorageSubsystem {
// Snapshot registry; `registry()` hands out clones of it.
registry: SnapshotRegistry,
// Mailbox of the table actor; receives `TableMessage::Shutdown` on shutdown.
table_ref: ActorRef<TableMessage>,
// Mailbox of the series actor; receives `SeriesMessage::Shutdown` on shutdown.
series_ref: ActorRef<SeriesMessage>,
// Set to `true` by `start` and back to `false` by `shutdown`; initialised to `false`.
running: Arc<AtomicBool>,
}
impl StorageSubsystem {
pub fn new(
registry: SnapshotRegistry,
table_ref: ActorRef<TableMessage>,
series_ref: ActorRef<SeriesMessage>,
) -> Self {
Self {
registry,
table_ref,
series_ref,
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn registry(&self) -> SnapshotRegistry {
self.registry.clone()
}
}
impl HasVersion for StorageSubsystem {
    /// Reports this component's version metadata.
    ///
    /// The version string is the crate's `CARGO_PKG_VERSION`, captured at
    /// compile time; the component type is always `ComponentType::Subsystem`.
    fn version(&self) -> SystemVersion {
        let name = String::from("sub-column");
        let version = String::from(env!("CARGO_PKG_VERSION"));
        let description = String::from("Columnar snapshot materialization subsystem");

        SystemVersion {
            name,
            version,
            description,
            r#type: ComponentType::Subsystem,
        }
    }
}
impl Subsystem for StorageSubsystem {
    /// Human-readable subsystem name.
    fn name(&self) -> &'static str {
        "Storage"
    }

    /// Marks the subsystem as running.
    ///
    /// Calling this while already running is a no-op: the flag is swapped
    /// atomically and the start message is only logged on the
    /// stopped-to-running transition. Always returns `Ok(())`.
    fn start(&mut self) -> Result<()> {
        let already_running = self.running.swap(true, Ordering::SeqCst);
        if !already_running {
            info!("Storage (columnar materialization) subsystem started");
        }
        Ok(())
    }

    /// Marks the subsystem as stopped and asks both actors to shut down.
    ///
    /// Calling this while not running is a no-op. On the running-to-stopped
    /// transition a `Shutdown` message is sent to the table actor and then to
    /// the series actor. This only signals: nothing here waits for or
    /// confirms that the actors actually stopped. Always returns `Ok(())`.
    fn shutdown(&mut self) -> Result<()> {
        let was_running = self.running.swap(false, Ordering::SeqCst);
        if was_running {
            // Best-effort: the outcome of each send is deliberately discarded.
            let _ = self.table_ref.send(TableMessage::Shutdown);
            let _ = self.series_ref.send(SeriesMessage::Shutdown);
            debug!("Storage subsystem shutdown signalled");
        }
        Ok(())
    }

    /// Whether `start` has been called without a subsequent `shutdown`.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Health derived solely from the running flag: `Healthy` while running,
    /// otherwise `Failed` with a fixed description.
    fn health_status(&self) -> HealthStatus {
        let running = self.running.load(Ordering::SeqCst);
        if !running {
            return HealthStatus::Failed {
                description: String::from("Storage subsystem not running"),
            };
        }
        HealthStatus::Healthy
    }

    /// Upcasts to `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Upcasts to `&mut dyn Any`.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}