Skip to main content

nodedb_types/timeseries/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Shared timeseries types for multi-model database engines.
4//!
5//! Used by both `nodedb` (server) and `nodedb-lite` (embedded) for
6//! timeseries ingest, storage, and query.
7
8pub mod config;
9pub mod continuous_agg;
10pub mod ingest;
11pub mod partition;
12pub mod series;
13pub mod sync;
14
15// Public API surface — flattened re-exports for callers that don't need the sub-module structure.
16pub use config::{ArchiveCompression, ConfigValidationError, TieredPartitionConfig};
17pub use continuous_agg::{AggFunction, AggregateExpr, ContinuousAggregateDef, RefreshPolicy};
18pub use ingest::{IngestResult, LogEntry, MetricSample, SymbolDictionary, TimeRange};
19pub use partition::{
20    FlushedKind, FlushedSeries, IntervalParseError, PartitionInterval, PartitionMeta,
21    PartitionState, SegmentKind, SegmentRef,
22};
23pub use series::{BatteryState, LiteId, SeriesCatalog, SeriesId, SeriesKey};
24pub use sync::{LogWalBatch, TimeseriesDelta, TimeseriesWalBatch};
25
26#[cfg(test)]
27mod tests {
28    use std::collections::HashMap;
29
30    use super::*;
31
32    #[test]
33    fn series_key_sorted_tags() {
34        let k1 = SeriesKey::new(
35            "cpu",
36            vec![("host".into(), "a".into()), ("dc".into(), "us".into())],
37        );
38        let k2 = SeriesKey::new(
39            "cpu",
40            vec![("dc".into(), "us".into()), ("host".into(), "a".into())],
41        );
42        assert_eq!(k1, k2);
43        assert_eq!(k1.to_series_id(0), k2.to_series_id(0));
44    }
45
46    #[test]
47    fn series_catalog_resolve() {
48        let mut catalog = SeriesCatalog::new();
49        let k = SeriesKey::new("cpu", vec![("host".into(), "prod-1".into())]);
50        let id1 = catalog.resolve(&k);
51        let id2 = catalog.resolve(&k);
52        assert_eq!(id1, id2);
53        assert_eq!(catalog.len(), 1);
54    }
55
56    #[test]
57    fn series_catalog_different_keys() {
58        let mut catalog = SeriesCatalog::new();
59        let k1 = SeriesKey::new("cpu", vec![("host".into(), "a".into())]);
60        let k2 = SeriesKey::new("mem", vec![("host".into(), "a".into())]);
61        let id1 = catalog.resolve(&k1);
62        let id2 = catalog.resolve(&k2);
63        assert_ne!(id1, id2);
64        assert_eq!(catalog.len(), 2);
65    }
66
67    #[test]
68    fn symbol_dictionary_basic() {
69        let mut dict = SymbolDictionary::new();
70        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
71        assert_eq!(dict.resolve("us-west-2", 100_000), Some(1));
72        assert_eq!(dict.resolve("us-east-1", 100_000), Some(0));
73        assert_eq!(dict.len(), 2);
74        assert_eq!(dict.get(0), Some("us-east-1"));
75        assert_eq!(dict.get(1), Some("us-west-2"));
76        assert_eq!(dict.get_id("us-east-1"), Some(0));
77    }
78
79    #[test]
80    fn symbol_dictionary_cardinality_breaker() {
81        let mut dict = SymbolDictionary::new();
82        for i in 0..100 {
83            assert!(dict.resolve(&format!("val-{i}"), 100).is_some());
84        }
85        assert!(dict.resolve("one-too-many", 100).is_none());
86        assert_eq!(dict.len(), 100);
87    }
88
89    #[test]
90    fn symbol_dictionary_merge() {
91        let mut d1 = SymbolDictionary::new();
92        d1.resolve("a", 1000);
93        d1.resolve("b", 1000);
94
95        let mut d2 = SymbolDictionary::new();
96        d2.resolve("b", 1000);
97        d2.resolve("c", 1000);
98
99        let remap = d1.merge(&d2, 1000);
100        assert_eq!(d1.len(), 3);
101        assert_eq!(remap[0], d1.get_id("b").unwrap());
102        assert_eq!(remap[1], d1.get_id("c").unwrap());
103    }
104
105    #[test]
106    fn partition_interval_parse() {
107        assert_eq!(
108            PartitionInterval::parse("1h").unwrap(),
109            PartitionInterval::Duration(3_600_000)
110        );
111        assert_eq!(
112            PartitionInterval::parse("3d").unwrap(),
113            PartitionInterval::Duration(3 * 86_400_000)
114        );
115        assert_eq!(
116            PartitionInterval::parse("2w").unwrap(),
117            PartitionInterval::Duration(2 * 604_800_000)
118        );
119        assert_eq!(
120            PartitionInterval::parse("1M").unwrap(),
121            PartitionInterval::Month
122        );
123        assert_eq!(
124            PartitionInterval::parse("1y").unwrap(),
125            PartitionInterval::Year
126        );
127        assert_eq!(
128            PartitionInterval::parse("AUTO").unwrap(),
129            PartitionInterval::Auto
130        );
131        assert_eq!(
132            PartitionInterval::parse("UNBOUNDED").unwrap(),
133            PartitionInterval::Unbounded
134        );
135        assert!(matches!(
136            PartitionInterval::parse("0h"),
137            Err(IntervalParseError::ZeroInterval)
138        ));
139        assert!(matches!(
140            PartitionInterval::parse("2M"),
141            Err(IntervalParseError::UnsupportedCalendar { .. })
142        ));
143    }
144
145    #[test]
146    fn partition_interval_display_roundtrip() {
147        let cases = ["1h", "3d", "2w", "1M", "1y", "AUTO", "UNBOUNDED"];
148        for s in cases {
149            let parsed = PartitionInterval::parse(s).unwrap();
150            let displayed = parsed.to_string();
151            let reparsed = PartitionInterval::parse(&displayed).unwrap();
152            assert_eq!(parsed, reparsed, "roundtrip failed for {s}");
153        }
154    }
155
156    #[test]
157    fn partition_meta_queryable() {
158        let meta = PartitionMeta {
159            min_ts: 1000,
160            max_ts: 2000,
161            row_count: 500,
162            size_bytes: 1024,
163            schema_version: 1,
164            state: PartitionState::Sealed,
165            interval_ms: 86_400_000,
166            last_flushed_wal_lsn: 42,
167            column_stats: HashMap::new(),
168            max_system_ts: 0,
169        };
170        assert!(meta.is_queryable());
171        assert!(meta.overlaps(&TimeRange::new(1500, 2500)));
172        assert!(!meta.overlaps(&TimeRange::new(3000, 4000)));
173    }
174
175    #[test]
176    fn partition_meta_not_queryable_when_deleted() {
177        let meta = PartitionMeta {
178            min_ts: 0,
179            max_ts: 0,
180            row_count: 0,
181            size_bytes: 0,
182            schema_version: 1,
183            state: PartitionState::Deleted,
184            interval_ms: 0,
185            last_flushed_wal_lsn: 0,
186            column_stats: HashMap::new(),
187            max_system_ts: 0,
188        };
189        assert!(!meta.is_queryable());
190    }
191
192    #[test]
193    fn tiered_config_validation() {
194        let mut cfg = TieredPartitionConfig::origin_defaults();
195        assert!(cfg.validate().is_ok());
196
197        cfg.merge_count = 1;
198        let err = cfg.validate().unwrap_err();
199        assert_eq!(err.field, "merge_count");
200
201        cfg.merge_count = 10;
202        cfg.retention_period_ms = 1000;
203        cfg.archive_after_ms = 2000;
204        let err = cfg.validate().unwrap_err();
205        assert_eq!(err.field, "retention_period");
206    }
207
208    #[test]
209    fn time_range_overlap() {
210        let r1 = TimeRange::new(100, 200);
211        let r2 = TimeRange::new(150, 250);
212        let r3 = TimeRange::new(300, 400);
213        assert!(r1.overlaps(&r2));
214        assert!(!r1.overlaps(&r3));
215    }
216
217    #[test]
218    fn timeseries_delta_serialization() {
219        let delta = TimeseriesDelta {
220            source_id: "clxyz1234test".into(),
221            series_id: 12345,
222            series_key: SeriesKey::new("cpu", vec![("host".into(), "prod".into())]),
223            min_ts: 1000,
224            max_ts: 2000,
225            encoded_block: vec![1, 2, 3, 4],
226            sample_count: 100,
227        };
228        let json = sonic_rs::to_string(&delta).unwrap();
229        let back: TimeseriesDelta = sonic_rs::from_str(&json).unwrap();
230        assert_eq!(back.source_id, "clxyz1234test");
231        assert_eq!(back.series_id, 12345);
232        assert_eq!(back.sample_count, 100);
233    }
234}