// nodedb_types/timeseries/mod.rs
1//! Shared timeseries types for multi-model database engines.
2//!
3//! Used by both `nodedb` (server) and `nodedb-lite` (embedded) for
4//! timeseries ingest, storage, and query.
5
6pub mod config;
7pub mod continuous_agg;
8pub mod ingest;
9pub mod partition;
10pub mod series;
11pub mod sync;
12
13// Re-export all public types for backward compatibility.
14// Existing code uses `nodedb_types::timeseries::SeriesKey` etc.
15pub use config::{ArchiveCompression, ConfigValidationError, TieredPartitionConfig};
16pub use continuous_agg::{AggFunction, AggregateExpr, ContinuousAggregateDef, RefreshPolicy};
17pub use ingest::{IngestResult, LogEntry, MetricSample, SymbolDictionary, TimeRange};
18pub use partition::{
19    FlushedKind, FlushedSeries, IntervalParseError, PartitionInterval, PartitionMeta,
20    PartitionState, SegmentKind, SegmentRef,
21};
22pub use series::{BatteryState, LiteId, SeriesCatalog, SeriesId, SeriesKey};
23pub use sync::{LogWalBatch, TimeseriesDelta, TimeseriesWalBatch};
24
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Tag order must not affect key equality or the derived series id.
    #[test]
    fn series_key_sorted_tags() {
        let k1 = SeriesKey::new(
            "cpu",
            vec![("host".into(), "a".into()), ("dc".into(), "us".into())],
        );
        let k2 = SeriesKey::new(
            "cpu",
            vec![("dc".into(), "us".into()), ("host".into(), "a".into())],
        );
        assert_eq!(k1, k2);
        assert_eq!(k1.to_series_id(0), k2.to_series_id(0));
    }

    /// Resolving the same key twice is idempotent: same id, one entry.
    #[test]
    fn series_catalog_resolve() {
        let mut catalog = SeriesCatalog::new();
        let k = SeriesKey::new("cpu", vec![("host".into(), "prod-1".into())]);
        let id1 = catalog.resolve(&k);
        let id2 = catalog.resolve(&k);
        assert_eq!(id1, id2);
        assert_eq!(catalog.len(), 1);
    }

    /// Distinct measurements (same tags) get distinct ids and entries.
    #[test]
    fn series_catalog_different_keys() {
        let mut catalog = SeriesCatalog::new();
        let k1 = SeriesKey::new("cpu", vec![("host".into(), "a".into())]);
        let k2 = SeriesKey::new("mem", vec![("host".into(), "a".into())]);
        let id1 = catalog.resolve(&k1);
        let id2 = catalog.resolve(&k2);
        assert_ne!(id1, id2);
        assert_eq!(catalog.len(), 2);
    }

    /// Ids are assigned sequentially from 0, repeat lookups are stable,
    /// and both directions (id -> str, str -> id) resolve.
    #[test]
    fn symbol_dictionary_basic() {
        let mut dict = SymbolDictionary::new();
        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
        assert_eq!(dict.resolve("us-west-2", 100_000), Some(1));
        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
        assert_eq!(dict.len(), 2);
        assert_eq!(dict.get(0), Some("us-east-1"));
        assert_eq!(dict.get(1), Some("us-west-2"));
        assert_eq!(dict.get_id("us-east-1"), Some(0));
    }

    /// Once the cardinality limit is reached, new symbols are rejected
    /// (resolve returns None) and the dictionary stops growing.
    #[test]
    fn symbol_dictionary_cardinality_breaker() {
        let mut dict = SymbolDictionary::new();
        for i in 0..100 {
            assert!(dict.resolve(&format!("val-{i}"), 100).is_some());
        }
        assert!(dict.resolve("one-too-many", 100).is_none());
        assert_eq!(dict.len(), 100);
    }

    /// Merging dedupes shared symbols and returns a remap table indexed
    /// by the other dictionary's ids, mapping into the merged dictionary.
    #[test]
    fn symbol_dictionary_merge() {
        let mut d1 = SymbolDictionary::new();
        d1.resolve("a", 1000);
        d1.resolve("b", 1000);

        let mut d2 = SymbolDictionary::new();
        d2.resolve("b", 1000);
        d2.resolve("c", 1000);

        let remap = d1.merge(&d2, 1000);
        assert_eq!(d1.len(), 3);
        // d2's id 0 was "b", id 1 was "c" (insertion order above).
        assert_eq!(remap[0], d1.get_id("b").unwrap());
        assert_eq!(remap[1], d1.get_id("c").unwrap());
    }

    /// Covers duration units (h/d/w as milliseconds), calendar intervals
    /// (1M/1y), the AUTO/UNBOUNDED keywords, and the two error cases:
    /// zero-length intervals and multi-unit calendar intervals.
    #[test]
    fn partition_interval_parse() {
        assert_eq!(
            PartitionInterval::parse("1h").unwrap(),
            PartitionInterval::Duration(3_600_000)
        );
        assert_eq!(
            PartitionInterval::parse("3d").unwrap(),
            PartitionInterval::Duration(3 * 86_400_000)
        );
        assert_eq!(
            PartitionInterval::parse("2w").unwrap(),
            PartitionInterval::Duration(2 * 604_800_000)
        );
        assert_eq!(
            PartitionInterval::parse("1M").unwrap(),
            PartitionInterval::Month
        );
        assert_eq!(
            PartitionInterval::parse("1y").unwrap(),
            PartitionInterval::Year
        );
        assert_eq!(
            PartitionInterval::parse("AUTO").unwrap(),
            PartitionInterval::Auto
        );
        assert_eq!(
            PartitionInterval::parse("UNBOUNDED").unwrap(),
            PartitionInterval::Unbounded
        );
        assert!(matches!(
            PartitionInterval::parse("0h"),
            Err(IntervalParseError::ZeroInterval)
        ));
        assert!(matches!(
            PartitionInterval::parse("2M"),
            Err(IntervalParseError::UnsupportedCalendar { .. })
        ));
    }

    /// parse -> Display -> parse must be lossless for every variant.
    #[test]
    fn partition_interval_display_roundtrip() {
        let cases = ["1h", "3d", "2w", "1M", "1y", "AUTO", "UNBOUNDED"];
        for s in cases {
            let parsed = PartitionInterval::parse(s).unwrap();
            let displayed = parsed.to_string();
            let reparsed = PartitionInterval::parse(&displayed).unwrap();
            assert_eq!(parsed, reparsed, "roundtrip failed for {s}");
        }
    }

    /// A Sealed partition is queryable, and overlap checks against its
    /// [min_ts, max_ts] range behave as expected.
    #[test]
    fn partition_meta_queryable() {
        let meta = PartitionMeta {
            min_ts: 1000,
            max_ts: 2000,
            row_count: 500,
            size_bytes: 1024,
            schema_version: 1,
            state: PartitionState::Sealed,
            interval_ms: 86_400_000,
            last_flushed_wal_lsn: 42,
            column_stats: HashMap::new(),
        };
        assert!(meta.is_queryable());
        assert!(meta.overlaps(&TimeRange::new(1500, 2500)));
        assert!(!meta.overlaps(&TimeRange::new(3000, 4000)));
    }

    /// Deleted partitions must never be visible to queries.
    #[test]
    fn partition_meta_not_queryable_when_deleted() {
        let meta = PartitionMeta {
            min_ts: 0,
            max_ts: 0,
            row_count: 0,
            size_bytes: 0,
            schema_version: 1,
            state: PartitionState::Deleted,
            interval_ms: 0,
            last_flushed_wal_lsn: 0,
            column_stats: HashMap::new(),
        };
        assert!(!meta.is_queryable());
    }

    /// Defaults validate; merge_count below the minimum and a retention
    /// period shorter than archive_after are each rejected with the
    /// offending field named in the error.
    #[test]
    fn tiered_config_validation() {
        let mut cfg = TieredPartitionConfig::origin_defaults();
        assert!(cfg.validate().is_ok());

        cfg.merge_count = 1;
        let err = cfg.validate().unwrap_err();
        assert_eq!(err.field, "merge_count");

        cfg.merge_count = 10;
        cfg.retention_period_ms = 1000;
        cfg.archive_after_ms = 2000;
        let err = cfg.validate().unwrap_err();
        assert_eq!(err.field, "retention_period");
    }

    /// Basic interval-overlap semantics for TimeRange.
    #[test]
    fn time_range_overlap() {
        let r1 = TimeRange::new(100, 200);
        let r2 = TimeRange::new(150, 250);
        let r3 = TimeRange::new(300, 400);
        assert!(r1.overlaps(&r2));
        assert!(!r1.overlaps(&r3));
    }

    /// TimeseriesDelta survives a JSON round-trip via sonic_rs with its
    /// scalar fields intact.
    #[test]
    fn timeseries_delta_serialization() {
        let delta = TimeseriesDelta {
            source_id: "clxyz1234test".into(),
            series_id: 12345,
            series_key: SeriesKey::new("cpu", vec![("host".into(), "prod".into())]),
            min_ts: 1000,
            max_ts: 2000,
            encoded_block: vec![1, 2, 3, 4],
            sample_count: 100,
        };
        let json = sonic_rs::to_string(&delta).unwrap();
        let back: TimeseriesDelta = sonic_rs::from_str(&json).unwrap();
        assert_eq!(back.source_id, "clxyz1234test");
        assert_eq!(back.series_id, 12345);
        assert_eq!(back.sample_count, 100);
    }
}