use std::sync::Arc;
use std::time::SystemTime;
use async_trait::async_trait;
use crate::config::{StorageConfig, TierConfig};
use crate::storage::partition::PartitionGranularity;
use crate::storage::plainpb::{FdBudget, PlainPbStoragePlugin};
use crate::storage::traits::{AppendMeta, EventStream, StoragePlugin, StoreSummary};
use crate::types::{ArchDbType, ArchiverSample};
/// Fallback writer cap for the STS tier.
///
/// `TieredStorage::from_config` passes this to `tier_fd_cap` as the default,
/// so it only applies when `max_open_writers_total` is unset and the STS
/// tier has no `max_open_writers` of its own.
/// NOTE(review): "FD" presumably means file descriptors — confirm against `FdBudget`.
const STS_DEFAULT_FD_CAP: usize = 512;
/// Fallback writer cap for the MTS tier; used under the same conditions as
/// `STS_DEFAULT_FD_CAP`, but for `config.mts`.
const MTS_DEFAULT_FD_CAP: usize = 64;
/// Fallback writer cap for the LTS tier; used under the same conditions as
/// `STS_DEFAULT_FD_CAP`, but for `config.lts`.
const LTS_DEFAULT_FD_CAP: usize = 64;
/// Resolves the open-writer cap for a single tier.
///
/// * `tier.max_open_writers == None` → `default_cap`.
/// * `tier.max_open_writers == Some(0)` → `usize::MAX`, i.e. zero is read as
///   "no practical limit" rather than "no writers".
/// * `tier.max_open_writers == Some(n)` → `n`.
fn tier_fd_cap(tier: &TierConfig, default_cap: usize) -> usize {
    // Nothing configured for this tier: the caller-supplied default wins.
    let Some(configured) = tier.max_open_writers else {
        return default_cap;
    };

    if configured == 0 {
        usize::MAX
    } else {
        configured
    }
}
/// Composite storage made of three `PlainPbStoragePlugin` tiers.
///
/// In this file's `StoragePlugin` implementation, appends are forwarded to
/// `sts` only, while retrieval, deletion, renaming and flushing touch all
/// three tiers.
/// NOTE(review): STS/MTS/LTS presumably stand for short-, medium- and
/// long-term store — confirm against the project documentation.
pub struct TieredStorage {
    /// Tier constructed with the name `"STS"`; the target of all appends.
    pub sts: Arc<PlainPbStoragePlugin>,
    /// Tier constructed with the name `"MTS"`.
    pub mts: Arc<PlainPbStoragePlugin>,
    /// Tier constructed with the name `"LTS"`.
    pub lts: Arc<PlainPbStoragePlugin>,
}
impl TieredStorage {
    /// Builds the STS, MTS and LTS plugins described by `config`.
    ///
    /// The writer budget handed to each tier depends on
    /// `config.max_open_writers_total`:
    ///
    /// * `None` — every tier gets its own `FdBudget`, sized by `tier_fd_cap`
    ///   from the tier's `max_open_writers` and a per-tier default constant.
    /// * `Some(0)` — every tier gets `FdBudget::unbounded()`.
    /// * `Some(total)` — a single `FdBudget::new(total)` is created and each
    ///   tier receives a clone of it; per-tier `max_open_writers` values are
    ///   not consulted in this case.
    ///   NOTE(review): presumably the clones draw from one shared pool —
    ///   confirm against `FdBudget`'s `Clone` implementation.
    pub fn from_config(config: &StorageConfig) -> Self {
        let budgets = match config.max_open_writers_total {
            None => (
                FdBudget::new(tier_fd_cap(&config.sts, STS_DEFAULT_FD_CAP)),
                FdBudget::new(tier_fd_cap(&config.mts, MTS_DEFAULT_FD_CAP)),
                FdBudget::new(tier_fd_cap(&config.lts, LTS_DEFAULT_FD_CAP)),
            ),
            // Must stay ahead of the general `Some(total)` arm.
            Some(0) => (
                FdBudget::unbounded(),
                FdBudget::unbounded(),
                FdBudget::unbounded(),
            ),
            Some(total) => {
                let pool = FdBudget::new(total);
                let for_sts = pool.clone();
                let for_mts = pool.clone();
                (for_sts, for_mts, pool)
            }
        };
        let (sts_budget, mts_budget, lts_budget) = budgets;

        let sts = PlainPbStoragePlugin::with_fd_budget(
            "STS",
            config.sts.root_folder.clone(),
            config.sts.partition_granularity,
            sts_budget,
        );
        let mts = PlainPbStoragePlugin::with_fd_budget(
            "MTS",
            config.mts.root_folder.clone(),
            config.mts.partition_granularity,
            mts_budget,
        );
        let lts = PlainPbStoragePlugin::with_fd_budget(
            "LTS",
            config.lts.root_folder.clone(),
            config.lts.partition_granularity,
            lts_budget,
        );

        Self {
            sts: Arc::new(sts),
            mts: Arc::new(mts),
            lts: Arc::new(lts),
        }
    }

    /// Returns new handles to the tiers in the order LTS, MTS, STS.
    ///
    /// `get_data` concatenates per-tier streams in exactly this order.
    /// NOTE(review): presumably this is oldest-data-first — confirm.
    pub fn read_order(&self) -> Vec<Arc<PlainPbStoragePlugin>> {
        vec![
            Arc::clone(&self.lts),
            Arc::clone(&self.mts),
            Arc::clone(&self.sts),
        ]
    }
}
#[async_trait]
impl StoragePlugin for TieredStorage {
    /// Fixed identifier of the composite plugin.
    fn name(&self) -> &str {
        "TieredStorage"
    }

    /// Reports the partition granularity of the STS tier.
    fn partition_granularity(&self) -> PartitionGranularity {
        let ingest_tier = &self.sts;
        ingest_tier.partition_granularity()
    }

    /// Appends `sample` for `pv`; the write goes to the STS tier only.
    async fn append_event(
        &self,
        pv: &str,
        dbr_type: ArchDbType,
        sample: &ArchiverSample,
    ) -> anyhow::Result<()> {
        let ingest_tier = &self.sts;
        ingest_tier.append_event(pv, dbr_type, sample).await
    }

    /// Same as `append_event`, additionally forwarding `meta` to the STS tier.
    async fn append_event_with_meta(
        &self,
        pv: &str,
        dbr_type: ArchDbType,
        sample: &ArchiverSample,
        meta: &AppendMeta,
    ) -> anyhow::Result<()> {
        let ingest_tier = &self.sts;
        ingest_tier
            .append_event_with_meta(pv, dbr_type, sample, meta)
            .await
    }

    /// Gathers the event streams for `pv` between `start` and `end`.
    ///
    /// Tiers are queried in `read_order()` order and their streams are
    /// concatenated in that order. A tier for which
    /// `crate::flags::skip_tier_for_retrieval` returns `true` is left out.
    ///
    /// # Errors
    /// The first tier error aborts the call; streams collected so far are
    /// dropped.
    async fn get_data(
        &self,
        pv: &str,
        start: SystemTime,
        end: SystemTime,
    ) -> anyhow::Result<Vec<Box<dyn EventStream>>> {
        let mut collected = Vec::new();
        for tier in self.read_order() {
            let tier_name = tier.name();
            if crate::flags::skip_tier_for_retrieval(tier_name) {
                tracing::debug!(tier = tier_name, pv, "skipping tier for retrieval");
                continue;
            }
            collected.extend(tier.get_data(pv, start, end).await?);
        }
        Ok(collected)
    }

    /// Returns the last known event for `pv` from the first tier that has one.
    ///
    /// Tiers are tried in the order STS, MTS, LTS; tiers flagged by
    /// `crate::flags::skip_tier_for_retrieval` are not queried. `Ok(None)`
    /// means no queried tier produced an event.
    async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>> {
        for tier in [&self.sts, &self.mts, &self.lts] {
            if crate::flags::skip_tier_for_retrieval(tier.name()) {
                continue;
            }
            if let Some(sample) = tier.get_last_known_event(pv).await? {
                return Ok(Some(sample));
            }
        }
        Ok(None)
    }

    /// Returns the last event before `target` for `pv` from the first tier
    /// that has one.
    ///
    /// Uses the same STS, MTS, LTS lookup order and skip-flag handling as
    /// `get_last_known_event`.
    async fn get_last_event_before(
        &self,
        pv: &str,
        target: SystemTime,
    ) -> anyhow::Result<Option<ArchiverSample>> {
        for tier in [&self.sts, &self.mts, &self.lts] {
            if crate::flags::skip_tier_for_retrieval(tier.name()) {
                continue;
            }
            if let Some(sample) = tier.get_last_event_before(pv, target).await? {
                return Ok(Some(sample));
            }
        }
        Ok(None)
    }

    /// Deletes the data of `pv` from STS, then MTS, then LTS, and returns the
    /// sum of the per-tier counts.
    ///
    /// # Errors
    /// Stops at the first failing tier; this method does not undo deletions
    /// already performed on earlier tiers.
    async fn delete_pv_data(&self, pv: &str) -> anyhow::Result<u64> {
        let mut removed: u64 = 0;
        for tier in [&self.sts, &self.mts, &self.lts] {
            removed += tier.delete_pv_data(pv).await?;
        }
        Ok(removed)
    }

    /// Flushes STS, then MTS, then LTS.
    ///
    /// # Errors
    /// Stops at the first failing tier; later tiers are not flushed.
    async fn flush_writes(&self) -> anyhow::Result<()> {
        for tier in [&self.sts, &self.mts, &self.lts] {
            tier.flush_writes().await?;
        }
        Ok(())
    }

    /// Flushes ingest writes on the STS tier only and returns its result.
    async fn flush_ingest_writes(
        &self,
    ) -> anyhow::Result<crate::storage::traits::IngestFlushResult> {
        let ingest_tier = &self.sts;
        ingest_tier.flush_ingest_writes().await
    }

    /// Concatenates the store summaries for `pv` from STS, MTS and LTS, in
    /// that order.
    fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>> {
        let mut summaries = Vec::with_capacity(3);
        for tier in [&self.sts, &self.mts, &self.lts] {
            summaries.extend(tier.stores_for_pv(pv)?);
        }
        Ok(summaries)
    }

    /// Concatenates the appliance metrics from STS, MTS and LTS, in that
    /// order.
    fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>> {
        let mut summaries = Vec::with_capacity(3);
        for tier in [&self.sts, &self.mts, &self.lts] {
            summaries.extend(tier.appliance_metrics()?);
        }
        Ok(summaries)
    }

    /// Renames `from` to `to` in STS, then MTS, then LTS, and returns the sum
    /// of the per-tier counts.
    ///
    /// # Errors
    /// Stops at the first failing tier; this method does not revert renames
    /// already performed on earlier tiers.
    async fn rename_pv(&self, from: &str, to: &str) -> anyhow::Result<u64> {
        let mut renamed: u64 = 0;
        for tier in [&self.sts, &self.mts, &self.lts] {
            renamed += tier.rename_pv(from, to).await?;
        }
        Ok(renamed)
    }
}