1use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::retention::parse_duration_ns;
32
33#[derive(Debug, Clone)]
35pub struct HypertableSpec {
36 pub name: String,
37 pub time_column: String,
40 pub chunk_interval_ns: u64,
42 pub default_ttl_ns: Option<u64>,
53}
54
55impl HypertableSpec {
56 pub fn new(
57 name: impl Into<String>,
58 time_column: impl Into<String>,
59 chunk_interval_ns: u64,
60 ) -> Self {
61 Self {
62 name: name.into(),
63 time_column: time_column.into(),
64 chunk_interval_ns: chunk_interval_ns.max(1),
65 default_ttl_ns: None,
66 }
67 }
68
69 pub fn from_interval_string(
72 name: impl Into<String>,
73 time_column: impl Into<String>,
74 interval: &str,
75 ) -> Option<Self> {
76 let ns = parse_duration_ns(interval)?;
77 if ns == 0 {
78 return None;
79 }
80 Some(Self::new(name, time_column, ns))
81 }
82
83 pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
86 let ns = parse_duration_ns(ttl)?;
87 if ns == 0 {
88 return None;
89 }
90 self.default_ttl_ns = Some(ns);
91 Some(self)
92 }
93
94 pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
96 self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
97 self
98 }
99
100 pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
104 (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
105 }
106
107 pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
108 self.chunk_start(timestamp_ns)
109 .saturating_add(self.chunk_interval_ns)
110 }
111}
112
/// Identifies one chunk: the hypertable it belongs to plus the chunk's start
/// timestamp in nanoseconds.
///
/// Two ids are equal (and hash alike) exactly when both fields match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    /// Name of the owning hypertable.
    pub hypertable: String,
    /// Start of the chunk, in nanoseconds.
    pub start_ns: u64,
}
121
122#[derive(Debug, Clone)]
125pub struct ChunkMeta {
126 pub id: ChunkId,
127 pub end_ns_exclusive: u64,
128 pub row_count: u64,
129 pub min_ts_ns: u64,
130 pub max_ts_ns: u64,
131 pub sealed: bool,
132 pub ttl_override_ns: Option<u64>,
138}
139
140impl ChunkMeta {
141 pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
142 Self {
143 id,
144 end_ns_exclusive,
145 row_count: 0,
146 min_ts_ns: u64::MAX,
147 max_ts_ns: 0,
148 sealed: false,
149 ttl_override_ns: None,
150 }
151 }
152
153 pub fn observe(&mut self, ts_ns: u64) {
154 self.row_count += 1;
155 if ts_ns < self.min_ts_ns {
156 self.min_ts_ns = ts_ns;
157 }
158 if ts_ns > self.max_ts_ns {
159 self.max_ts_ns = ts_ns;
160 }
161 }
162
163 pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
167 self.ttl_override_ns.or(default_ttl_ns)
168 }
169
170 pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
175 let ttl = self.effective_ttl_ns(default_ttl_ns)?;
176 if self.row_count == 0 {
177 return None;
178 }
179 Some(self.max_ts_ns.saturating_add(ttl))
180 }
181
182 pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
183 match self.expiry_ns(default_ttl_ns) {
184 Some(expiry) => now_ns >= expiry,
185 None => false,
186 }
187 }
188}
189
190#[derive(Clone, Default)]
193pub struct HypertableRegistry {
194 inner: Arc<Mutex<RegistryInner>>,
195}
196
197#[derive(Default)]
198struct RegistryInner {
199 specs: BTreeMap<String, HypertableSpec>,
200 chunks: BTreeMap<(String, u64), ChunkMeta>,
204}
205
206impl std::fmt::Debug for HypertableRegistry {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 let guard = match self.inner.lock() {
209 Ok(g) => g,
210 Err(p) => p.into_inner(),
211 };
212 f.debug_struct("HypertableRegistry")
213 .field("hypertables", &guard.specs.len())
214 .field("chunks", &guard.chunks.len())
215 .finish()
216 }
217}
218
219impl HypertableRegistry {
220 pub fn new() -> Self {
221 Self::default()
222 }
223
224 pub fn register(&self, spec: HypertableSpec) {
229 let mut guard = match self.inner.lock() {
230 Ok(g) => g,
231 Err(p) => p.into_inner(),
232 };
233 guard.specs.insert(spec.name.clone(), spec);
234 }
235
236 pub fn get(&self, name: &str) -> Option<HypertableSpec> {
237 let guard = match self.inner.lock() {
238 Ok(g) => g,
239 Err(p) => p.into_inner(),
240 };
241 guard.specs.get(name).cloned()
242 }
243
244 pub fn list(&self) -> Vec<HypertableSpec> {
245 let guard = match self.inner.lock() {
246 Ok(g) => g,
247 Err(p) => p.into_inner(),
248 };
249 guard.specs.values().cloned().collect()
250 }
251
252 pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
257 let mut guard = match self.inner.lock() {
258 Ok(g) => g,
259 Err(p) => p.into_inner(),
260 };
261 guard.specs.remove(name)
262 }
263
264 pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
268 let mut guard = match self.inner.lock() {
269 Ok(g) => g,
270 Err(p) => p.into_inner(),
271 };
272 let spec = guard.specs.get(hypertable)?.clone();
273 let start = spec.chunk_start(timestamp_ns);
274 let end = spec.chunk_end_exclusive(timestamp_ns);
275 let id = ChunkId {
276 hypertable: spec.name.clone(),
277 start_ns: start,
278 };
279 let key = (spec.name.clone(), start);
280 let meta = guard
281 .chunks
282 .entry(key)
283 .or_insert_with(|| ChunkMeta::new(id.clone(), end));
284 meta.observe(timestamp_ns);
285 Some(id)
286 }
287
288 pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
290 let guard = match self.inner.lock() {
291 Ok(g) => g,
292 Err(p) => p.into_inner(),
293 };
294 guard
295 .chunks
296 .iter()
297 .filter(|((name, _), _)| name == hypertable)
298 .map(|(_, meta)| meta.clone())
299 .collect()
300 }
301
302 pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
307 let mut guard = match self.inner.lock() {
308 Ok(g) => g,
309 Err(p) => p.into_inner(),
310 };
311 let mut dropped = Vec::new();
312 let keys: Vec<(String, u64)> = guard
313 .chunks
314 .iter()
315 .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
316 .map(|(k, _)| k.clone())
317 .collect();
318 for key in keys {
319 if let Some(meta) = guard.chunks.remove(&key) {
320 dropped.push(meta);
321 }
322 }
323 dropped
324 }
325
326 pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
338 let mut guard = match self.inner.lock() {
339 Ok(g) => g,
340 Err(p) => p.into_inner(),
341 };
342 let Some(spec) = guard.specs.get(hypertable).cloned() else {
343 return Vec::new();
344 };
345 let expired_keys: Vec<(String, u64)> = guard
346 .chunks
347 .iter()
348 .filter(|((name, _), meta)| {
349 name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
350 })
351 .map(|(k, _)| k.clone())
352 .collect();
353 let mut dropped = Vec::with_capacity(expired_keys.len());
354 for key in expired_keys {
355 if let Some(meta) = guard.chunks.remove(&key) {
356 dropped.push(meta);
357 }
358 }
359 dropped
360 }
361
362 pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
366 let names: Vec<String> = {
367 let guard = match self.inner.lock() {
368 Ok(g) => g,
369 Err(p) => p.into_inner(),
370 };
371 guard.specs.keys().cloned().collect()
372 };
373 let mut out = Vec::new();
374 for name in names {
375 for meta in self.sweep_expired(&name, now_ns) {
376 out.push((name.clone(), meta));
377 }
378 }
379 out
380 }
381
382 pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
386 let mut guard = match self.inner.lock() {
387 Ok(g) => g,
388 Err(p) => p.into_inner(),
389 };
390 match guard.specs.get_mut(hypertable) {
391 Some(spec) => {
392 spec.default_ttl_ns = match ttl_ns {
393 Some(0) | None => None,
394 Some(v) => Some(v),
395 };
396 true
397 }
398 None => false,
399 }
400 }
401
402 pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
408 let mut guard = match self.inner.lock() {
409 Ok(g) => g,
410 Err(p) => p.into_inner(),
411 };
412 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
413 meta.ttl_override_ns = ttl_ns;
414 true
415 } else {
416 false
417 }
418 }
419
420 pub fn chunks_expiring_within(
424 &self,
425 hypertable: &str,
426 now_ns: u64,
427 horizon_ns: u64,
428 ) -> Vec<ChunkMeta> {
429 let guard = match self.inner.lock() {
430 Ok(g) => g,
431 Err(p) => p.into_inner(),
432 };
433 let Some(spec) = guard.specs.get(hypertable).cloned() else {
434 return Vec::new();
435 };
436 let cutoff = now_ns.saturating_add(horizon_ns);
437 guard
438 .chunks
439 .iter()
440 .filter(|((name, _), _)| name == hypertable)
441 .filter_map(|(_, meta)| {
442 let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
443 if expiry <= cutoff {
444 Some(meta.clone())
445 } else {
446 None
447 }
448 })
449 .collect()
450 }
451
452 pub fn seal_chunk(&self, id: &ChunkId) -> bool {
457 let mut guard = match self.inner.lock() {
458 Ok(g) => g,
459 Err(p) => p.into_inner(),
460 };
461 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
462 meta.sealed = true;
463 true
464 } else {
465 false
466 }
467 }
468
469 pub fn total_rows(&self, hypertable: &str) -> u64 {
472 let guard = match self.inner.lock() {
473 Ok(g) => g,
474 Err(p) => p.into_inner(),
475 };
476 guard
477 .chunks
478 .iter()
479 .filter(|((name, _), _)| name == hypertable)
480 .map(|(_, meta)| meta.row_count)
481 .sum()
482 }
483
484 pub fn names(&self) -> Vec<String> {
486 let guard = match self.inner.lock() {
487 Ok(g) => g,
488 Err(p) => p.into_inner(),
489 };
490 guard.specs.keys().cloned().collect()
491 }
492
493 pub fn drop_hypertable(&self, name: &str) -> usize {
496 let mut guard = match self.inner.lock() {
497 Ok(g) => g,
498 Err(p) => p.into_inner(),
499 };
500 guard.specs.remove(name);
501 let keys: Vec<(String, u64)> = guard
502 .chunks
503 .keys()
504 .filter(|(n, _)| n == name)
505 .cloned()
506 .collect();
507 for key in &keys {
508 guard.chunks.remove(key);
509 }
510 keys.len()
511 }
512}
513
#[cfg(test)]
mod tests {
    use super::*;

    const DAY_NS: u64 = 86_400_000_000_000;
    const HOUR_NS: u64 = 3_600_000_000_000;

    /// Spec with one-day chunks and `ts` as its time column.
    fn daily(name: &str) -> HypertableSpec {
        HypertableSpec::new(name, "ts", DAY_NS)
    }

    /// Fresh registry with `spec` already registered.
    fn registry_with(spec: HypertableSpec) -> HypertableRegistry {
        let registry = HypertableRegistry::new();
        registry.register(spec);
        registry
    }

    /// Routes every timestamp into `table`, panicking if a route fails.
    fn route_all(registry: &HypertableRegistry, table: &str, timestamps: &[u64]) {
        for &ts in timestamps {
            registry.route(table, ts).unwrap();
        }
    }

    /// Chunk start offsets, in the order the chunks are given.
    fn start_offsets(chunks: &[ChunkMeta]) -> Vec<u64> {
        chunks.iter().map(|chunk| chunk.id.start_ns).collect()
    }

    /// Row-less chunk of hypertable `m` starting at 0 and ending at `DAY_NS`.
    fn blank_chunk() -> ChunkMeta {
        let id = ChunkId {
            hypertable: "m".into(),
            start_ns: 0,
        };
        ChunkMeta::new(id, DAY_NS)
    }

    #[test]
    fn chunk_start_aligns_to_interval_floor() {
        let spec = daily("m");
        let cases = [
            (0, 0),
            (DAY_NS - 1, 0),
            (DAY_NS, DAY_NS),
            (3 * DAY_NS + 123, 3 * DAY_NS),
        ];
        for (timestamp, expected_start) in cases {
            assert_eq!(spec.chunk_start(timestamp), expected_start);
        }
    }

    #[test]
    fn interval_string_accepts_duration_units() {
        for (text, expected_ns) in [("1d", DAY_NS), ("1h", HOUR_NS)] {
            let spec = HypertableSpec::from_interval_string("m", "ts", text).unwrap();
            assert_eq!(spec.chunk_interval_ns, expected_ns);
        }
        for rejected in ["raw", "garbage"] {
            assert!(HypertableSpec::from_interval_string("m", "ts", rejected).is_none());
        }
    }

    #[test]
    fn route_allocates_chunk_on_first_write() {
        let reg = registry_with(daily("metrics"));
        let written_at = DAY_NS + 100;

        let id = reg.route("metrics", written_at).unwrap();
        assert_eq!(id.hypertable, "metrics");
        assert_eq!(id.start_ns, DAY_NS);

        let chunks = reg.show_chunks("metrics");
        assert_eq!(chunks.len(), 1);
        let only = &chunks[0];
        assert_eq!(only.row_count, 1);
        assert_eq!(only.min_ts_ns, written_at);
        assert_eq!(only.max_ts_ns, written_at);
        assert_eq!(only.end_ns_exclusive, 2 * DAY_NS);
    }

    #[test]
    fn route_groups_writes_within_same_chunk() {
        let reg = registry_with(daily("m"));
        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
            assert_eq!(reg.route("m", offset).unwrap().start_ns, 0);
        }
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 4);
    }

    #[test]
    fn route_splits_writes_across_adjacent_chunks() {
        let reg = registry_with(daily("m"));
        route_all(&reg, "m", &[DAY_NS - 1, DAY_NS, 2 * DAY_NS]);

        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 3);
        assert!(chunks
            .windows(2)
            .all(|pair| pair[0].id.start_ns <= pair[1].id.start_ns));
    }

    #[test]
    fn route_returns_none_for_unknown_hypertable() {
        assert!(HypertableRegistry::new().route("nope", 0).is_none());
    }

    #[test]
    fn drop_chunks_before_removes_matching_chunks() {
        let reg = registry_with(daily("m"));
        route_all(&reg, "m", &[0, DAY_NS, 2 * DAY_NS + 5]);

        let dropped = reg.drop_chunks_before("m", DAY_NS);
        assert_eq!(dropped.len(), 2);

        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    #[test]
    fn show_chunks_is_ordered_by_start() {
        let reg = registry_with(daily("m"));
        route_all(&reg, "m", &[5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, DAY_NS]);

        assert_eq!(
            start_offsets(&reg.show_chunks("m")),
            vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]
        );
    }

    #[test]
    fn seal_chunk_flips_flag() {
        let reg = registry_with(daily("m"));
        let id = reg.route("m", 0).unwrap();
        assert!(reg.seal_chunk(&id));
        assert!(reg.show_chunks("m")[0].sealed);
    }

    #[test]
    fn drop_hypertable_removes_everything() {
        let reg = registry_with(daily("m"));
        route_all(&reg, "m", &[0, DAY_NS]);

        assert_eq!(reg.drop_hypertable("m"), 2);
        assert!(reg.get("m").is_none());
        assert!(reg.show_chunks("m").is_empty());
    }

    #[test]
    fn total_rows_sums_every_chunk() {
        let reg = registry_with(daily("m"));
        for ts in (0..1000).chain(DAY_NS..DAY_NS + 500) {
            reg.route("m", ts).unwrap();
        }
        assert_eq!(reg.total_rows("m"), 1500);
    }

    #[test]
    fn names_lists_registered_hypertables() {
        let reg = registry_with(daily("a"));
        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));

        let mut names = reg.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    #[test]
    fn with_ttl_parses_duration_and_sets_default() {
        let spec = daily("m").with_ttl("7d").unwrap();
        assert_eq!(spec.default_ttl_ns, Some(7 * DAY_NS));

        for rejected in ["raw", "garbage"] {
            assert!(daily("m").with_ttl(rejected).is_none());
        }
    }

    #[test]
    fn chunk_with_no_rows_never_expires() {
        assert!(!blank_chunk().is_expired_at(u64::MAX, Some(1)));
    }

    #[test]
    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
        let mut meta = blank_chunk();
        meta.observe(500);

        let ttl = Some(1000);
        assert!(!meta.is_expired_at(1000, ttl));
        assert!(!meta.is_expired_at(1499, ttl));
        assert!(meta.is_expired_at(1500, ttl));
    }

    #[test]
    fn per_chunk_override_wins_over_hypertable_default() {
        let mut meta = blank_chunk();
        meta.observe(500);
        meta.ttl_override_ns = Some(100);

        let table_default = Some(1000);
        assert!(meta.is_expired_at(600, table_default));
        assert!(!meta.is_expired_at(599, table_default));
    }

    #[test]
    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
        let reg = registry_with(daily("m").with_ttl_ns(2 * DAY_NS));
        route_all(&reg, "m", &[0, DAY_NS, 2 * DAY_NS]);

        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
        let mut starts = start_offsets(&dropped);
        starts.sort();
        assert_eq!(starts, vec![0, DAY_NS]);

        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    #[test]
    fn sweep_without_ttl_keeps_every_chunk() {
        let reg = registry_with(daily("m"));
        route_all(&reg, "m", &[0, DAY_NS, 2 * DAY_NS]);

        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
        assert!(dropped.is_empty());
        assert_eq!(reg.show_chunks("m").len(), 3);
    }

    #[test]
    fn sweep_all_expired_iterates_every_hypertable() {
        let reg = registry_with(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
        reg.register(daily("slow").with_ttl_ns(7 * DAY_NS));
        reg.route("fast", 0).unwrap();
        reg.route("slow", 0).unwrap();

        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
        assert_eq!(dropped[0].0, "fast");
        assert_eq!(reg.show_chunks("slow").len(), 1);
    }

    #[test]
    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
        let reg = registry_with(daily("m").with_ttl_ns(DAY_NS));
        let id = reg.route("m", 0).unwrap();

        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
        assert!(dropped.is_empty());

        reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
    }

    #[test]
    fn chunks_expiring_within_previews_without_dropping() {
        let reg = registry_with(daily("m").with_ttl_ns(DAY_NS));
        route_all(&reg, "m", &[0, DAY_NS, 2 * DAY_NS]);

        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
        assert_eq!(preview.len(), 1);
        assert_eq!(preview[0].id.start_ns, 0);

        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
        assert_eq!(preview2.len(), 2);
        assert_eq!(reg.show_chunks("m").len(), 3);
    }
}