1use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::retention::parse_duration_ns;
32use crate::storage::engine::PageLocation;
33
/// Static description of one hypertable: its name, the column that carries the
/// row timestamp, and how the time axis is partitioned into chunks.
#[derive(Debug, Clone)]
pub struct HypertableSpec {
    /// Hypertable name; `HypertableRegistry::register` uses it as the map key.
    pub name: String,
    /// Name of the column that holds the row timestamp.
    pub time_column: String,
    /// Width of one chunk in nanoseconds. `HypertableSpec::new` stores at least 1.
    pub chunk_interval_ns: u64,
    /// Hypertable-wide time-to-live in nanoseconds applied to chunks that carry
    /// no per-chunk override; `None` means chunks have no default TTL.
    pub default_ttl_ns: Option<u64>,
}
55
56impl HypertableSpec {
57 pub fn new(
58 name: impl Into<String>,
59 time_column: impl Into<String>,
60 chunk_interval_ns: u64,
61 ) -> Self {
62 Self {
63 name: name.into(),
64 time_column: time_column.into(),
65 chunk_interval_ns: chunk_interval_ns.max(1),
66 default_ttl_ns: None,
67 }
68 }
69
70 pub fn from_interval_string(
73 name: impl Into<String>,
74 time_column: impl Into<String>,
75 interval: &str,
76 ) -> Option<Self> {
77 let ns = parse_duration_ns(interval)?;
78 if ns == 0 {
79 return None;
80 }
81 Some(Self::new(name, time_column, ns))
82 }
83
84 pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
87 let ns = parse_duration_ns(ttl)?;
88 if ns == 0 {
89 return None;
90 }
91 self.default_ttl_ns = Some(ns);
92 Some(self)
93 }
94
95 pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
97 self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
98 self
99 }
100
101 pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
105 (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
106 }
107
108 pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
109 self.chunk_start(timestamp_ns)
110 .saturating_add(self.chunk_interval_ns)
111 }
112}
113
/// Identifies one chunk: the owning hypertable plus the chunk's start timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    /// Name of the hypertable the chunk belongs to.
    pub hypertable: String,
    /// Inclusive start of the chunk's time range, in nanoseconds.
    pub start_ns: u64,
}
122
/// Storage format of a chunk, as reported by `ChunkMeta::format`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkFormat {
    /// Reported when the chunk has no `columnar_page`.
    Row,
    /// Reported when the chunk has a `columnar_page`.
    ColumnarV1,
}
139
/// Bookkeeping for one chunk: its time range, observed row statistics,
/// seal state, TTL override and optional columnar page.
#[derive(Debug, Clone)]
pub struct ChunkMeta {
    /// Identity of the chunk (hypertable name and start timestamp).
    pub id: ChunkId,
    /// Exclusive end of the chunk's time range, in nanoseconds.
    pub end_ns_exclusive: u64,
    /// Number of timestamps recorded through `observe`.
    pub row_count: u64,
    /// Smallest observed timestamp; `u64::MAX` until the first `observe`.
    pub min_ts_ns: u64,
    /// Largest observed timestamp; `0` until the first `observe`.
    pub max_ts_ns: u64,
    /// Set to `true` by the registry's seal operations.
    pub sealed: bool,
    /// Per-chunk TTL in nanoseconds; when `Some`, it takes precedence over the
    /// hypertable default in `effective_ttl_ns`.
    pub ttl_override_ns: Option<u64>,
    /// Page holding the chunk's columnar encoding; `Some` makes `format()`
    /// report `ChunkFormat::ColumnarV1`.
    pub columnar_page: Option<PageLocation>,
}
165
166impl ChunkMeta {
167 pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
168 Self {
169 id,
170 end_ns_exclusive,
171 row_count: 0,
172 min_ts_ns: u64::MAX,
173 max_ts_ns: 0,
174 sealed: false,
175 ttl_override_ns: None,
176 columnar_page: None,
177 }
178 }
179
180 pub fn format(&self) -> ChunkFormat {
187 match self.columnar_page {
188 Some(_) => ChunkFormat::ColumnarV1,
189 None => ChunkFormat::Row,
190 }
191 }
192
193 pub fn is_columnar(&self) -> bool {
195 matches!(self.format(), ChunkFormat::ColumnarV1)
196 }
197
198 pub fn observe(&mut self, ts_ns: u64) {
199 self.row_count += 1;
200 if ts_ns < self.min_ts_ns {
201 self.min_ts_ns = ts_ns;
202 }
203 if ts_ns > self.max_ts_ns {
204 self.max_ts_ns = ts_ns;
205 }
206 }
207
208 pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
212 self.ttl_override_ns.or(default_ttl_ns)
213 }
214
215 pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
220 let ttl = self.effective_ttl_ns(default_ttl_ns)?;
221 if self.row_count == 0 {
222 return None;
223 }
224 Some(self.max_ts_ns.saturating_add(ttl))
225 }
226
227 pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
228 match self.expiry_ns(default_ttl_ns) {
229 Some(expiry) => now_ns >= expiry,
230 None => false,
231 }
232 }
233}
234
/// Registry of hypertable specs and their chunk metadata.
///
/// The state lives behind `Arc<Mutex<..>>`, so clones of a registry refer to
/// the same underlying maps.
#[derive(Clone, Default)]
pub struct HypertableRegistry {
    /// Shared, mutex-protected registry state.
    inner: Arc<Mutex<RegistryInner>>,
}
241
/// Mutable state guarded by the registry's mutex.
#[derive(Default)]
struct RegistryInner {
    /// Registered specs keyed by hypertable name.
    specs: BTreeMap<String, HypertableSpec>,
    /// Chunk metadata keyed by `(hypertable name, chunk start_ns)`; the ordered
    /// map keeps each hypertable's chunks sorted by start time.
    chunks: BTreeMap<(String, u64), ChunkMeta>,
    /// Bytes handed to `seal_chunk_columnar`, keyed like `chunks`.
    columnar_blocks: BTreeMap<(String, u64), Vec<u8>>,
}
257
258impl std::fmt::Debug for HypertableRegistry {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 let guard = match self.inner.lock() {
261 Ok(g) => g,
262 Err(p) => p.into_inner(),
263 };
264 f.debug_struct("HypertableRegistry")
265 .field("hypertables", &guard.specs.len())
266 .field("chunks", &guard.chunks.len())
267 .finish()
268 }
269}
270
271impl HypertableRegistry {
272 pub fn new() -> Self {
273 Self::default()
274 }
275
276 pub fn register(&self, spec: HypertableSpec) {
281 let mut guard = match self.inner.lock() {
282 Ok(g) => g,
283 Err(p) => p.into_inner(),
284 };
285 guard.specs.insert(spec.name.clone(), spec);
286 }
287
288 pub fn get(&self, name: &str) -> Option<HypertableSpec> {
289 let guard = match self.inner.lock() {
290 Ok(g) => g,
291 Err(p) => p.into_inner(),
292 };
293 guard.specs.get(name).cloned()
294 }
295
296 pub fn list(&self) -> Vec<HypertableSpec> {
297 let guard = match self.inner.lock() {
298 Ok(g) => g,
299 Err(p) => p.into_inner(),
300 };
301 guard.specs.values().cloned().collect()
302 }
303
304 pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
309 let mut guard = match self.inner.lock() {
310 Ok(g) => g,
311 Err(p) => p.into_inner(),
312 };
313 guard.specs.remove(name)
314 }
315
316 pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
320 let mut guard = match self.inner.lock() {
321 Ok(g) => g,
322 Err(p) => p.into_inner(),
323 };
324 let spec = guard.specs.get(hypertable)?.clone();
325 let start = spec.chunk_start(timestamp_ns);
326 let end = spec.chunk_end_exclusive(timestamp_ns);
327 let id = ChunkId {
328 hypertable: spec.name.clone(),
329 start_ns: start,
330 };
331 let key = (spec.name.clone(), start);
332 let meta = guard
333 .chunks
334 .entry(key)
335 .or_insert_with(|| ChunkMeta::new(id.clone(), end));
336 meta.observe(timestamp_ns);
337 Some(id)
338 }
339
340 pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
342 let guard = match self.inner.lock() {
343 Ok(g) => g,
344 Err(p) => p.into_inner(),
345 };
346 guard
347 .chunks
348 .iter()
349 .filter(|((name, _), _)| name == hypertable)
350 .map(|(_, meta)| meta.clone())
351 .collect()
352 }
353
354 pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
359 let mut guard = match self.inner.lock() {
360 Ok(g) => g,
361 Err(p) => p.into_inner(),
362 };
363 let mut dropped = Vec::new();
364 let keys: Vec<(String, u64)> = guard
365 .chunks
366 .iter()
367 .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
368 .map(|(k, _)| k.clone())
369 .collect();
370 for key in keys {
371 if let Some(meta) = guard.chunks.remove(&key) {
372 dropped.push(meta);
373 }
374 }
375 dropped
376 }
377
378 pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
390 let mut guard = match self.inner.lock() {
391 Ok(g) => g,
392 Err(p) => p.into_inner(),
393 };
394 let Some(spec) = guard.specs.get(hypertable).cloned() else {
395 return Vec::new();
396 };
397 let expired_keys: Vec<(String, u64)> = guard
398 .chunks
399 .iter()
400 .filter(|((name, _), meta)| {
401 name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
402 })
403 .map(|(k, _)| k.clone())
404 .collect();
405 let mut dropped = Vec::with_capacity(expired_keys.len());
406 for key in expired_keys {
407 if let Some(meta) = guard.chunks.remove(&key) {
408 dropped.push(meta);
409 }
410 }
411 dropped
412 }
413
414 pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
418 let names: Vec<String> = {
419 let guard = match self.inner.lock() {
420 Ok(g) => g,
421 Err(p) => p.into_inner(),
422 };
423 guard.specs.keys().cloned().collect()
424 };
425 let mut out = Vec::new();
426 for name in names {
427 for meta in self.sweep_expired(&name, now_ns) {
428 out.push((name.clone(), meta));
429 }
430 }
431 out
432 }
433
434 pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
438 let mut guard = match self.inner.lock() {
439 Ok(g) => g,
440 Err(p) => p.into_inner(),
441 };
442 match guard.specs.get_mut(hypertable) {
443 Some(spec) => {
444 spec.default_ttl_ns = match ttl_ns {
445 Some(0) | None => None,
446 Some(v) => Some(v),
447 };
448 true
449 }
450 None => false,
451 }
452 }
453
454 pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
460 let mut guard = match self.inner.lock() {
461 Ok(g) => g,
462 Err(p) => p.into_inner(),
463 };
464 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
465 meta.ttl_override_ns = ttl_ns;
466 true
467 } else {
468 false
469 }
470 }
471
472 pub fn chunks_expiring_within(
476 &self,
477 hypertable: &str,
478 now_ns: u64,
479 horizon_ns: u64,
480 ) -> Vec<ChunkMeta> {
481 let guard = match self.inner.lock() {
482 Ok(g) => g,
483 Err(p) => p.into_inner(),
484 };
485 let Some(spec) = guard.specs.get(hypertable).cloned() else {
486 return Vec::new();
487 };
488 let cutoff = now_ns.saturating_add(horizon_ns);
489 guard
490 .chunks
491 .iter()
492 .filter(|((name, _), _)| name == hypertable)
493 .filter_map(|(_, meta)| {
494 let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
495 if expiry <= cutoff {
496 Some(meta.clone())
497 } else {
498 None
499 }
500 })
501 .collect()
502 }
503
504 pub fn seal_chunk(&self, id: &ChunkId) -> bool {
509 let mut guard = match self.inner.lock() {
510 Ok(g) => g,
511 Err(p) => p.into_inner(),
512 };
513 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
514 meta.sealed = true;
515 true
516 } else {
517 false
518 }
519 }
520
521 pub fn seal_chunk_columnar(&self, id: &ChunkId, page: PageLocation, bytes: Vec<u8>) -> bool {
528 let mut guard = match self.inner.lock() {
529 Ok(g) => g,
530 Err(p) => p.into_inner(),
531 };
532 let key = (id.hypertable.clone(), id.start_ns);
533 if let Some(meta) = guard.chunks.get_mut(&key) {
534 meta.sealed = true;
535 meta.columnar_page = Some(page);
536 guard.columnar_blocks.insert(key, bytes);
537 true
538 } else {
539 false
540 }
541 }
542
543 pub fn columnar_block(&self, id: &ChunkId) -> Option<Vec<u8>> {
548 let guard = match self.inner.lock() {
549 Ok(g) => g,
550 Err(p) => p.into_inner(),
551 };
552 guard
553 .columnar_blocks
554 .get(&(id.hypertable.clone(), id.start_ns))
555 .cloned()
556 }
557
558 pub fn total_rows(&self, hypertable: &str) -> u64 {
561 let guard = match self.inner.lock() {
562 Ok(g) => g,
563 Err(p) => p.into_inner(),
564 };
565 guard
566 .chunks
567 .iter()
568 .filter(|((name, _), _)| name == hypertable)
569 .map(|(_, meta)| meta.row_count)
570 .sum()
571 }
572
573 pub fn names(&self) -> Vec<String> {
575 let guard = match self.inner.lock() {
576 Ok(g) => g,
577 Err(p) => p.into_inner(),
578 };
579 guard.specs.keys().cloned().collect()
580 }
581
582 pub fn is_empty(&self) -> bool {
586 let guard = match self.inner.lock() {
587 Ok(g) => g,
588 Err(p) => p.into_inner(),
589 };
590 guard.specs.is_empty() && guard.chunks.is_empty()
591 }
592
593 pub fn snapshot_chunks(&self) -> Vec<ChunkMeta> {
598 let guard = match self.inner.lock() {
599 Ok(g) => g,
600 Err(p) => p.into_inner(),
601 };
602 guard.chunks.values().cloned().collect()
603 }
604
605 pub fn restore_chunk(&self, meta: ChunkMeta) {
618 let mut guard = match self.inner.lock() {
619 Ok(g) => g,
620 Err(p) => p.into_inner(),
621 };
622 let key = (meta.id.hypertable.clone(), meta.id.start_ns);
623 guard.chunks.insert(key, meta);
624 }
625
626 pub fn drop_hypertable(&self, name: &str) -> usize {
629 let mut guard = match self.inner.lock() {
630 Ok(g) => g,
631 Err(p) => p.into_inner(),
632 };
633 guard.specs.remove(name);
634 let keys: Vec<(String, u64)> = guard
635 .chunks
636 .keys()
637 .filter(|(n, _)| n == name)
638 .cloned()
639 .collect();
640 for key in &keys {
641 guard.chunks.remove(key);
642 }
643 keys.len()
644 }
645}
646
#[cfg(test)]
mod tests {
    use super::*;

    // One day and one hour expressed in nanoseconds.
    const DAY_NS: u64 = 86_400_000_000_000;
    const HOUR_NS: u64 = 3_600_000_000_000;

    // `chunk_start` rounds a timestamp down to a multiple of the interval.
    #[test]
    fn chunk_start_aligns_to_interval_floor() {
        let spec = HypertableSpec::new("m", "ts", DAY_NS);
        assert_eq!(spec.chunk_start(0), 0);
        assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
        assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
        assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
    }

    // Textual intervals are parsed; inputs the parser rejects yield `None`.
    #[test]
    fn interval_string_accepts_duration_units() {
        let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
        assert_eq!(s.chunk_interval_ns, DAY_NS);
        let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
        assert_eq!(s.chunk_interval_ns, HOUR_NS);
        assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
        assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
    }

    // The first write into a time bucket creates the chunk and records the row.
    #[test]
    fn route_allocates_chunk_on_first_write() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
        let id = reg.route("metrics", DAY_NS + 100).unwrap();
        assert_eq!(id.hypertable, "metrics");
        assert_eq!(id.start_ns, DAY_NS);
        let chunks = reg.show_chunks("metrics");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 1);
        assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
    }

    // Timestamps inside the same interval all land in one chunk.
    #[test]
    fn route_groups_writes_within_same_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
            let id = reg.route("m", offset).unwrap();
            assert_eq!(id.start_ns, 0);
        }
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 4);
    }

    // Timestamps on either side of an interval boundary go to different chunks.
    #[test]
    fn route_splits_writes_across_adjacent_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", DAY_NS - 1).unwrap();
        reg.route("m", DAY_NS).unwrap();
        reg.route("m", 2 * DAY_NS).unwrap();
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 3);
        assert!(chunks[0].id.start_ns <= chunks[1].id.start_ns);
        assert!(chunks[1].id.start_ns <= chunks[2].id.start_ns);
    }

    // Routing to a hypertable that was never registered yields `None`.
    #[test]
    fn route_returns_none_for_unknown_hypertable() {
        let reg = HypertableRegistry::new();
        assert!(reg.route("nope", 0).is_none());
    }

    // The cutoff is inclusive: the chunk whose max timestamp equals DAY_NS is
    // dropped along with the earlier one.
    #[test]
    fn drop_chunks_before_removes_matching_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.route("m", DAY_NS).unwrap();
        reg.route("m", 2 * DAY_NS + 5).unwrap();
        let dropped = reg.drop_chunks_before("m", DAY_NS);
        assert_eq!(dropped.len(), 2);
        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    // Chunks come back sorted by start time regardless of write order.
    #[test]
    fn show_chunks_is_ordered_by_start() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, 1 * DAY_NS] {
            reg.route("m", ts).unwrap();
        }
        let starts: Vec<u64> = reg.show_chunks("m").iter().map(|c| c.id.start_ns).collect();
        assert_eq!(starts, vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]);
    }

    // Sealing an existing chunk succeeds and sets its `sealed` flag.
    #[test]
    fn seal_chunk_flips_flag() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        let id = reg.route("m", 0).unwrap();
        assert!(reg.seal_chunk(&id));
        assert!(reg.show_chunks("m")[0].sealed);
    }

    // Dropping a hypertable removes the spec and reports the chunk count removed.
    #[test]
    fn drop_hypertable_removes_everything() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.route("m", DAY_NS).unwrap();
        assert_eq!(reg.drop_hypertable("m"), 2);
        assert!(reg.get("m").is_none());
        assert!(reg.show_chunks("m").is_empty());
    }

    // Row counts are summed across all chunks of the hypertable.
    #[test]
    fn total_rows_sums_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in 0..1000 {
            reg.route("m", ts).unwrap();
        }
        for ts in DAY_NS..DAY_NS + 500 {
            reg.route("m", ts).unwrap();
        }
        assert_eq!(reg.total_rows("m"), 1500);
    }

    // `names` reports every registered hypertable.
    #[test]
    fn names_lists_registered_hypertables() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("a", "ts", DAY_NS));
        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
        let mut names = reg.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    // `with_ttl` stores the parsed duration and rejects unparseable input.
    #[test]
    fn with_ttl_parses_duration_and_sets_default() {
        let s = HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("7d")
            .unwrap();
        assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("raw")
            .is_none());
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("garbage")
            .is_none());
    }

    // A chunk that never observed a row has no expiry, whatever the TTL.
    #[test]
    fn chunk_with_no_rows_never_expires() {
        let meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        assert!(!meta.is_expired_at(u64::MAX, Some(1)));
    }

    // Expiry is max_ts + ttl (500 + 1000 here) and the comparison is inclusive.
    #[test]
    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
        let mut meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        meta.observe(500);
        assert!(!meta.is_expired_at(1000, Some(1000)));
        assert!(!meta.is_expired_at(1499, Some(1000)));
        assert!(meta.is_expired_at(1500, Some(1000)));
    }

    // With an override of 100 the chunk expires at 600 even though the
    // hypertable default passed in is 1000.
    #[test]
    fn per_chunk_override_wins_over_hypertable_default() {
        let mut meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        meta.observe(500);
        meta.ttl_override_ns = Some(100);
        assert!(meta.is_expired_at(600, Some(1000)));
        assert!(!meta.is_expired_at(599, Some(1000)));
    }

    // TTL is two days: at 3 days + 1 ns the chunks with max_ts 0 and 1 day are
    // past expiry, the one with max_ts 2 days is not.
    #[test]
    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
        let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
        starts.sort();
        assert_eq!(starts, vec![0, DAY_NS]);
        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    // Without a default TTL or override nothing ever expires.
    #[test]
    fn sweep_without_ttl_keeps_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
        assert!(dropped.is_empty());
        assert_eq!(reg.show_chunks("m").len(), 3);
    }

    // At two hours only the hypertable with the one-hour TTL has an expired chunk.
    #[test]
    fn sweep_all_expired_iterates_every_hypertable() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
        reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
        reg.route("fast", 0).unwrap();
        reg.route("slow", 0).unwrap();
        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
        assert_eq!(dropped[0].0, "fast");
        assert_eq!(reg.show_chunks("slow").len(), 1);
    }

    // A long override keeps the chunk past the default TTL; a short override
    // then makes the same chunk expire early.
    #[test]
    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        let id = reg.route("m", 0).unwrap();
        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
        assert!(dropped.is_empty());
        reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
    }

    // Round trip: `list` + `snapshot_chunks` replayed through `register` +
    // `restore_chunk` rebuild an equivalent registry.
    #[test]
    fn snapshot_then_restore_reproduces_registry_identically() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
        reg.register(HypertableSpec::new("events", "ts", HOUR_NS));
        for t in [0, DAY_NS + 5, DAY_NS + 9, 2 * DAY_NS] {
            reg.route("metrics", t).unwrap();
        }
        let id = reg.route("events", 0).unwrap();
        reg.seal_chunk(&id);
        reg.set_chunk_ttl_ns(&id, Some(3 * HOUR_NS));

        let specs = reg.list();
        let chunks = reg.snapshot_chunks();
        assert!(!reg.is_empty());

        let restored = HypertableRegistry::new();
        assert!(restored.is_empty());
        for spec in specs {
            restored.register(spec);
        }
        for chunk in chunks {
            restored.restore_chunk(chunk);
        }

        // Spec fields survive the round trip.
        let before = reg.get("metrics").unwrap();
        let after = restored.get("metrics").unwrap();
        assert_eq!(after.chunk_interval_ns, before.chunk_interval_ns);
        assert_eq!(after.time_column, before.time_column);
        assert_eq!(after.default_ttl_ns, before.default_ttl_ns);

        // Chunk statistics survive the round trip.
        let m_before = reg.show_chunks("metrics");
        let m_after = restored.show_chunks("metrics");
        assert_eq!(m_after.len(), m_before.len());
        for (a, b) in m_after.iter().zip(m_before.iter()) {
            assert_eq!(a.id.start_ns, b.id.start_ns);
            assert_eq!(a.end_ns_exclusive, b.end_ns_exclusive);
            assert_eq!(a.row_count, b.row_count);
            assert_eq!(a.min_ts_ns, b.min_ts_ns);
            assert_eq!(a.max_ts_ns, b.max_ts_ns);
        }
        let e_after = restored.show_chunks("events");
        assert_eq!(e_after.len(), 1);
        assert!(e_after[0].sealed, "sealed flag must survive restore");
        assert_eq!(e_after[0].ttl_override_ns, Some(3 * HOUR_NS));

        // A write into a restored bucket reuses the restored chunk.
        let routed = restored.route("metrics", DAY_NS + 1).unwrap();
        assert_eq!(routed.start_ns, DAY_NS);
        assert_eq!(
            restored.show_chunks("metrics").len(),
            m_before.len(),
            "write after restore must not allocate a new chunk"
        );
    }

    // Builds a sealed, single-row chunk that carries a columnar page; both the
    // min and max timestamps are set to `max_ts_ns`.
    fn columnar_chunk(hypertable: &str, start_ns: u64, max_ts_ns: u64) -> ChunkMeta {
        let mut meta = ChunkMeta::new(
            ChunkId {
                hypertable: hypertable.into(),
                start_ns,
            },
            start_ns + DAY_NS,
        );
        meta.row_count = 1;
        meta.min_ts_ns = max_ts_ns;
        meta.max_ts_ns = max_ts_ns;
        meta.sealed = true;
        meta.columnar_page = Some(PageLocation::new(7, 0, 1234));
        meta
    }

    // A TTL sweep removes a columnar chunk and hands back its page location.
    #[test]
    fn columnar_chunk_evicts_via_sweep_expired_carrying_its_page() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        reg.restore_chunk(columnar_chunk("metrics", 0, 0));
        assert!(reg.show_chunks("metrics")[0].columnar_page.is_some());

        let dropped = reg.sweep_expired("metrics", 3 * DAY_NS);
        assert_eq!(dropped.len(), 1, "columnar chunk must evict via TTL sweep");
        assert_eq!(
            dropped[0].columnar_page,
            Some(PageLocation::new(7, 0, 1234)),
            "dropped meta must carry columnar_page so physical release frees the RDCC block"
        );
        assert!(reg.show_chunks("metrics").is_empty());
    }

    // `drop_chunks_before` also returns the page location of a columnar chunk.
    #[test]
    fn columnar_chunk_evicts_via_drop_chunks_before() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
        reg.restore_chunk(columnar_chunk("metrics", 0, 0));

        let dropped = reg.drop_chunks_before("metrics", DAY_NS);
        assert_eq!(dropped.len(), 1);
        assert!(
            dropped[0].columnar_page.is_some(),
            "drop_chunks_before is metadata-only and carries columnar_page through"
        );
        assert!(reg.show_chunks("metrics").is_empty());
    }

    // Row and columnar chunks with the same timestamps are swept identically.
    #[test]
    fn columnar_and_row_chunks_share_one_eviction_path() {
        // Builds a one-chunk registry in the requested format and returns how
        // many chunks a sweep at three days removes.
        let mk = |columnar: bool| {
            let reg = HypertableRegistry::new();
            reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
            if columnar {
                reg.restore_chunk(columnar_chunk("m", 0, 0));
            } else {
                reg.route("m", 0).unwrap();
            }
            reg.sweep_expired("m", 3 * DAY_NS).len()
        };
        assert_eq!(mk(false), 1, "row chunk evicts");
        assert_eq!(mk(true), 1, "columnar chunk evicts the same way");
    }

    // Overlap with the window [lo, hi) is decided from `start_ns` and
    // `end_ns_exclusive` alone, so it works unchanged for columnar chunks.
    #[test]
    fn columnar_chunk_prunes_by_time_bounds_like_row_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.restore_chunk(columnar_chunk("m", 0, 0));
        reg.restore_chunk(columnar_chunk("m", 2 * DAY_NS, 2 * DAY_NS));
        let chunks = reg.show_chunks("m");
        let lo = 2 * DAY_NS;
        let hi = 3 * DAY_NS;
        let overlapping: Vec<u64> = chunks
            .iter()
            .filter(|c| c.id.start_ns < hi && c.end_ns_exclusive > lo)
            .map(|c| c.id.start_ns)
            .collect();
        assert_eq!(
            overlapping,
            vec![2 * DAY_NS],
            "only in-window columnar chunk kept"
        );
        assert!(chunks.iter().all(|c| c.columnar_page.is_some()));
    }

    // `format()` / `is_columnar()` follow the presence of `columnar_page`.
    #[test]
    fn chunk_format_dispatches_on_columnar_page_discriminant() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.restore_chunk(columnar_chunk("m", DAY_NS, DAY_NS));

        let chunks = reg.show_chunks("m");
        let row = chunks.iter().find(|c| c.id.start_ns == 0).unwrap();
        let col = chunks.iter().find(|c| c.id.start_ns == DAY_NS).unwrap();

        assert_eq!(row.format(), ChunkFormat::Row);
        assert!(!row.is_columnar());
        assert_eq!(col.format(), ChunkFormat::ColumnarV1);
        assert!(col.is_columnar());
    }

    // With a one-day TTL the chunks expire at 1, 2 and 3 days; the preview
    // lists those at or before now + horizon and leaves the registry intact.
    #[test]
    fn chunks_expiring_within_previews_without_dropping() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
        assert_eq!(preview.len(), 1);
        assert_eq!(preview[0].id.start_ns, 0);
        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
        assert_eq!(preview2.len(), 2);
        assert_eq!(reg.show_chunks("m").len(), 3);
    }
}