use super::bucket::{BucketConfig, BucketManager};
use super::compress::LogDictionary;
use nodedb_types::timeseries::{FlushedSeries, IngestResult, LogEntry, MetricSample, SeriesId};
use super::memtable::{MemtableConfig, TimeseriesMemtable};
/// Default total memory budget for the timeseries subsystem: 100 MiB.
pub(super) const DEFAULT_TOTAL_BUDGET_BYTES: usize = 100 * 1024 * 1024;
/// Configuration for [`TimeseriesManager`].
#[derive(Debug, Clone)]
pub struct TimeseriesManagerConfig {
    /// Total memory budget (bytes) for the in-memory tier; also used as the
    /// memtable's hard limit (see `TimeseriesManager::new`).
    pub total_memory_budget: usize,
    /// Maximum number of distinct series the active memtable will track.
    pub max_series: usize,
    /// Configuration for the on-disk L1/L2 bucket layout.
    pub bucket_config: BucketConfig,
    /// Fraction of `total_memory_budget` granted to the active memtable;
    /// the remainder is headroom for frozen data awaiting flush.
    pub active_budget_fraction: f64,
}
impl Default for TimeseriesManagerConfig {
fn default() -> Self {
Self {
total_memory_budget: DEFAULT_TOTAL_BUDGET_BYTES,
max_series: 500_000,
bucket_config: BucketConfig::default(),
active_budget_fraction: 0.6,
}
}
}
/// Coordinates the in-memory (L0) and on-disk (L1/L2) timeseries tiers
/// using a double-buffer scheme: writes land in `active`, which is drained
/// into `frozen` and then flushed to L1 by the bucket manager.
pub struct TimeseriesManager {
    // Active L0 memtable receiving all new writes.
    active: TimeseriesMemtable,
    // Drained snapshot awaiting flush to L1; at most one exists at a time.
    frozen: Option<Vec<FlushedSeries>>,
    // Owns the on-disk L1/L2 layout, flushing, and compaction.
    bucket: BucketManager,
    // Optional dictionary used to compress log payloads during flush.
    log_dict: Option<LogDictionary>,
    // Configuration captured at construction.
    config: TimeseriesManagerConfig,
    // Number of completed L0→L1 flushes.
    flush_count: u64,
    // Total ingest attempts (incremented once per ingest call).
    total_ingested: u64,
    // Ingest attempts still rejected after the flush-and-retry path.
    total_rejected: u64,
}
impl TimeseriesManager {
    /// Build a manager from `config`, carving the active-memtable budget
    /// out of the total budget via `active_budget_fraction`.
    pub fn new(config: TimeseriesManagerConfig) -> Self {
        // The active memtable gets a fraction of the total budget; the
        // total budget itself doubles as the memtable's hard limit, leaving
        // the remainder as headroom for frozen data awaiting flush.
        let active_budget =
            (config.total_memory_budget as f64 * config.active_budget_fraction) as usize;
        let hard_limit = config.total_memory_budget;
        let mt_config = MemtableConfig {
            max_memory_bytes: active_budget,
            max_series: config.max_series,
            hard_memory_limit: hard_limit,
        };
        Self {
            active: TimeseriesMemtable::with_config(mt_config),
            frozen: None,
            bucket: BucketManager::new(config.bucket_config.clone()),
            log_dict: None,
            config,
            flush_count: 0,
            total_ingested: 0,
            total_rejected: 0,
        }
    }

    /// Install the dictionary used to compress log payloads during flush.
    pub fn set_log_dictionary(&mut self, dict: LogDictionary) {
        self.log_dict = Some(dict);
    }

    /// Ingest one metric sample for `series_id`.
    pub fn ingest_metric(&mut self, series_id: SeriesId, sample: MetricSample) -> IngestResult {
        self.do_ingest(|mt| mt.ingest_metric(series_id, sample))
    }

    /// Ingest one log entry for `series_id`.
    pub fn ingest_log(&mut self, series_id: SeriesId, entry: LogEntry) -> IngestResult {
        // Move `entry` into the closure and clone only per invocation:
        // `do_ingest` may call the closure twice (initial attempt plus one
        // retry). The previous up-front `entry.clone()` was redundant — it
        // produced an extra allocation on every call.
        self.do_ingest(move |mt| mt.ingest_log(series_id, entry.clone()))
    }

    /// Shared ingest path: try the active memtable; on rejection, flush any
    /// frozen buffer to free memory and retry exactly once.
    ///
    /// `total_ingested` counts every ingest call (accepted or not);
    /// `total_rejected` counts calls whose retry was also rejected.
    /// NOTE(review): if `total_ingested` is meant to count only accepted
    /// writes, the rejected path should skip the increment — confirm the
    /// intended counter semantics.
    fn do_ingest(
        &mut self,
        ingest_fn: impl Fn(&mut TimeseriesMemtable) -> IngestResult,
    ) -> IngestResult {
        let result = ingest_fn(&mut self.active);
        match result {
            IngestResult::Rejected => {
                // Memory pressure: push the frozen snapshot (if any) down
                // to L1, then retry. If there is no frozen data the retry
                // will likely fail the same way.
                if self.frozen.is_some() {
                    self.flush_frozen();
                }
                let retry = ingest_fn(&mut self.active);
                if retry.is_rejected() {
                    self.total_rejected += 1;
                }
                self.total_ingested += 1;
                retry
            }
            _ => {
                self.total_ingested += 1;
                result
            }
        }
    }

    /// Freeze the active memtable's contents and flush them to L1.
    ///
    /// Any previously frozen snapshot is flushed first, so at most one
    /// frozen snapshot exists at a time. Returns `Ok(0)` when the active
    /// memtable was empty.
    ///
    /// NOTE(review): the success value is `bucket.l1_segments_written()`,
    /// which looks like a cumulative counter rather than this flush's
    /// segment count — confirm callers expect a running total.
    pub fn try_flush(&mut self) -> std::io::Result<usize> {
        if self.frozen.is_some() {
            self.flush_frozen();
        }
        let flushed_data = self.active.drain();
        if flushed_data.is_empty() {
            return Ok(0);
        }
        self.frozen = Some(flushed_data);
        self.flush_frozen();
        Ok(self.bucket.l1_segments_written() as usize)
    }

    /// Write the frozen snapshot (if any) to L1 via the bucket manager.
    ///
    /// Errors are logged and swallowed, and the frozen data has already
    /// been taken out of `self.frozen`, so a failed flush drops that data.
    /// NOTE(review): confirm losing the snapshot on flush failure is
    /// acceptable, or whether it should be re-installed for a later retry.
    fn flush_frozen(&mut self) {
        let Some(data) = self.frozen.take() else {
            return;
        };
        let dict = self.log_dict.as_ref();
        match self.bucket.flush_to_l1(data, dict) {
            Ok(count) => {
                self.flush_count += 1;
                tracing::info!(
                    segments = count,
                    flush_count = self.flush_count,
                    "timeseries L0→L1 flush complete"
                );
            }
            Err(e) => {
                tracing::error!(error = %e, "timeseries L0→L1 flush failed");
            }
        }
    }

    /// Delete L1 segments whose newest data is older than `now_ms - max_age_ms`.
    ///
    /// Segments already missing on disk are treated as deleted (the index
    /// entry is pruned); other I/O failures are logged and skipped so one
    /// bad file does not stall retention. Returns the number of segments
    /// removed from the index.
    pub fn enforce_retention(&mut self, max_age_ms: i64, now_ms: i64) -> std::io::Result<usize> {
        let cutoff = now_ms - max_age_ms;
        let old_segments = self.bucket.segment_index().segments_older_than(cutoff);
        let mut deleted = 0;
        for (series_id, min_ts, seg) in &old_segments {
            let full_path = self.config.bucket_config.l1_dir.join(&seg.path);
            match std::fs::remove_file(&full_path) {
                Ok(()) => {
                    self.bucket.segment_index_mut().remove(*series_id, *min_ts);
                    deleted += 1;
                    tracing::debug!(
                        path = %seg.path,
                        series_id = series_id,
                        "deleted expired segment"
                    );
                }
                // Already gone on disk: still prune the index entry.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    self.bucket.segment_index_mut().remove(*series_id, *min_ts);
                    deleted += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        path = %seg.path,
                        error = %e,
                        "failed to delete expired segment"
                    );
                }
            }
        }
        if deleted > 0 {
            tracing::info!(
                deleted,
                cutoff_ms = cutoff,
                "retention enforcement complete"
            );
        }
        Ok(deleted)
    }

    /// Compact L1 segments with data entirely before `before_ts` into L2.
    pub fn compact_to_l2(
        &mut self,
        before_ts: i64,
    ) -> std::io::Result<super::bucket::CompactionResult> {
        self.bucket.compact_to_l2(before_ts)
    }

    /// Read-only access to the underlying bucket manager.
    pub fn bucket(&self) -> &BucketManager {
        &self.bucket
    }

    /// Number of completed L0→L1 flushes.
    pub fn flush_count(&self) -> u64 {
        self.flush_count
    }

    /// Total ingest calls (see `do_ingest` for exact counting semantics).
    pub fn total_ingested(&self) -> u64 {
        self.total_ingested
    }

    /// Ingest calls rejected even after the flush-and-retry path.
    pub fn total_rejected(&self) -> u64 {
        self.total_rejected
    }

    /// Current memory footprint of the active memtable, in bytes.
    pub fn active_memory_bytes(&self) -> usize {
        self.active.memory_bytes()
    }

    /// Number of distinct series currently held in the active memtable.
    pub fn active_series_count(&self) -> usize {
        self.active.series_count()
    }

    /// Number of series evicted from the active memtable (cardinality cap).
    pub fn active_eviction_count(&self) -> u64 {
        self.active.eviction_count()
    }

    /// True when a frozen snapshot is waiting to be flushed to L1.
    pub fn has_frozen(&self) -> bool {
        self.frozen.is_some()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Small 1 MiB budget with per-test L1/L2 directories.
    fn test_config(dir: &TempDir) -> TimeseriesManagerConfig {
        TimeseriesManagerConfig {
            total_memory_budget: 1024 * 1024,
            max_series: 100,
            bucket_config: BucketConfig {
                l1_dir: dir.path().join("l1"),
                l2_dir: dir.path().join("l2"),
                ..Default::default()
            },
            active_budget_fraction: 0.6,
        }
    }

    #[test]
    fn basic_ingest_and_flush() {
        let tmp = TempDir::new().unwrap();
        let mut manager = TimeseriesManager::new(test_config(&tmp));
        for i in 0..100 {
            let sample = MetricSample { timestamp_ms: i * 1000, value: i as f64 };
            manager.ingest_metric(1, sample);
        }
        let segments = manager.try_flush().unwrap();
        assert!(segments > 0);
        assert_eq!(manager.flush_count(), 1);
        assert_eq!(manager.active_series_count(), 0);
    }

    #[test]
    fn double_buffer_allows_continuous_ingest() {
        let tmp = TempDir::new().unwrap();
        let config = TimeseriesManagerConfig {
            total_memory_budget: 2048,
            max_series: 10,
            bucket_config: BucketConfig {
                l1_dir: tmp.path().join("l1"),
                l2_dir: tmp.path().join("l2"),
                ..Default::default()
            },
            active_budget_fraction: 0.6,
        };
        let mut manager = TimeseriesManager::new(config);
        let mut saw_flush_needed = false;
        for i in 0..500 {
            let sample = MetricSample { timestamp_ms: i * 1000, value: i as f64 };
            let result = manager.ingest_metric((i % 5) as u64, sample);
            if result.is_flush_needed() {
                saw_flush_needed = true;
                manager.try_flush().unwrap();
                // After a flush the active buffer must accept writes again.
                let post_flush = manager
                    .ingest_metric(1, MetricSample { timestamp_ms: 999_999, value: 0.0 });
                assert!(!post_flush.is_rejected());
                break;
            }
        }
        assert!(saw_flush_needed, "should have triggered flush");
    }

    #[test]
    fn retention_deletes_old_segments() {
        let tmp = TempDir::new().unwrap();
        let mut manager = TimeseriesManager::new(test_config(&tmp));
        manager.ingest_metric(1, MetricSample { timestamp_ms: 100, value: 1.0 });
        manager.ingest_metric(2, MetricSample { timestamp_ms: 200, value: 2.0 });
        manager.try_flush().unwrap();
        // cutoff = 1000 - 500 = 500 ms: both segments are older.
        let deleted = manager.enforce_retention(500, 1000).unwrap();
        assert_eq!(deleted, 2);
    }

    #[test]
    fn cardinality_eviction_through_manager() {
        let tmp = TempDir::new().unwrap();
        let config = TimeseriesManagerConfig {
            total_memory_budget: 10 * 1024 * 1024,
            max_series: 3,
            bucket_config: BucketConfig {
                l1_dir: tmp.path().join("l1"),
                l2_dir: tmp.path().join("l2"),
                ..Default::default()
            },
            active_budget_fraction: 0.6,
        };
        let mut manager = TimeseriesManager::new(config);
        // Four distinct series against a cap of three forces one eviction.
        for (id, ts) in [(1u64, 100), (2, 200), (3, 300), (4, 400)] {
            manager.ingest_metric(id, MetricSample {
                timestamp_ms: ts,
                value: (ts / 100) as f64,
            });
        }
        assert_eq!(manager.active_series_count(), 3);
        assert_eq!(manager.active_eviction_count(), 1);
    }
}