use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use serde::{Deserialize, Serialize};
/// Errors returned by [`PartitionInterval::parse`].
#[derive(Debug, thiserror::Error)]
pub enum IntervalParseError {
    /// Input matched neither a keyword nor the `<number><unit>` shape.
    #[error("invalid interval format: '{input}' — expected format like '1h', '3d', '1w'")]
    InvalidFormat { input: String },
    /// The numeric portion failed to parse as `u64`.
    #[error("invalid number in interval: '{input}'")]
    InvalidNumber { input: String },
    /// A zero-length duration was supplied (e.g. "0h").
    #[error("partition interval must be > 0")]
    ZeroInterval,
    /// The single-letter unit suffix is not one of the supported units.
    #[error("unknown unit '{unit}': expected s, m, h, d, w, M, y")]
    UnknownUnit { unit: String },
    /// A calendar interval with a count other than 1 (e.g. "2M").
    #[error("unsupported calendar interval '{input}': {hint}")]
    UnsupportedCalendar { input: String, hint: &'static str },
}
/// Returned by [`TieredPartitionConfig::validate`] when a cross-field
/// invariant is violated.
#[derive(Debug, thiserror::Error)]
#[error("config validation: {field} — {reason}")]
pub struct ConfigValidationError {
    // Name of the offending configuration field.
    pub field: String,
    // Human-readable description of the violated constraint.
    pub reason: String,
}
/// Stable 64-bit series identifier, derived by hashing a [`SeriesKey`]
/// (see `SeriesKey::to_series_id`).
pub type SeriesId = u64;

/// Identity of a time series: metric name plus tag key/value pairs.
/// Tags are kept sorted (enforced in `SeriesKey::new`) so equal tag sets
/// compare equal and hash identically regardless of construction order.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SeriesKey {
    pub metric: String,
    pub tags: Vec<(String, String)>,
}
impl SeriesKey {
pub fn new(metric: impl Into<String>, mut tags: Vec<(String, String)>) -> Self {
tags.sort();
Self {
metric: metric.into(),
tags,
}
}
pub fn to_series_id(&self, salt: u64) -> SeriesId {
let mut hasher = DefaultHasher::new();
salt.hash(&mut hasher);
self.metric.hash(&mut hasher);
for (k, v) in &self.tags {
k.hash(&mut hasher);
v.hash(&mut hasher);
}
hasher.finish()
}
}
/// Registry mapping each [`SeriesId`] back to its [`SeriesKey`].
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct SeriesCatalog {
    // id -> (key, salt that produced the id during collision probing)
    entries: HashMap<SeriesId, (SeriesKey, u64)>,
}
impl SeriesCatalog {
    /// Create an empty catalog.
    pub fn new() -> Self {
        Self::default()
    }

    /// Return the id for `key`, registering it on first sight.
    ///
    /// Hash collisions between distinct keys are resolved by probing the salt:
    /// the first salt whose id is either unused or already mapped to this very
    /// key wins, and that salt is stored alongside the entry.
    pub fn resolve(&mut self, key: &SeriesKey) -> SeriesId {
        for salt in 0u64.. {
            let id = key.to_series_id(salt);
            match self.entries.get(&id) {
                Some((stored_key, _)) if stored_key == key => return id,
                Some(_) => {} // slot taken by a different key: try the next salt
                None => {
                    self.entries.insert(id, (key.clone(), salt));
                    return id;
                }
            }
        }
        unreachable!("u64 salt space exhausted")
    }

    /// Key previously registered under `id`, if any.
    pub fn get(&self, id: SeriesId) -> Option<&SeriesKey> {
        self.entries.get(&id).map(|(key, _)| key)
    }

    /// Number of registered series.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
/// Identifier of a lite (edge) database instance.
pub type LiteId = String;

/// Device battery condition used to gate background flush work
/// (see `BatteryState::should_defer_flush`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum BatteryState {
    Normal,
    Low,
    Charging,
    /// Default when no battery information is available.
    #[default]
    Unknown,
}
impl BatteryState {
    /// True when flush work should be postponed to conserve power —
    /// only in the `Low` state (charging/unknown devices flush normally).
    pub fn should_defer_flush(&self) -> bool {
        *self == Self::Low
    }
}
/// A single numeric data point of a series.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct MetricSample {
    // Milliseconds since the Unix epoch — presumably UTC; confirm with producers.
    pub timestamp_ms: i64,
    pub value: f64,
}

/// A single opaque log record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub timestamp_ms: i64,
    // Raw payload bytes; encoding is the caller's concern.
    pub data: Vec<u8>,
}
/// Outcome of an ingest (append) operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestResult {
    /// Accepted.
    Ok,
    /// Accepted, but the receiving buffer should be flushed soon.
    FlushNeeded,
    /// Not accepted.
    Rejected,
}
impl IngestResult {
    /// True for [`IngestResult::FlushNeeded`].
    pub fn is_flush_needed(&self) -> bool {
        *self == Self::FlushNeeded
    }

    /// True for [`IngestResult::Rejected`].
    pub fn is_rejected(&self) -> bool {
        *self == Self::Rejected
    }
}
/// Closed time interval `[start_ms, end_ms]` in epoch milliseconds
/// (both bounds inclusive — see `contains`/`overlaps`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TimeRange {
    pub start_ms: i64,
    pub end_ms: i64,
}
impl TimeRange {
pub fn new(start_ms: i64, end_ms: i64) -> Self {
Self { start_ms, end_ms }
}
pub fn contains(&self, ts: i64) -> bool {
ts >= self.start_ms && ts <= self.end_ms
}
pub fn overlaps(&self, other: &TimeRange) -> bool {
self.start_ms <= other.end_ms && other.start_ms <= self.end_ms
}
}
/// String-interning dictionary: maps symbols to dense `u32` ids and back.
/// Invariant: `reverse[id]` is the symbol for `id`, and `forward` is its
/// exact inverse.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SymbolDictionary {
    forward: HashMap<String, u32>,
    reverse: Vec<String>,
}
impl SymbolDictionary {
    /// Create an empty dictionary.
    pub fn new() -> Self {
        Self::default()
    }

    /// Intern `value` and return its dense id.
    ///
    /// Known symbols always resolve regardless of the cap; a *new* symbol is
    /// admitted only while fewer than `max_cardinality` symbols are stored,
    /// otherwise `None` is returned (cardinality breaker).
    pub fn resolve(&mut self, value: &str, max_cardinality: u32) -> Option<u32> {
        if let Some(&existing) = self.forward.get(value) {
            return Some(existing);
        }
        let next_id = self.reverse.len() as u32;
        if next_id >= max_cardinality {
            return None;
        }
        self.forward.insert(value.to_owned(), next_id);
        self.reverse.push(value.to_owned());
        Some(next_id)
    }

    /// Symbol stored under `id`, if in range.
    pub fn get(&self, id: u32) -> Option<&str> {
        self.reverse.get(id as usize).map(String::as_str)
    }

    /// Id for `value` without interning it.
    pub fn get_id(&self, value: &str) -> Option<u32> {
        self.forward.get(value).copied()
    }

    /// Number of interned symbols.
    pub fn len(&self) -> usize {
        self.reverse.len()
    }

    pub fn is_empty(&self) -> bool {
        self.reverse.is_empty()
    }

    /// Fold `other`'s symbols into `self`, returning a remap table where
    /// `remap[old_id]` is the id in `self` — or `u32::MAX` when the
    /// cardinality cap rejected that symbol.
    pub fn merge(&mut self, other: &SymbolDictionary, max_cardinality: u32) -> Vec<u32> {
        other
            .reverse
            .iter()
            .map(|symbol| self.resolve(symbol, max_cardinality).unwrap_or(u32::MAX))
            .collect()
    }
}
/// An encoded block of samples for one series, shipped from a lite source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeseriesDelta {
    pub source_id: LiteId,
    pub series_id: SeriesId,
    // Full key is carried so the receiver can re-resolve the id in its own
    // catalog — presumably; confirm against the sync path.
    pub series_key: SeriesKey,
    pub min_ts: i64,
    pub max_ts: i64,
    // Encoded sample payload; the codec is decided elsewhere — confirm with producer.
    pub encoded_block: Vec<u8>,
    pub sample_count: u64,
}
/// Lifecycle state of a partition. `PartitionMeta::is_queryable` defines
/// which states serve reads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionState {
    Active,
    Sealed,
    Merging,
    Merged,
    Deleted,
    Archived,
}
/// Metadata describing a single partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionMeta {
    // Inclusive timestamp bounds of the partition's data (see `overlaps`).
    pub min_ts: i64,
    pub max_ts: i64,
    pub row_count: u64,
    pub size_bytes: u64,
    pub schema_version: u32,
    pub state: PartitionState,
    // Partitioning interval in ms the partition was created under — presumably; confirm.
    pub interval_ms: u64,
    // NOTE(review): presumably the highest WAL LSN whose data is already
    // contained here — confirm against the flush path.
    pub last_flushed_wal_lsn: u64,
    // Optional per-column statistics; omitted from serialized form when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub column_stats: HashMap<String, nodedb_codec::ColumnStatistics>,
}
impl PartitionMeta {
    /// True when `[min_ts, max_ts]` intersects `range` (inclusive bounds).
    pub fn overlaps(&self, range: &TimeRange) -> bool {
        !(range.end_ms < self.min_ts || self.max_ts < range.start_ms)
    }

    /// True for states that serve reads: Active, Sealed, Merged.
    /// Merging, Deleted and Archived partitions are excluded.
    pub fn is_queryable(&self) -> bool {
        match self.state {
            PartitionState::Active | PartitionState::Sealed | PartitionState::Merged => true,
            PartitionState::Merging | PartitionState::Deleted | PartitionState::Archived => false,
        }
    }
}
/// How a collection is split into partitions over time.
/// Parsed from specs like "1h", "1M", "AUTO" — see `PartitionInterval::parse`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PartitionInterval {
    /// Fixed-width partitions of this many milliseconds.
    Duration(u64),
    /// One calendar month per partition (spec "1M").
    Month,
    /// One calendar year per partition (spec "1y").
    Year,
    /// No time-based partitioning (spec "UNBOUNDED"/"NONE").
    Unbounded,
    /// Interval chosen automatically (spec "AUTO").
    Auto,
}
impl PartitionInterval {
    /// Parse an interval spec.
    ///
    /// Accepted forms:
    /// - keywords `AUTO` and `UNBOUNDED`/`NONE` (case-insensitive);
    /// - calendar units `1M` (month) and `1y` (year) — only a count of 1;
    /// - fixed durations `<digits><unit>` with unit `s`, `m`, `h`, `d`, `w`.
    ///
    /// # Errors
    /// Returns an [`IntervalParseError`] when the input matches none of the
    /// accepted shapes, the number does not fit `u64`, the duration is zero,
    /// the unit letter is unknown, or a calendar count other than 1 is asked for.
    pub fn parse(s: &str) -> Result<Self, IntervalParseError> {
        let s = s.trim();
        match s.to_uppercase().as_str() {
            "AUTO" => return Ok(Self::Auto),
            "UNBOUNDED" | "NONE" => return Ok(Self::Unbounded),
            _ => {}
        }
        // Calendar suffixes are case-sensitive ('M' = month, 'y' = year).
        // Both cases share one helper instead of two duplicated blocks.
        if let Some(parsed) = Self::parse_calendar(
            s,
            'M',
            Self::Month,
            "only '1M' (one calendar month) is supported",
        ) {
            return parsed;
        }
        if let Some(parsed) = Self::parse_calendar(
            s,
            'y',
            Self::Year,
            "only '1y' (one calendar year) is supported",
        ) {
            return parsed;
        }
        // Fixed duration: digits followed by a single ASCII unit letter.
        if s.len() < 2 || !s.as_bytes()[s.len() - 1].is_ascii_alphabetic() {
            return Err(IntervalParseError::InvalidFormat { input: s.into() });
        }
        // Safe split: the last byte is ASCII, so `len - 1` is a char boundary.
        let (num_str, unit) = s.split_at(s.len() - 1);
        let n: u64 = num_str
            .parse()
            .map_err(|_| IntervalParseError::InvalidNumber { input: s.into() })?;
        if n == 0 {
            return Err(IntervalParseError::ZeroInterval);
        }
        let unit_ms: u64 = match unit {
            "s" => 1_000,
            "m" => 60_000,
            "h" => 3_600_000,
            "d" => 86_400_000,
            "w" => 604_800_000,
            _ => return Err(IntervalParseError::UnknownUnit { unit: unit.into() }),
        };
        Ok(Self::Duration(n * unit_ms))
    }

    /// Handle a `<digits><unit>` calendar spec (e.g. `1M`, `1y`).
    ///
    /// Returns `None` when `s` is not `<digits>` followed by `unit` at all
    /// (so the caller can try other forms); otherwise `Some` carrying either
    /// `one` for a count of exactly 1, or the appropriate error.
    fn parse_calendar(
        s: &str,
        unit: char,
        one: Self,
        hint: &'static str,
    ) -> Option<Result<Self, IntervalParseError>> {
        let digits = s.strip_suffix(unit)?;
        if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        let count: u64 = match digits.parse() {
            Ok(n) => n,
            // An all-digit string can still overflow u64.
            Err(_) => return Some(Err(IntervalParseError::InvalidNumber { input: s.into() })),
        };
        if count == 1 {
            Some(Ok(one))
        } else {
            Some(Err(IntervalParseError::UnsupportedCalendar {
                input: s.into(),
                hint,
            }))
        }
    }

    /// Fixed duration in milliseconds; `None` for calendar/unbounded/auto.
    pub fn as_millis(&self) -> Option<u64> {
        match self {
            Self::Duration(ms) => Some(*ms),
            Self::Month | Self::Year | Self::Unbounded | Self::Auto => None,
        }
    }
}
impl std::fmt::Display for PartitionInterval {
    /// Render in the same spec language `parse` accepts, picking the largest
    /// sub-month unit that divides the duration evenly (falling back to
    /// integer seconds).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Largest-first so e.g. 604_800_000 ms prints as "1w", not "7d".
        const UNITS: [(u64, &str); 4] = [
            (604_800_000, "w"),
            (86_400_000, "d"),
            (3_600_000, "h"),
            (60_000, "m"),
        ];
        match self {
            Self::Duration(ms) => {
                let ms = *ms;
                for (unit_ms, suffix) in UNITS {
                    if ms % unit_ms == 0 {
                        return write!(f, "{}{}", ms / unit_ms, suffix);
                    }
                }
                write!(f, "{}s", ms / 1_000)
            }
            Self::Month => f.write_str("1M"),
            Self::Year => f.write_str("1y"),
            Self::Unbounded => f.write_str("UNBOUNDED"),
            Self::Auto => f.write_str("AUTO"),
        }
    }
}
/// Tuning knobs for the tiered (memtable → partition → archive) storage
/// engine. Start from `origin_defaults()` or `lite_defaults()` and check the
/// cross-field invariants with `validate()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TieredPartitionConfig {
    // Time-based flush trigger, in ms.
    pub memtable_flush_interval_ms: u64,
    // Size-based flush trigger, in bytes.
    pub memtable_max_memory_bytes: u64,
    pub partition_by: PartitionInterval,
    // Age before partitions become merge candidates, in ms — presumably; confirm with merge scheduler.
    pub merge_after_ms: u64,
    // Must be >= 2 (enforced by `validate`).
    pub merge_count: u32,
    // 0 = never archive (`validate` only applies its constraint when non-zero).
    pub archive_after_ms: u64,
    pub archive_compression: ArchiveCompression,
    // 0 = keep forever; otherwise must be >= archive_after_ms when both set (see `validate`).
    pub retention_period_ms: u64,
    pub timestamp_column: String,
    // NOTE(review): presumably the cap fed to SymbolDictionary::resolve — confirm at call sites.
    pub max_tag_cardinality: u32,
    pub wal_enabled: bool,
    pub cdc_enabled: bool,
    pub sync_resolution_ms: u64,
    pub sync_interval_ms: u64,
    pub retain_until_synced: bool,
    // When true, flushing may be deferred per `BatteryState::should_defer_flush` — confirm wiring.
    #[serde(default)]
    pub battery_aware: bool,
    #[serde(default)]
    pub bulk_import_threshold_rows: u64,
    #[serde(default)]
    pub partition_size_target_bytes: u64,
    #[serde(default)]
    pub compaction_partition_threshold: u32,
}
/// Compression codec applied to archived partitions.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArchiveCompression {
    #[default]
    Zstd,
    Lz4,
    Snappy,
}
impl TieredPartitionConfig {
    /// Default tuning for an origin node: 64 MiB memtable, 10 s flush cadence,
    /// no archive/retention limits, no sync.
    pub fn origin_defaults() -> Self {
        Self {
            memtable_flush_interval_ms: 10_000,
            memtable_max_memory_bytes: 64 * 1024 * 1024,
            partition_by: PartitionInterval::Auto,
            merge_after_ms: 30 * 86_400_000,
            merge_count: 10,
            archive_after_ms: 0,
            archive_compression: ArchiveCompression::Zstd,
            retention_period_ms: 0,
            timestamp_column: String::new(),
            max_tag_cardinality: 100_000,
            wal_enabled: true,
            cdc_enabled: false,
            sync_resolution_ms: 0,
            sync_interval_ms: 0,
            retain_until_synced: false,
            battery_aware: false,
            bulk_import_threshold_rows: 0,
            partition_size_target_bytes: 0,
            compaction_partition_threshold: 0,
        }
    }

    /// Default tuning for a lite node: 4 MiB memtable, 7-day retention,
    /// 30 s sync interval, bounded tag cardinality.
    pub fn lite_defaults() -> Self {
        Self {
            memtable_flush_interval_ms: 30_000,
            memtable_max_memory_bytes: 4 * 1024 * 1024,
            partition_by: PartitionInterval::Auto,
            merge_after_ms: 7 * 86_400_000,
            merge_count: 4,
            archive_after_ms: 0,
            archive_compression: ArchiveCompression::Zstd,
            retention_period_ms: 7 * 86_400_000,
            timestamp_column: String::new(),
            max_tag_cardinality: 10_000,
            wal_enabled: true,
            cdc_enabled: false,
            sync_resolution_ms: 0,
            sync_interval_ms: 30_000,
            retain_until_synced: false,
            battery_aware: false,
            bulk_import_threshold_rows: 1_000_000,
            partition_size_target_bytes: 1_024 * 1_024,
            compaction_partition_threshold: 20,
        }
    }

    /// Check cross-field invariants.
    ///
    /// # Errors
    /// - `merge_count` must be at least 2;
    /// - when retention and archiving are both enabled (non-zero),
    ///   `retention_period_ms` must be >= `archive_after_ms`.
    pub fn validate(&self) -> Result<(), ConfigValidationError> {
        if self.merge_count < 2 {
            return Err(ConfigValidationError {
                field: "merge_count".into(),
                reason: "must be >= 2".into(),
            });
        }
        let both_limits_set = self.retention_period_ms > 0 && self.archive_after_ms > 0;
        if both_limits_set && self.retention_period_ms < self.archive_after_ms {
            return Err(ConfigValidationError {
                field: "retention_period".into(),
                reason: "must be >= archive_after".into(),
            });
        }
        Ok(())
    }
}
/// WAL record: a batch of metric samples as `(series id, timestamp ms, value)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeseriesWalBatch {
    pub collection: String,
    pub samples: Vec<(SeriesId, i64, f64)>,
}

/// WAL record: a batch of log entries as `(series id, timestamp ms, payload bytes)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogWalBatch {
    pub collection: String,
    pub entries: Vec<(SeriesId, i64, Vec<u8>)>,
}
/// One series' worth of data produced by a flush.
#[derive(Debug)]
pub struct FlushedSeries {
    pub series_id: SeriesId,
    pub kind: FlushedKind,
    // Timestamp bounds of the flushed data — presumably inclusive, matching
    // TimeRange/PartitionMeta conventions; confirm at producer.
    pub min_ts: i64,
    pub max_ts: i64,
}

/// Payload of a flushed series: an encoded metric block or raw log entries.
#[derive(Debug)]
pub enum FlushedKind {
    Metric {
        // Encoded sample block — Gorilla encoding implied by the name; confirm with codec.
        gorilla_block: Vec<u8>,
        sample_count: u64,
    },
    Log {
        entries: Vec<LogEntry>,
        // NOTE(review): presumably the sum of entry payload sizes — confirm at producer.
        total_bytes: usize,
    },
}
/// Reference to a stored segment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentRef {
    // Storage path of the segment (relative vs. absolute not determined here).
    pub path: String,
    pub min_ts: i64,
    pub max_ts: i64,
    pub kind: SegmentKind,
    pub size_bytes: u64,
    pub created_at_ms: i64,
}

/// Whether a segment stores metric samples or log entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SegmentKind {
    Metric,
    Log,
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn series_key_sorted_tags() {
        // Tag order at construction must not affect identity or hashing.
        let forward = SeriesKey::new(
            "cpu",
            vec![("host".into(), "a".into()), ("dc".into(), "us".into())],
        );
        let reversed = SeriesKey::new(
            "cpu",
            vec![("dc".into(), "us".into()), ("host".into(), "a".into())],
        );
        assert_eq!(forward, reversed);
        assert_eq!(forward.to_series_id(0), reversed.to_series_id(0));
    }

    #[test]
    fn series_catalog_resolve() {
        let mut catalog = SeriesCatalog::new();
        let key = SeriesKey::new("cpu", vec![("host".into(), "prod-1".into())]);
        let first = catalog.resolve(&key);
        let second = catalog.resolve(&key);
        assert_eq!(first, second);
        assert_eq!(catalog.len(), 1);
    }

    #[test]
    fn series_catalog_different_keys() {
        let mut catalog = SeriesCatalog::new();
        let cpu = SeriesKey::new("cpu", vec![("host".into(), "a".into())]);
        let mem = SeriesKey::new("mem", vec![("host".into(), "a".into())]);
        let cpu_id = catalog.resolve(&cpu);
        let mem_id = catalog.resolve(&mem);
        assert_ne!(cpu_id, mem_id);
        assert_eq!(catalog.len(), 2);
    }

    #[test]
    fn symbol_dictionary_basic() {
        let mut dict = SymbolDictionary::new();
        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
        assert_eq!(dict.resolve("us-west-2", 100_000), Some(1));
        // Re-resolving an existing symbol returns the same id without growth.
        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
        assert_eq!(dict.len(), 2);
        assert_eq!(dict.get(0), Some("us-east-1"));
        assert_eq!(dict.get(1), Some("us-west-2"));
        assert_eq!(dict.get_id("us-east-1"), Some(0));
    }

    #[test]
    fn symbol_dictionary_cardinality_breaker() {
        let mut dict = SymbolDictionary::new();
        for i in 0..100 {
            assert!(dict.resolve(&format!("val-{i}"), 100).is_some());
        }
        // The 101st distinct symbol is refused and the size stays at the cap.
        assert!(dict.resolve("one-too-many", 100).is_none());
        assert_eq!(dict.len(), 100);
    }

    #[test]
    fn symbol_dictionary_merge() {
        let mut left = SymbolDictionary::new();
        left.resolve("a", 1000);
        left.resolve("b", 1000);
        let mut right = SymbolDictionary::new();
        right.resolve("b", 1000);
        right.resolve("c", 1000);
        let remap = left.merge(&right, 1000);
        assert_eq!(left.len(), 3);
        assert_eq!(remap[0], left.get_id("b").unwrap());
        assert_eq!(remap[1], left.get_id("c").unwrap());
    }

    #[test]
    fn partition_interval_parse() {
        let accepted = [
            ("1h", PartitionInterval::Duration(3_600_000)),
            ("3d", PartitionInterval::Duration(3 * 86_400_000)),
            ("2w", PartitionInterval::Duration(2 * 604_800_000)),
            ("1M", PartitionInterval::Month),
            ("1y", PartitionInterval::Year),
            ("AUTO", PartitionInterval::Auto),
            ("UNBOUNDED", PartitionInterval::Unbounded),
        ];
        for (input, expected) in accepted {
            assert_eq!(
                PartitionInterval::parse(input).unwrap(),
                expected,
                "input {input}"
            );
        }
        assert!(matches!(
            PartitionInterval::parse("0h"),
            Err(IntervalParseError::ZeroInterval)
        ));
        assert!(matches!(
            PartitionInterval::parse("2M"),
            Err(IntervalParseError::UnsupportedCalendar { .. })
        ));
    }

    #[test]
    fn partition_interval_display_roundtrip() {
        for spec in ["1h", "3d", "2w", "1M", "1y", "AUTO", "UNBOUNDED"] {
            let parsed = PartitionInterval::parse(spec).unwrap();
            let reparsed = PartitionInterval::parse(&parsed.to_string()).unwrap();
            assert_eq!(parsed, reparsed, "roundtrip failed for {spec}");
        }
    }

    #[test]
    fn partition_meta_queryable() {
        let meta = PartitionMeta {
            min_ts: 1000,
            max_ts: 2000,
            row_count: 500,
            size_bytes: 1024,
            schema_version: 1,
            state: PartitionState::Sealed,
            interval_ms: 86_400_000,
            last_flushed_wal_lsn: 42,
            column_stats: HashMap::new(),
        };
        assert!(meta.is_queryable());
        assert!(meta.overlaps(&TimeRange::new(1500, 2500)));
        assert!(!meta.overlaps(&TimeRange::new(3000, 4000)));
    }

    #[test]
    fn partition_meta_not_queryable_when_deleted() {
        let meta = PartitionMeta {
            min_ts: 0,
            max_ts: 0,
            row_count: 0,
            size_bytes: 0,
            schema_version: 1,
            state: PartitionState::Deleted,
            interval_ms: 0,
            last_flushed_wal_lsn: 0,
            column_stats: HashMap::new(),
        };
        assert!(!meta.is_queryable());
    }

    #[test]
    fn tiered_config_validation() {
        let mut cfg = TieredPartitionConfig::origin_defaults();
        assert!(cfg.validate().is_ok());

        cfg.merge_count = 1;
        assert_eq!(cfg.validate().unwrap_err().field, "merge_count");

        cfg.merge_count = 10;
        cfg.retention_period_ms = 1000;
        cfg.archive_after_ms = 2000;
        assert_eq!(cfg.validate().unwrap_err().field, "retention_period");
    }

    #[test]
    fn time_range_overlap() {
        let base = TimeRange::new(100, 200);
        assert!(base.overlaps(&TimeRange::new(150, 250)));
        assert!(!base.overlaps(&TimeRange::new(300, 400)));
    }

    #[test]
    fn timeseries_delta_serialization() {
        let delta = TimeseriesDelta {
            source_id: "clxyz1234test".into(),
            series_id: 12345,
            series_key: SeriesKey::new("cpu", vec![("host".into(), "prod".into())]),
            min_ts: 1000,
            max_ts: 2000,
            encoded_block: vec![1, 2, 3, 4],
            sample_count: 100,
        };
        let json = serde_json::to_string(&delta).unwrap();
        let back: TimeseriesDelta = serde_json::from_str(&json).unwrap();
        assert_eq!(back.source_id, "clxyz1234test");
        assert_eq!(back.series_id, 12345);
        assert_eq!(back.sample_count, 100);
    }
}