use nodedb_types::timeseries::{
PartitionInterval, PartitionMeta, PartitionState, TieredPartitionConfig, TimeRange,
};
use super::entry::format_partition_dir;
use super::rate::RateEstimator;
use super::registry::PartitionRegistry;
/// Builds the partition configuration shared by the tests in this file.
///
/// Starts from `TieredPartitionConfig::origin_defaults()` and overrides four
/// fields: `partition_by` (fixed 86_400_000 ms duration), `merge_after_ms`
/// (7 × 86_400_000), `merge_count` (3) and `retention_period_ms`
/// (30 × 86_400_000). Every other field keeps its default.
fn test_config() -> TieredPartitionConfig {
    let mut config = TieredPartitionConfig::origin_defaults();
    config.retention_period_ms = 30 * 86_400_000;
    config.merge_count = 3;
    config.merge_after_ms = 7 * 86_400_000;
    config.partition_by = PartitionInterval::Duration(86_400_000);
    config
}
/// The first lookup for a timestamp creates an active partition whose
/// directory name starts with `ts-`; a second lookup 1000 ms later reuses it,
/// leaving exactly one partition in the registry.
#[test]
fn create_partition() {
    let day_five = 86_400_000 * 5;
    let mut registry = PartitionRegistry::new(test_config());

    // First touch: the partition must be freshly created.
    let (created, first_is_new) = registry.get_or_create_partition(day_five + 1000);
    assert!(first_is_new);
    assert_eq!(created.meta.state, PartitionState::Active);
    assert!(created.dir_name.starts_with("ts-"));

    // Second touch: no new partition is created.
    let (_, second_is_new) = registry.get_or_create_partition(day_five + 2000);
    assert!(!second_is_new);
    assert_eq!(registry.partition_count(), 1);
}
/// Timestamps at 1×, 2× and 3× 86_400_000 ms must each get their own
/// partition, so the registry ends up holding three.
#[test]
fn different_days_different_partitions() {
    let mut registry = PartitionRegistry::new(test_config());
    for day in 1..=3 {
        registry.get_or_create_partition(day * 86_400_000);
    }
    assert_eq!(registry.partition_count(), 3);
}
/// Sealing the only partition must report success and move it from the
/// active count to the sealed count.
#[test]
fn seal_partition() {
    let first_day: i64 = 86_400_000;
    let mut registry = PartitionRegistry::new(test_config());

    registry.get_or_create_partition(first_day);
    assert_eq!(registry.active_count(), 1);

    let sealed = registry.seal_partition(first_day);
    assert!(sealed);
    assert_eq!(registry.active_count(), 0);
    assert_eq!(registry.sealed_count(), 1);
}
/// With ten partitions registered (one per 86_400_000 ms step, steps 1..=10),
/// a query from the start of step 3 through the last millisecond of step 5
/// must return exactly three partitions.
#[test]
fn query_partitions_pruning() {
    const DAY_MS: i64 = 86_400_000;
    let mut registry = PartitionRegistry::new(test_config());
    (1..=10).for_each(|day| {
        registry.get_or_create_partition(day * DAY_MS);
    });

    // Upper bound is the final millisecond before step 6 begins.
    let window = TimeRange::new(3 * DAY_MS, 6 * DAY_MS - 1);
    let hits = registry.query_partitions(&window);
    assert_eq!(hits.len(), 3);
}
/// Six sealed partitions (steps 1..=6 of 86_400_000 ms) yield no merge
/// candidates when `now` is step 7, and two groups when `now` is step 22; the
/// first group must contain three partitions.
#[test]
fn find_mergeable() {
    const DAY_MS: i64 = 86_400_000;
    let mut registry = PartitionRegistry::new(test_config());
    for start in (1..=6).map(|day| day * DAY_MS) {
        registry.get_or_create_partition(start);
        registry.seal_partition(start);
    }

    // Early clock: nothing is reported as mergeable yet.
    assert!(registry.find_mergeable(7 * DAY_MS).is_empty());

    // Later clock: the six partitions are reported as two groups.
    let merge_groups = registry.find_mergeable(22 * DAY_MS);
    assert_eq!(merge_groups.len(), 2);
    assert_eq!(merge_groups[0].len(), 3);
}
/// Five partitions (steps 1..=5 of 86_400_000 ms), each with `max_ts` set to
/// the last millisecond of its own step, must all be reported by
/// `find_expired` when `now` is step 40.
#[test]
fn find_expired() {
    let mut reg = PartitionRegistry::new(test_config());
    let day_ms = 86_400_000i64;
    for d in 1..=5 {
        let start = d * day_ms;
        reg.get_or_create_partition(start);
        // Fail loudly if the partition is not registered under `start`:
        // silently skipping this override would let the test run against
        // default metadata instead of the intended fixture.
        let entry = reg
            .partitions
            .get_mut(&start)
            .expect("partition created above must be registered under its start timestamp");
        entry.meta.max_ts = start + day_ms - 1;
    }
    let now = 40 * day_ms;
    let expired = reg.find_expired(now, false);
    assert_eq!(expired.len(), 5);
}
/// Seals three partitions, marks them as merging and commits a merged
/// partition covering all three. The registry must still report three
/// partitions after the commit; `purge_deleted` must then return two
/// directories and leave a single partition behind.
#[test]
fn commit_merge_and_purge() {
    const DAY_MS: i64 = 86_400_000;
    let mut registry = PartitionRegistry::new(test_config());
    let sources: Vec<i64> = (1..=3).map(|day| day * DAY_MS).collect();

    // Phase 1: create and seal every source partition.
    for start in sources.iter().copied() {
        registry.get_or_create_partition(start);
        registry.seal_partition(start);
    }
    // Phase 2: flag all of them as merging (kept as a separate pass).
    for start in sources.iter().copied() {
        registry.mark_merging(start);
    }

    // Metadata for the merged partition: spans from the first source start to
    // the last millisecond of the third source's step.
    let first_start = sources[0];
    let last_end = sources[2] + DAY_MS - 1;
    let merged = PartitionMeta {
        min_ts: first_start,
        max_ts: last_end,
        row_count: 3000,
        size_bytes: 1024,
        schema_version: 1,
        state: PartitionState::Merged,
        interval_ms: 3 * DAY_MS as u64,
        last_flushed_wal_lsn: 100,
        column_stats: std::collections::HashMap::new(),
        max_system_ts: 0,
    };
    registry.commit_merge(merged, "ts-merged".into(), &sources);
    assert_eq!(registry.partition_count(), 3);

    let purged_dirs = registry.purge_deleted();
    assert_eq!(purged_dirs.len(), 2);
    assert_eq!(registry.partition_count(), 1);
}
/// In `PartitionInterval::Auto` mode the registry starts with an interval of
/// 86_400_000 ms; sealing a partition whose `row_count` is 50 must double the
/// current interval to 2 × 86_400_000 ms.
#[test]
fn auto_mode_widen_on_small_partition() {
    let mut cfg = test_config();
    cfg.partition_by = PartitionInterval::Auto;
    let mut reg = PartitionRegistry::new(cfg);
    assert_eq!(reg.current_interval().as_millis(), Some(86_400_000));

    let start = 86_400_000i64;
    reg.get_or_create_partition(start);
    // Fail loudly if the lookup misses: otherwise `row_count` would keep its
    // initial value and the test would no longer exercise the intended
    // small-partition case.
    let entry = reg
        .partitions
        .get_mut(&start)
        .expect("partition created above must be registered under its start timestamp");
    entry.meta.row_count = 50;

    reg.seal_partition(start);
    assert_eq!(reg.current_interval().as_millis(), Some(2 * 86_400_000));
}
/// Changing the partition interval on a registry that already holds two
/// partitions must update `current_interval` to the new duration, and a
/// partition created afterwards must bring the total to three.
#[test]
fn set_partition_interval_online() {
    const DAY_MS: i64 = 86_400_000;
    let mut registry = PartitionRegistry::new(test_config());
    for start in [DAY_MS, 2 * DAY_MS] {
        registry.get_or_create_partition(start);
    }

    // Switch to a three-step interval while partitions already exist.
    let widened = PartitionInterval::Duration(3 * DAY_MS as u64);
    registry.set_partition_interval(widened);
    assert_eq!(
        registry.current_interval().as_millis(),
        Some(3 * 86_400_000)
    );

    registry.get_or_create_partition(10 * DAY_MS);
    assert_eq!(registry.partition_count(), 3);
}
/// After five `record(1000, i * 1000)` calls for `i` in 0..5, the estimator
/// must report a rate above 100.0.
#[test]
fn rate_estimator_basic() {
    let mut estimator = RateEstimator::new();
    for step in 0..5 {
        let tick = step * 1000;
        estimator.record(1000, tick);
    }
    let observed = estimator.rate();
    assert!(observed > 100.0);
}
/// `format_partition_dir` must render the pair
/// (1_704_067_200_000, 1_704_153_600_000) as
/// `ts-20240101-000000_20240102-000000`.
#[test]
fn format_partition_dir_test() {
    let range_start = 1_704_067_200_000;
    // Exactly 86_400_000 ms after `range_start`, i.e. 1_704_153_600_000.
    let range_end = range_start + 86_400_000;
    let dir = format_partition_dir(range_start, range_end);
    assert_eq!(dir, "ts-20240101-000000_20240102-000000");
}
/// With `PartitionInterval::Unbounded`, two widely separated timestamps
/// (1000 and 999_999_999) must land in the same single partition.
#[test]
fn unbounded_partition() {
    let mut config = test_config();
    config.partition_by = PartitionInterval::Unbounded;
    let mut registry = PartitionRegistry::new(config);

    for ts in [1000, 999_999_999] {
        registry.get_or_create_partition(ts);
    }
    assert_eq!(registry.partition_count(), 1);
}