1pub mod config;
9pub mod continuous_agg;
10pub mod ingest;
11pub mod partition;
12pub mod series;
13pub mod sync;
14
15pub use config::{ArchiveCompression, ConfigValidationError, TieredPartitionConfig};
17pub use continuous_agg::{AggFunction, AggregateExpr, ContinuousAggregateDef, RefreshPolicy};
18pub use ingest::{IngestResult, LogEntry, MetricSample, SymbolDictionary, TimeRange};
19pub use partition::{
20 FlushedKind, FlushedSeries, IntervalParseError, PartitionInterval, PartitionMeta,
21 PartitionState, SegmentKind, SegmentRef,
22};
23pub use series::{BatteryState, LiteId, SeriesCatalog, SeriesId, SeriesKey};
24pub use sync::{LogWalBatch, TimeseriesDelta, TimeseriesWalBatch};
25
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Builds a `PartitionMeta` in the given state with every timestamp,
    /// counter and LSN set to zero, `schema_version` 1 and empty column
    /// stats. Tests override individual fields with struct-update syntax.
    fn zeroed_meta(state: PartitionState) -> PartitionMeta {
        PartitionMeta {
            min_ts: 0,
            max_ts: 0,
            row_count: 0,
            size_bytes: 0,
            schema_version: 1,
            state,
            interval_ms: 0,
            last_flushed_wal_lsn: 0,
            column_stats: HashMap::new(),
            max_system_ts: 0,
        }
    }

    /// The order in which tags are handed to `SeriesKey::new` must affect
    /// neither key equality nor the series id derived with the same argument.
    #[test]
    fn series_key_sorted_tags() {
        let host_first = SeriesKey::new(
            "cpu",
            vec![("host".into(), "a".into()), ("dc".into(), "us".into())],
        );
        let dc_first = SeriesKey::new(
            "cpu",
            vec![("dc".into(), "us".into()), ("host".into(), "a".into())],
        );

        assert_eq!(host_first, dc_first);
        assert_eq!(host_first.to_series_id(0), dc_first.to_series_id(0));
    }

    /// Resolving the same key twice yields the same id and leaves exactly
    /// one entry in the catalog.
    #[test]
    fn series_catalog_resolve() {
        let mut catalog = SeriesCatalog::new();
        let key = SeriesKey::new("cpu", vec![("host".into(), "prod-1".into())]);

        let first = catalog.resolve(&key);
        let second = catalog.resolve(&key);

        assert_eq!(first, second);
        assert_eq!(catalog.len(), 1);
    }

    /// Keys that differ only in their first argument ("cpu" vs "mem")
    /// resolve to different ids and occupy two catalog entries.
    #[test]
    fn series_catalog_different_keys() {
        let mut catalog = SeriesCatalog::new();
        let cpu_key = SeriesKey::new("cpu", vec![("host".into(), "a".into())]);
        let mem_key = SeriesKey::new("mem", vec![("host".into(), "a".into())]);

        let cpu_id = catalog.resolve(&cpu_key);
        let mem_id = catalog.resolve(&mem_key);

        assert_ne!(cpu_id, mem_id);
        assert_eq!(catalog.len(), 2);
    }

    /// New symbols get ids 0 and 1 in resolution order, a repeated symbol
    /// gets its earlier id back, and both lookup directions agree.
    #[test]
    fn symbol_dictionary_basic() {
        let mut dict = SymbolDictionary::new();

        // Order matters: the third entry repeats the first symbol.
        let resolutions = [
            ("us-east-1", Some(0)),
            ("us-west-2", Some(1)),
            ("us-east-1", Some(0)),
        ];
        for (symbol, expected_id) in resolutions {
            assert_eq!(dict.resolve(symbol, 100_000), expected_id);
        }

        assert_eq!(dict.len(), 2);
        assert_eq!(dict.get(0), Some("us-east-1"));
        assert_eq!(dict.get(1), Some("us-west-2"));
        assert_eq!(dict.get_id("us-east-1"), Some(0));
    }

    /// With a limit argument of 100, the first 100 distinct symbols resolve
    /// and the 101st is refused without growing the dictionary.
    #[test]
    fn symbol_dictionary_cardinality_breaker() {
        let mut dict = SymbolDictionary::new();

        for i in 0..100 {
            let symbol = format!("val-{i}");
            assert!(dict.resolve(&symbol, 100).is_some());
        }

        let overflow = dict.resolve("one-too-many", 100);
        assert!(overflow.is_none());
        assert_eq!(dict.len(), 100);
    }

    /// Merging {"b", "c"} into {"a", "b"} leaves three symbols; the returned
    /// remap is expected to hold, at positions 0 and 1, the target's ids for
    /// "b" and "c" (the first and second symbols resolved in the source).
    #[test]
    fn symbol_dictionary_merge() {
        let mut target = SymbolDictionary::new();
        for symbol in ["a", "b"] {
            target.resolve(symbol, 1000);
        }

        let mut source = SymbolDictionary::new();
        for symbol in ["b", "c"] {
            source.resolve(symbol, 1000);
        }

        let remap = target.merge(&source, 1000);

        assert_eq!(target.len(), 3);
        assert_eq!(remap[0], target.get_id("b").unwrap());
        assert_eq!(remap[1], target.get_id("c").unwrap());
    }

    /// Checks the parsed value for each accepted interval spelling and the
    /// error variant for a zero interval and a multi-month interval.
    #[test]
    fn partition_interval_parse() {
        let accepted = [
            ("1h", PartitionInterval::Duration(3_600_000)),
            ("3d", PartitionInterval::Duration(3 * 86_400_000)),
            ("2w", PartitionInterval::Duration(2 * 604_800_000)),
            ("1M", PartitionInterval::Month),
            ("1y", PartitionInterval::Year),
            ("AUTO", PartitionInterval::Auto),
            ("UNBOUNDED", PartitionInterval::Unbounded),
        ];
        for (text, expected) in accepted {
            assert_eq!(PartitionInterval::parse(text).unwrap(), expected);
        }

        let zero = PartitionInterval::parse("0h");
        assert!(matches!(zero, Err(IntervalParseError::ZeroInterval)));

        let two_months = PartitionInterval::parse("2M");
        assert!(matches!(
            two_months,
            Err(IntervalParseError::UnsupportedCalendar { .. })
        ));
    }

    /// Parsing, formatting with `to_string`, and parsing again must give
    /// back an equal interval for every listed spelling.
    #[test]
    fn partition_interval_display_roundtrip() {
        for s in ["1h", "3d", "2w", "1M", "1y", "AUTO", "UNBOUNDED"] {
            let first_pass = PartitionInterval::parse(s).unwrap();
            let rendered = first_pass.to_string();
            let second_pass = PartitionInterval::parse(&rendered).unwrap();
            assert_eq!(first_pass, second_pass, "roundtrip failed for {s}");
        }
    }

    /// A sealed partition spanning 1000..=2000 is queryable, overlaps a
    /// range starting inside it, and does not overlap a range entirely
    /// after it.
    #[test]
    fn partition_meta_queryable() {
        let sealed = PartitionMeta {
            min_ts: 1000,
            max_ts: 2000,
            row_count: 500,
            size_bytes: 1024,
            interval_ms: 86_400_000,
            last_flushed_wal_lsn: 42,
            ..zeroed_meta(PartitionState::Sealed)
        };

        assert!(sealed.is_queryable());

        let straddling = TimeRange::new(1500, 2500);
        assert!(sealed.overlaps(&straddling));

        let later = TimeRange::new(3000, 4000);
        assert!(!sealed.overlaps(&later));
    }

    /// A partition in the `Deleted` state must not report itself queryable.
    #[test]
    fn partition_meta_not_queryable_when_deleted() {
        let deleted = zeroed_meta(PartitionState::Deleted);
        assert!(!deleted.is_queryable());
    }

    /// The origin defaults validate; `merge_count = 1` is rejected on
    /// `merge_count`; a retention period (1000) shorter than `archive_after`
    /// (2000) is rejected on `retention_period`.
    #[test]
    fn tiered_config_validation() {
        let mut config = TieredPartitionConfig::origin_defaults();
        assert!(config.validate().is_ok());

        config.merge_count = 1;
        let merge_err = config.validate().unwrap_err();
        assert_eq!(merge_err.field, "merge_count");

        // Restore a valid merge count so the next failure is attributable
        // to the retention settings alone.
        config.merge_count = 10;
        config.retention_period_ms = 1000;
        config.archive_after_ms = 2000;
        let retention_err = config.validate().unwrap_err();
        assert_eq!(retention_err.field, "retention_period");
    }

    /// Ranges 100..200 and 150..250 overlap; 100..200 and 300..400 do not.
    #[test]
    fn time_range_overlap() {
        let base = TimeRange::new(100, 200);
        let intersecting = TimeRange::new(150, 250);
        let disjoint = TimeRange::new(300, 400);

        assert!(base.overlaps(&intersecting));
        assert!(!base.overlaps(&disjoint));
    }

    /// A `TimeseriesDelta` serialized to JSON with `sonic_rs` and read back
    /// keeps its `source_id`, `series_id` and `sample_count`.
    #[test]
    fn timeseries_delta_serialization() {
        let original = TimeseriesDelta {
            source_id: "clxyz1234test".into(),
            series_id: 12345,
            series_key: SeriesKey::new("cpu", vec![("host".into(), "prod".into())]),
            min_ts: 1000,
            max_ts: 2000,
            encoded_block: vec![1, 2, 3, 4],
            sample_count: 100,
        };

        let encoded = sonic_rs::to_string(&original).unwrap();
        let decoded: TimeseriesDelta = sonic_rs::from_str(&encoded).unwrap();

        assert_eq!(decoded.source_id, "clxyz1234test");
        assert_eq!(decoded.series_id, 12345);
        assert_eq!(decoded.sample_count, 100);
    }
}