1use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::chunk::{
32 points_from_column_block, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID, DEFAULT_GRANULE_SIZE,
33};
34use super::retention::parse_duration_ns;
35use crate::storage::engine::PageLocation;
36use crate::storage::schema::types::DataType;
37use crate::storage::unified::column_block::{write_column_block, ColumnBlockError, ColumnInput};
38use crate::storage::unified::segment_codec::ColumnSemantics;
39
/// Declarative description of one hypertable: its name, the column carrying
/// the time axis, the chunk width, and the retention applied by default.
#[derive(Clone, Debug)]
pub struct HypertableSpec {
    /// Hypertable name; the registry stores specs keyed by this value.
    pub name: String,
    /// Name of the column that holds each row's timestamp.
    pub time_column: String,
    /// Width of every chunk in nanoseconds (the constructors never produce 0).
    pub chunk_interval_ns: u64,
    /// TTL applied to chunks without their own override; with `None`, only
    /// chunks carrying an override can expire.
    pub default_ttl_ns: Option<u64>,
}
61
62impl HypertableSpec {
63 pub fn new(
64 name: impl Into<String>,
65 time_column: impl Into<String>,
66 chunk_interval_ns: u64,
67 ) -> Self {
68 Self {
69 name: name.into(),
70 time_column: time_column.into(),
71 chunk_interval_ns: chunk_interval_ns.max(1),
72 default_ttl_ns: None,
73 }
74 }
75
76 pub fn from_interval_string(
79 name: impl Into<String>,
80 time_column: impl Into<String>,
81 interval: &str,
82 ) -> Option<Self> {
83 let ns = parse_duration_ns(interval)?;
84 if ns == 0 {
85 return None;
86 }
87 Some(Self::new(name, time_column, ns))
88 }
89
90 pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
93 let ns = parse_duration_ns(ttl)?;
94 if ns == 0 {
95 return None;
96 }
97 self.default_ttl_ns = Some(ns);
98 Some(self)
99 }
100
101 pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
103 self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
104 self
105 }
106
107 pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
111 (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
112 }
113
114 pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
115 self.chunk_start(timestamp_ns)
116 .saturating_add(self.chunk_interval_ns)
117 }
118}
119
/// Identity of a single chunk: the owning hypertable plus the chunk's start
/// timestamp in nanoseconds.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ChunkId {
    /// Name of the hypertable that owns the chunk.
    pub hypertable: String,
    /// Inclusive start of the chunk's time range.
    pub start_ns: u64,
}
128
/// Storage layout of a chunk, as reported by `ChunkMeta::format`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChunkFormat {
    /// Row-oriented chunk: no columnar page has been recorded.
    Row,
    /// Chunk sealed into a columnar block (version 1 layout).
    ColumnarV1,
}
145
146#[derive(Debug, Clone)]
149pub struct ChunkMeta {
150 pub id: ChunkId,
151 pub end_ns_exclusive: u64,
152 pub row_count: u64,
153 pub min_ts_ns: u64,
154 pub max_ts_ns: u64,
155 pub sealed: bool,
156 pub ttl_override_ns: Option<u64>,
162 pub columnar_page: Option<PageLocation>,
170}
171
172impl ChunkMeta {
173 pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
174 Self {
175 id,
176 end_ns_exclusive,
177 row_count: 0,
178 min_ts_ns: u64::MAX,
179 max_ts_ns: 0,
180 sealed: false,
181 ttl_override_ns: None,
182 columnar_page: None,
183 }
184 }
185
186 pub fn format(&self) -> ChunkFormat {
193 match self.columnar_page {
194 Some(_) => ChunkFormat::ColumnarV1,
195 None => ChunkFormat::Row,
196 }
197 }
198
199 pub fn is_columnar(&self) -> bool {
201 matches!(self.format(), ChunkFormat::ColumnarV1)
202 }
203
204 pub fn observe(&mut self, ts_ns: u64) {
205 self.row_count += 1;
206 if ts_ns < self.min_ts_ns {
207 self.min_ts_ns = ts_ns;
208 }
209 if ts_ns > self.max_ts_ns {
210 self.max_ts_ns = ts_ns;
211 }
212 }
213
214 pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
218 self.ttl_override_ns.or(default_ttl_ns)
219 }
220
221 pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
226 let ttl = self.effective_ttl_ns(default_ttl_ns)?;
227 if self.row_count == 0 {
228 return None;
229 }
230 Some(self.max_ts_ns.saturating_add(ttl))
231 }
232
233 pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
234 match self.expiry_ns(default_ttl_ns) {
235 Some(expiry) => now_ns >= expiry,
236 None => false,
237 }
238 }
239}
240
241#[derive(Clone, Default)]
244pub struct HypertableRegistry {
245 inner: Arc<Mutex<RegistryInner>>,
246}
247
248#[derive(Default)]
249struct RegistryInner {
250 specs: BTreeMap<String, HypertableSpec>,
251 chunks: BTreeMap<(String, u64), ChunkMeta>,
255 columnar_blocks: BTreeMap<(String, u64), Vec<u8>>,
262}
263
264impl std::fmt::Debug for HypertableRegistry {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 let guard = match self.inner.lock() {
267 Ok(g) => g,
268 Err(p) => p.into_inner(),
269 };
270 f.debug_struct("HypertableRegistry")
271 .field("hypertables", &guard.specs.len())
272 .field("chunks", &guard.chunks.len())
273 .finish()
274 }
275}
276
277impl HypertableRegistry {
278 pub fn new() -> Self {
279 Self::default()
280 }
281
282 pub fn register(&self, spec: HypertableSpec) {
287 let mut guard = match self.inner.lock() {
288 Ok(g) => g,
289 Err(p) => p.into_inner(),
290 };
291 guard.specs.insert(spec.name.clone(), spec);
292 }
293
294 pub fn get(&self, name: &str) -> Option<HypertableSpec> {
295 let guard = match self.inner.lock() {
296 Ok(g) => g,
297 Err(p) => p.into_inner(),
298 };
299 guard.specs.get(name).cloned()
300 }
301
302 pub fn list(&self) -> Vec<HypertableSpec> {
303 let guard = match self.inner.lock() {
304 Ok(g) => g,
305 Err(p) => p.into_inner(),
306 };
307 guard.specs.values().cloned().collect()
308 }
309
310 pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
315 let mut guard = match self.inner.lock() {
316 Ok(g) => g,
317 Err(p) => p.into_inner(),
318 };
319 guard.specs.remove(name)
320 }
321
322 pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
326 let mut guard = match self.inner.lock() {
327 Ok(g) => g,
328 Err(p) => p.into_inner(),
329 };
330 let spec = guard.specs.get(hypertable)?.clone();
331 let start = spec.chunk_start(timestamp_ns);
332 let end = spec.chunk_end_exclusive(timestamp_ns);
333 let id = ChunkId {
334 hypertable: spec.name.clone(),
335 start_ns: start,
336 };
337 let key = (spec.name.clone(), start);
338 let meta = guard
339 .chunks
340 .entry(key)
341 .or_insert_with(|| ChunkMeta::new(id.clone(), end));
342 meta.observe(timestamp_ns);
343 Some(id)
344 }
345
346 pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
348 let guard = match self.inner.lock() {
349 Ok(g) => g,
350 Err(p) => p.into_inner(),
351 };
352 guard
353 .chunks
354 .iter()
355 .filter(|((name, _), _)| name == hypertable)
356 .map(|(_, meta)| meta.clone())
357 .collect()
358 }
359
360 pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
365 let mut guard = match self.inner.lock() {
366 Ok(g) => g,
367 Err(p) => p.into_inner(),
368 };
369 let mut dropped = Vec::new();
370 let keys: Vec<(String, u64)> = guard
371 .chunks
372 .iter()
373 .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
374 .map(|(k, _)| k.clone())
375 .collect();
376 for key in keys {
377 if let Some(meta) = guard.chunks.remove(&key) {
378 dropped.push(meta);
379 }
380 }
381 dropped
382 }
383
384 pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
396 let mut guard = match self.inner.lock() {
397 Ok(g) => g,
398 Err(p) => p.into_inner(),
399 };
400 let Some(spec) = guard.specs.get(hypertable).cloned() else {
401 return Vec::new();
402 };
403 let expired_keys: Vec<(String, u64)> = guard
404 .chunks
405 .iter()
406 .filter(|((name, _), meta)| {
407 name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
408 })
409 .map(|(k, _)| k.clone())
410 .collect();
411 let mut dropped = Vec::with_capacity(expired_keys.len());
412 for key in expired_keys {
413 if let Some(meta) = guard.chunks.remove(&key) {
414 dropped.push(meta);
415 }
416 }
417 dropped
418 }
419
420 pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
424 let names: Vec<String> = {
425 let guard = match self.inner.lock() {
426 Ok(g) => g,
427 Err(p) => p.into_inner(),
428 };
429 guard.specs.keys().cloned().collect()
430 };
431 let mut out = Vec::new();
432 for name in names {
433 for meta in self.sweep_expired(&name, now_ns) {
434 out.push((name.clone(), meta));
435 }
436 }
437 out
438 }
439
440 pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
444 let mut guard = match self.inner.lock() {
445 Ok(g) => g,
446 Err(p) => p.into_inner(),
447 };
448 match guard.specs.get_mut(hypertable) {
449 Some(spec) => {
450 spec.default_ttl_ns = match ttl_ns {
451 Some(0) | None => None,
452 Some(v) => Some(v),
453 };
454 true
455 }
456 None => false,
457 }
458 }
459
460 pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
466 let mut guard = match self.inner.lock() {
467 Ok(g) => g,
468 Err(p) => p.into_inner(),
469 };
470 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
471 meta.ttl_override_ns = ttl_ns;
472 true
473 } else {
474 false
475 }
476 }
477
478 pub fn chunks_expiring_within(
482 &self,
483 hypertable: &str,
484 now_ns: u64,
485 horizon_ns: u64,
486 ) -> Vec<ChunkMeta> {
487 let guard = match self.inner.lock() {
488 Ok(g) => g,
489 Err(p) => p.into_inner(),
490 };
491 let Some(spec) = guard.specs.get(hypertable).cloned() else {
492 return Vec::new();
493 };
494 let cutoff = now_ns.saturating_add(horizon_ns);
495 guard
496 .chunks
497 .iter()
498 .filter(|((name, _), _)| name == hypertable)
499 .filter_map(|(_, meta)| {
500 let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
501 if expiry <= cutoff {
502 Some(meta.clone())
503 } else {
504 None
505 }
506 })
507 .collect()
508 }
509
510 pub fn seal_chunk(&self, id: &ChunkId) -> bool {
515 let mut guard = match self.inner.lock() {
516 Ok(g) => g,
517 Err(p) => p.into_inner(),
518 };
519 if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
520 meta.sealed = true;
521 true
522 } else {
523 false
524 }
525 }
526
527 pub fn seal_chunk_columnar(&self, id: &ChunkId, page: PageLocation, bytes: Vec<u8>) -> bool {
534 let mut guard = match self.inner.lock() {
535 Ok(g) => g,
536 Err(p) => p.into_inner(),
537 };
538 let key = (id.hypertable.clone(), id.start_ns);
539 if let Some(meta) = guard.chunks.get_mut(&key) {
540 meta.sealed = true;
541 meta.columnar_page = Some(page);
542 guard.columnar_blocks.insert(key, bytes);
543 true
544 } else {
545 false
546 }
547 }
548
549 pub fn columnar_block(&self, id: &ChunkId) -> Option<Vec<u8>> {
554 let guard = match self.inner.lock() {
555 Ok(g) => g,
556 Err(p) => p.into_inner(),
557 };
558 guard
559 .columnar_blocks
560 .get(&(id.hypertable.clone(), id.start_ns))
561 .cloned()
562 }
563
564 pub fn total_rows(&self, hypertable: &str) -> u64 {
567 let guard = match self.inner.lock() {
568 Ok(g) => g,
569 Err(p) => p.into_inner(),
570 };
571 guard
572 .chunks
573 .iter()
574 .filter(|((name, _), _)| name == hypertable)
575 .map(|(_, meta)| meta.row_count)
576 .sum()
577 }
578
579 pub fn names(&self) -> Vec<String> {
581 let guard = match self.inner.lock() {
582 Ok(g) => g,
583 Err(p) => p.into_inner(),
584 };
585 guard.specs.keys().cloned().collect()
586 }
587
588 pub fn is_empty(&self) -> bool {
592 let guard = match self.inner.lock() {
593 Ok(g) => g,
594 Err(p) => p.into_inner(),
595 };
596 guard.specs.is_empty() && guard.chunks.is_empty()
597 }
598
599 pub fn snapshot_chunks(&self) -> Vec<ChunkMeta> {
604 let guard = match self.inner.lock() {
605 Ok(g) => g,
606 Err(p) => p.into_inner(),
607 };
608 guard.chunks.values().cloned().collect()
609 }
610
611 pub fn restore_chunk(&self, meta: ChunkMeta) {
624 let mut guard = match self.inner.lock() {
625 Ok(g) => g,
626 Err(p) => p.into_inner(),
627 };
628 let key = (meta.id.hypertable.clone(), meta.id.start_ns);
629 guard.chunks.insert(key, meta);
630 }
631
632 pub fn drop_hypertable(&self, name: &str) -> usize {
635 let mut guard = match self.inner.lock() {
636 Ok(g) => g,
637 Err(p) => p.into_inner(),
638 };
639 guard.specs.remove(name);
640 let keys: Vec<(String, u64)> = guard
641 .chunks
642 .keys()
643 .filter(|(n, _)| n == name)
644 .cloned()
645 .collect();
646 for key in &keys {
647 guard.chunks.remove(key);
648 }
649 keys.len()
650 }
651
652 pub fn select_compaction_candidates(
660 &self,
661 hypertable: &str,
662 max_rows_per_group: u64,
663 min_chunks: usize,
664 ) -> Vec<Vec<ChunkId>> {
665 let guard = match self.inner.lock() {
666 Ok(g) => g,
667 Err(p) => p.into_inner(),
668 };
669 let candidates: Vec<&ChunkMeta> = guard
671 .chunks
672 .iter()
673 .filter(|((name, _), meta)| name == hypertable && meta.sealed && meta.is_columnar())
674 .map(|(_, meta)| meta)
675 .collect();
676
677 let mut groups: Vec<Vec<ChunkId>> = Vec::new();
680 let mut current: Vec<ChunkId> = Vec::new();
681 let mut current_rows: u64 = 0;
682
683 for meta in candidates {
684 if !current.is_empty() && current_rows + meta.row_count > max_rows_per_group {
685 if current.len() >= min_chunks {
686 groups.push(std::mem::take(&mut current));
687 } else {
688 current.clear();
689 }
690 current_rows = 0;
691 }
692 current.push(meta.id.clone());
693 current_rows += meta.row_count;
694 }
695 if current.len() >= min_chunks {
696 groups.push(current);
697 }
698
699 groups
700 }
701
702 pub fn compact_columnar_chunks(
727 &self,
728 hypertable: &str,
729 source_ids: &[ChunkId],
730 merged_chunk_id: u64,
731 schema_ref: u64,
732 granule_size: u32,
733 ) -> Result<ChunkId, CompactionError> {
734 if source_ids.len() < 2 {
735 return Err(CompactionError::InsufficientSources);
736 }
737
738 let mut guard = match self.inner.lock() {
739 Ok(g) => g,
740 Err(p) => p.into_inner(),
741 };
742
743 for id in source_ids {
746 let key = (id.hypertable.clone(), id.start_ns);
747 let meta = guard
748 .chunks
749 .get(&key)
750 .ok_or_else(|| CompactionError::ChunkNotFound(id.clone()))?;
751 if !meta.sealed {
752 return Err(CompactionError::ChunkNotSealed(id.clone()));
753 }
754 if !meta.is_columnar() {
755 return Err(CompactionError::ChunkNotColumnar(id.clone()));
756 }
757 if !guard.columnar_blocks.contains_key(&key) {
758 return Err(CompactionError::BlockNotResident(id.clone()));
759 }
760 }
761
762 let mut all_points = Vec::new();
765 for id in source_ids {
766 let key = (id.hypertable.clone(), id.start_ns);
767 let bytes = guard.columnar_blocks.get(&key).unwrap();
768 let pts = points_from_column_block(bytes).map_err(CompactionError::Decode)?;
769 all_points.extend(pts);
770 }
771
772 all_points.sort_by_key(|p| p.timestamp_ns);
775
776 let row_count = all_points.len() as u64;
779 let min_ts_ns = all_points.first().map(|p| p.timestamp_ns).unwrap_or(0);
780 let max_ts_ns = all_points.last().map(|p| p.timestamp_ns).unwrap_or(0);
781
782 let ts_bytes: Vec<u8> = all_points
783 .iter()
784 .flat_map(|p| p.timestamp_ns.to_le_bytes())
785 .collect();
786 let val_bytes: Vec<u8> = all_points
787 .iter()
788 .flat_map(|p| p.value.to_le_bytes())
789 .collect();
790
791 let merged_bytes = write_column_block(
792 merged_chunk_id,
793 schema_ref,
794 row_count,
795 min_ts_ns,
796 max_ts_ns,
797 granule_size,
798 &[
799 ColumnInput {
800 column_id: COLUMNAR_TS_COLUMN_ID,
801 logical_type: DataType::UnsignedInteger.to_byte(),
802 semantics: ColumnSemantics::Timestamp,
803 data: &ts_bytes,
804 },
805 ColumnInput {
806 column_id: COLUMNAR_VALUE_COLUMN_ID,
807 logical_type: DataType::Float.to_byte(),
808 semantics: ColumnSemantics::Gauge,
809 data: &val_bytes,
810 },
811 ],
812 )
813 .map_err(CompactionError::Encode)?;
814
815 let merged_start_ns = source_ids.iter().map(|id| id.start_ns).min().unwrap(); let merged_end_ns_exclusive = source_ids
820 .iter()
821 .map(|id| {
822 guard
823 .chunks
824 .get(&(id.hypertable.clone(), id.start_ns))
825 .map(|m| m.end_ns_exclusive)
826 .unwrap_or(merged_start_ns)
827 })
828 .max()
829 .unwrap();
830
831 let merged_id = ChunkId {
832 hypertable: hypertable.to_string(),
833 start_ns: merged_start_ns,
834 };
835
836 let merged_page = PageLocation::new(0, 0, merged_bytes.len() as u32);
842
843 let mut merged_meta = ChunkMeta::new(merged_id.clone(), merged_end_ns_exclusive);
844 merged_meta.row_count = row_count;
845 merged_meta.min_ts_ns = min_ts_ns;
846 merged_meta.max_ts_ns = max_ts_ns;
847 merged_meta.sealed = true;
848 merged_meta.columnar_page = Some(merged_page);
849
850 let merged_key = (hypertable.to_string(), merged_start_ns);
853 guard.chunks.insert(merged_key.clone(), merged_meta);
854 guard.columnar_blocks.insert(merged_key, merged_bytes);
855
856 for id in source_ids {
857 if id.start_ns == merged_start_ns && id.hypertable == hypertable {
860 continue;
861 }
862 let key = (id.hypertable.clone(), id.start_ns);
863 guard.chunks.remove(&key);
864 guard.columnar_blocks.remove(&key);
865 }
866
867 Ok(merged_id)
868 }
869}
870
871#[derive(Debug, Clone, PartialEq)]
873pub enum CompactionError {
874 InsufficientSources,
876 ChunkNotFound(ChunkId),
878 ChunkNotSealed(ChunkId),
880 ChunkNotColumnar(ChunkId),
882 BlockNotResident(ChunkId),
884 Decode(ColumnBlockError),
886 Encode(ColumnBlockError),
888}
889
890impl std::fmt::Display for CompactionError {
891 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
892 match self {
893 Self::InsufficientSources => {
894 write!(f, "compaction requires at least 2 source chunks")
895 }
896 Self::ChunkNotFound(id) => write!(
897 f,
898 "source chunk {}@{} not found",
899 id.hypertable, id.start_ns
900 ),
901 Self::ChunkNotSealed(id) => write!(
902 f,
903 "source chunk {}@{} is not sealed",
904 id.hypertable, id.start_ns
905 ),
906 Self::ChunkNotColumnar(id) => write!(
907 f,
908 "source chunk {}@{} is not columnar",
909 id.hypertable, id.start_ns
910 ),
911 Self::BlockNotResident(id) => write!(
912 f,
913 "source chunk {}@{} block bytes are not RAM-resident",
914 id.hypertable, id.start_ns
915 ),
916 Self::Decode(e) => write!(f, "RDCC decode error: {e}"),
917 Self::Encode(e) => write!(f, "RDCC encode error: {e}"),
918 }
919 }
920}
921
922impl std::error::Error for CompactionError {}
923
924#[cfg(test)]
925mod tests {
926 use super::*;
927
928 const DAY_NS: u64 = 86_400_000_000_000;
929 const HOUR_NS: u64 = 3_600_000_000_000;
930
931 #[test]
932 fn chunk_start_aligns_to_interval_floor() {
933 let spec = HypertableSpec::new("m", "ts", DAY_NS);
934 assert_eq!(spec.chunk_start(0), 0);
935 assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
936 assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
937 assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
938 }
939
940 #[test]
941 fn interval_string_accepts_duration_units() {
942 let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
943 assert_eq!(s.chunk_interval_ns, DAY_NS);
944 let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
945 assert_eq!(s.chunk_interval_ns, HOUR_NS);
946 assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
947 assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
948 }
949
950 #[test]
951 fn route_allocates_chunk_on_first_write() {
952 let reg = HypertableRegistry::new();
953 reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
954 let id = reg.route("metrics", DAY_NS + 100).unwrap();
955 assert_eq!(id.hypertable, "metrics");
956 assert_eq!(id.start_ns, DAY_NS);
957 let chunks = reg.show_chunks("metrics");
958 assert_eq!(chunks.len(), 1);
959 assert_eq!(chunks[0].row_count, 1);
960 assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
961 assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
962 assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
963 }
964
965 #[test]
966 fn route_groups_writes_within_same_chunk() {
967 let reg = HypertableRegistry::new();
968 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
969 for offset in [10u64, 100, 1_000, DAY_NS - 1] {
970 let id = reg.route("m", offset).unwrap();
971 assert_eq!(id.start_ns, 0);
972 }
973 let chunks = reg.show_chunks("m");
974 assert_eq!(chunks.len(), 1);
975 assert_eq!(chunks[0].row_count, 4);
976 }
977
978 #[test]
979 fn route_splits_writes_across_adjacent_chunks() {
980 let reg = HypertableRegistry::new();
981 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
982 reg.route("m", DAY_NS - 1).unwrap();
983 reg.route("m", DAY_NS).unwrap();
984 reg.route("m", 2 * DAY_NS).unwrap();
985 let chunks = reg.show_chunks("m");
986 assert_eq!(chunks.len(), 3);
987 assert!(chunks[0].id.start_ns <= chunks[1].id.start_ns);
988 assert!(chunks[1].id.start_ns <= chunks[2].id.start_ns);
989 }
990
991 #[test]
992 fn route_returns_none_for_unknown_hypertable() {
993 let reg = HypertableRegistry::new();
994 assert!(reg.route("nope", 0).is_none());
995 }
996
997 #[test]
998 fn drop_chunks_before_removes_matching_chunks() {
999 let reg = HypertableRegistry::new();
1000 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1001 reg.route("m", 0).unwrap(); reg.route("m", DAY_NS).unwrap(); reg.route("m", 2 * DAY_NS + 5).unwrap(); let dropped = reg.drop_chunks_before("m", DAY_NS);
1006 assert_eq!(dropped.len(), 2);
1009 let remaining = reg.show_chunks("m");
1010 assert_eq!(remaining.len(), 1);
1011 assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
1012 }
1013
1014 #[test]
1015 fn show_chunks_is_ordered_by_start() {
1016 let reg = HypertableRegistry::new();
1017 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1018 for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, 1 * DAY_NS] {
1019 reg.route("m", ts).unwrap();
1020 }
1021 let starts: Vec<u64> = reg.show_chunks("m").iter().map(|c| c.id.start_ns).collect();
1022 assert_eq!(starts, vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]);
1023 }
1024
1025 #[test]
1026 fn seal_chunk_flips_flag() {
1027 let reg = HypertableRegistry::new();
1028 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1029 let id = reg.route("m", 0).unwrap();
1030 assert!(reg.seal_chunk(&id));
1031 assert!(reg.show_chunks("m")[0].sealed);
1032 }
1033
1034 #[test]
1035 fn drop_hypertable_removes_everything() {
1036 let reg = HypertableRegistry::new();
1037 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1038 reg.route("m", 0).unwrap();
1039 reg.route("m", DAY_NS).unwrap();
1040 assert_eq!(reg.drop_hypertable("m"), 2);
1041 assert!(reg.get("m").is_none());
1042 assert!(reg.show_chunks("m").is_empty());
1043 }
1044
1045 #[test]
1046 fn total_rows_sums_every_chunk() {
1047 let reg = HypertableRegistry::new();
1048 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1049 for ts in 0..1000 {
1050 reg.route("m", ts).unwrap();
1051 }
1052 for ts in DAY_NS..DAY_NS + 500 {
1053 reg.route("m", ts).unwrap();
1054 }
1055 assert_eq!(reg.total_rows("m"), 1500);
1056 }
1057
1058 #[test]
1059 fn names_lists_registered_hypertables() {
1060 let reg = HypertableRegistry::new();
1061 reg.register(HypertableSpec::new("a", "ts", DAY_NS));
1062 reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
1063 let mut names = reg.names();
1064 names.sort();
1065 assert_eq!(names, vec!["a", "b"]);
1066 }
1067
1068 #[test]
1073 fn with_ttl_parses_duration_and_sets_default() {
1074 let s = HypertableSpec::new("m", "ts", DAY_NS)
1075 .with_ttl("7d")
1076 .unwrap();
1077 assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
1078 assert!(HypertableSpec::new("m", "ts", DAY_NS)
1079 .with_ttl("raw")
1080 .is_none());
1081 assert!(HypertableSpec::new("m", "ts", DAY_NS)
1082 .with_ttl("garbage")
1083 .is_none());
1084 }
1085
1086 #[test]
1087 fn chunk_with_no_rows_never_expires() {
1088 let meta = ChunkMeta::new(
1089 ChunkId {
1090 hypertable: "m".into(),
1091 start_ns: 0,
1092 },
1093 DAY_NS,
1094 );
1095 assert!(!meta.is_expired_at(u64::MAX, Some(1)));
1096 }
1097
1098 #[test]
1099 fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
1100 let mut meta = ChunkMeta::new(
1101 ChunkId {
1102 hypertable: "m".into(),
1103 start_ns: 0,
1104 },
1105 DAY_NS,
1106 );
1107 meta.observe(500);
1108 assert!(!meta.is_expired_at(1000, Some(1000)));
1110 assert!(!meta.is_expired_at(1499, Some(1000)));
1111 assert!(meta.is_expired_at(1500, Some(1000)));
1112 }
1113
1114 #[test]
1115 fn per_chunk_override_wins_over_hypertable_default() {
1116 let mut meta = ChunkMeta::new(
1117 ChunkId {
1118 hypertable: "m".into(),
1119 start_ns: 0,
1120 },
1121 DAY_NS,
1122 );
1123 meta.observe(500);
1124 meta.ttl_override_ns = Some(100);
1127 assert!(meta.is_expired_at(600, Some(1000)));
1128 assert!(!meta.is_expired_at(599, Some(1000)));
1129 }
1130
1131 #[test]
1132 fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
1133 let reg = HypertableRegistry::new();
1134 reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
1135 for t in [0, DAY_NS, 2 * DAY_NS] {
1137 reg.route("m", t).unwrap();
1138 }
1139 let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
1142 let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
1143 starts.sort();
1144 assert_eq!(starts, vec![0, DAY_NS]);
1145 let remaining = reg.show_chunks("m");
1146 assert_eq!(remaining.len(), 1);
1147 assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
1148 }
1149
1150 #[test]
1151 fn sweep_without_ttl_keeps_every_chunk() {
1152 let reg = HypertableRegistry::new();
1153 reg.register(HypertableSpec::new("m", "ts", DAY_NS)); for t in [0, DAY_NS, 2 * DAY_NS] {
1155 reg.route("m", t).unwrap();
1156 }
1157 let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
1158 assert!(dropped.is_empty());
1159 assert_eq!(reg.show_chunks("m").len(), 3);
1160 }
1161
1162 #[test]
1163 fn sweep_all_expired_iterates_every_hypertable() {
1164 let reg = HypertableRegistry::new();
1165 reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
1166 reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
1167 reg.route("fast", 0).unwrap();
1170 reg.route("slow", 0).unwrap();
1171 let dropped = reg.sweep_all_expired(2 * HOUR_NS);
1172 assert_eq!(dropped.len(), 1);
1173 assert_eq!(dropped[0].0, "fast");
1174 assert_eq!(reg.show_chunks("slow").len(), 1);
1175 }
1176
1177 #[test]
1178 fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
1179 let reg = HypertableRegistry::new();
1180 reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1181 let id = reg.route("m", 0).unwrap();
1182 assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
1184 let dropped = reg.sweep_expired("m", 10 * DAY_NS);
1185 assert!(dropped.is_empty());
1186 reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
1188 let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
1189 assert_eq!(dropped.len(), 1);
1190 }
1191
1192 #[test]
1193 fn snapshot_then_restore_reproduces_registry_identically() {
1194 let reg = HypertableRegistry::new();
1198 reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
1199 reg.register(HypertableSpec::new("events", "ts", HOUR_NS));
1200 for t in [0, DAY_NS + 5, DAY_NS + 9, 2 * DAY_NS] {
1201 reg.route("metrics", t).unwrap();
1202 }
1203 let id = reg.route("events", 0).unwrap();
1204 reg.seal_chunk(&id);
1205 reg.set_chunk_ttl_ns(&id, Some(3 * HOUR_NS));
1206
1207 let specs = reg.list();
1208 let chunks = reg.snapshot_chunks();
1209 assert!(!reg.is_empty());
1210
1211 let restored = HypertableRegistry::new();
1213 assert!(restored.is_empty());
1214 for spec in specs {
1215 restored.register(spec);
1216 }
1217 for chunk in chunks {
1218 restored.restore_chunk(chunk);
1219 }
1220
1221 let before = reg.get("metrics").unwrap();
1223 let after = restored.get("metrics").unwrap();
1224 assert_eq!(after.chunk_interval_ns, before.chunk_interval_ns);
1225 assert_eq!(after.time_column, before.time_column);
1226 assert_eq!(after.default_ttl_ns, before.default_ttl_ns);
1227
1228 let m_before = reg.show_chunks("metrics");
1230 let m_after = restored.show_chunks("metrics");
1231 assert_eq!(m_after.len(), m_before.len());
1232 for (a, b) in m_after.iter().zip(m_before.iter()) {
1233 assert_eq!(a.id.start_ns, b.id.start_ns);
1234 assert_eq!(a.end_ns_exclusive, b.end_ns_exclusive);
1235 assert_eq!(a.row_count, b.row_count);
1236 assert_eq!(a.min_ts_ns, b.min_ts_ns);
1237 assert_eq!(a.max_ts_ns, b.max_ts_ns);
1238 }
1239 let e_after = restored.show_chunks("events");
1240 assert_eq!(e_after.len(), 1);
1241 assert!(e_after[0].sealed, "sealed flag must survive restore");
1242 assert_eq!(e_after[0].ttl_override_ns, Some(3 * HOUR_NS));
1243
1244 let routed = restored.route("metrics", DAY_NS + 1).unwrap();
1247 assert_eq!(routed.start_ns, DAY_NS);
1248 assert_eq!(
1249 restored.show_chunks("metrics").len(),
1250 m_before.len(),
1251 "write after restore must not allocate a new chunk"
1252 );
1253 }
1254
1255 fn columnar_chunk(hypertable: &str, start_ns: u64, max_ts_ns: u64) -> ChunkMeta {
1271 let mut meta = ChunkMeta::new(
1272 ChunkId {
1273 hypertable: hypertable.into(),
1274 start_ns,
1275 },
1276 start_ns + DAY_NS,
1277 );
1278 meta.row_count = 1;
1279 meta.min_ts_ns = max_ts_ns;
1280 meta.max_ts_ns = max_ts_ns;
1281 meta.sealed = true;
1282 meta.columnar_page = Some(PageLocation::new(7, 0, 1234));
1283 meta
1284 }
1285
1286 #[test]
1287 fn columnar_chunk_evicts_via_sweep_expired_carrying_its_page() {
1288 let reg = HypertableRegistry::new();
1289 reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1290 reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1292 assert!(reg.show_chunks("metrics")[0].columnar_page.is_some());
1293
1294 let dropped = reg.sweep_expired("metrics", 3 * DAY_NS);
1296 assert_eq!(dropped.len(), 1, "columnar chunk must evict via TTL sweep");
1297 assert_eq!(
1298 dropped[0].columnar_page,
1299 Some(PageLocation::new(7, 0, 1234)),
1300 "dropped meta must carry columnar_page so physical release frees the RDCC block"
1301 );
1302 assert!(reg.show_chunks("metrics").is_empty());
1303 }
1304
1305 #[test]
1306 fn columnar_chunk_evicts_via_drop_chunks_before() {
1307 let reg = HypertableRegistry::new();
1308 reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
1309 reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1310
1311 let dropped = reg.drop_chunks_before("metrics", DAY_NS);
1312 assert_eq!(dropped.len(), 1);
1313 assert!(
1314 dropped[0].columnar_page.is_some(),
1315 "drop_chunks_before is metadata-only and carries columnar_page through"
1316 );
1317 assert!(reg.show_chunks("metrics").is_empty());
1318 }
1319
1320 #[test]
1321 fn columnar_and_row_chunks_share_one_eviction_path() {
1322 let mk = |columnar: bool| {
1327 let reg = HypertableRegistry::new();
1328 reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1329 if columnar {
1330 reg.restore_chunk(columnar_chunk("m", 0, 0));
1331 } else {
1332 reg.route("m", 0).unwrap(); }
1334 reg.sweep_expired("m", 3 * DAY_NS).len()
1335 };
1336 assert_eq!(mk(false), 1, "row chunk evicts");
1337 assert_eq!(mk(true), 1, "columnar chunk evicts the same way");
1338 }
1339
1340 #[test]
1341 fn columnar_chunk_prunes_by_time_bounds_like_row_chunk() {
1342 let reg = HypertableRegistry::new();
1349 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1350 reg.restore_chunk(columnar_chunk("m", 0, 0));
1351 reg.restore_chunk(columnar_chunk("m", 2 * DAY_NS, 2 * DAY_NS));
1352 let chunks = reg.show_chunks("m");
1353 let lo = 2 * DAY_NS;
1356 let hi = 3 * DAY_NS;
1357 let overlapping: Vec<u64> = chunks
1358 .iter()
1359 .filter(|c| c.id.start_ns < hi && c.end_ns_exclusive > lo)
1360 .map(|c| c.id.start_ns)
1361 .collect();
1362 assert_eq!(
1363 overlapping,
1364 vec![2 * DAY_NS],
1365 "only in-window columnar chunk kept"
1366 );
1367 assert!(chunks.iter().all(|c| c.columnar_page.is_some()));
1368 }
1369
1370 #[test]
1376 fn chunk_format_dispatches_on_columnar_page_discriminant() {
1377 let reg = HypertableRegistry::new();
1378 reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1379 reg.route("m", 0).unwrap();
1381 reg.restore_chunk(columnar_chunk("m", DAY_NS, DAY_NS));
1382
1383 let chunks = reg.show_chunks("m");
1384 let row = chunks.iter().find(|c| c.id.start_ns == 0).unwrap();
1385 let col = chunks.iter().find(|c| c.id.start_ns == DAY_NS).unwrap();
1386
1387 assert_eq!(row.format(), ChunkFormat::Row);
1388 assert!(!row.is_columnar());
1389 assert_eq!(col.format(), ChunkFormat::ColumnarV1);
1390 assert!(col.is_columnar());
1391 }
1392
#[test]
fn chunks_expiring_within_previews_without_dropping() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));

    // Route one timestamp into each of three consecutive one-day chunks.
    (0..3u64).for_each(|day| {
        reg.route("m", day * DAY_NS).unwrap();
    });

    // With a bound of 1.5 days only the day-0 chunk is reported.
    let first_preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
    assert_eq!(first_preview.len(), 1);
    assert_eq!(first_preview[0].id.start_ns, 0);

    // With a bound of 2 days two chunks are reported.
    let second_preview = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
    assert_eq!(second_preview.len(), 2);

    // Previewing must not evict anything: all three chunks are still listed.
    assert_eq!(reg.show_chunks("m").len(), 3);
}
1412
1413 fn seal_columnar_chunk_into(
1419 reg: &HypertableRegistry,
1420 hypertable: &str,
1421 start_ns: u64,
1422 end_ns_exclusive: u64,
1423 points: &[(u64, f64)],
1424 schema_ref: u64,
1425 ) -> ChunkId {
1426 use super::super::chunk::TimeSeriesChunk;
1427 use std::collections::HashMap;
1428
1429 let id = ChunkId {
1430 hypertable: hypertable.to_string(),
1431 start_ns,
1432 };
1433 let mut chunk = TimeSeriesChunk::new("m", HashMap::new());
1434 for &(ts, v) in points {
1435 chunk.append(ts, v);
1436 }
1437 let block = chunk
1438 .seal_columnar(start_ns, schema_ref)
1439 .expect("seal_columnar");
1440 let page = PageLocation::new(0, 0, block.len() as u32);
1441
1442 let mut meta = ChunkMeta::new(id.clone(), end_ns_exclusive);
1443 meta.sealed = true;
1444 meta.columnar_page = Some(page);
1445 meta.row_count = points.len() as u64;
1446 if let Some(&(min_ts, _)) = points.iter().min_by_key(|(ts, _)| ts) {
1447 meta.min_ts_ns = min_ts;
1448 }
1449 if let Some(&(max_ts, _)) = points.iter().max_by_key(|(ts, _)| ts) {
1450 meta.max_ts_ns = max_ts;
1451 }
1452
1453 {
1454 let mut guard = reg.inner.lock().unwrap();
1455 guard
1456 .chunks
1457 .insert((hypertable.to_string(), start_ns), meta);
1458 guard
1459 .columnar_blocks
1460 .insert((hypertable.to_string(), start_ns), block);
1461 }
1462 id
1463 }
1464
/// Compacting three sealed columnar chunks yields one sealed columnar chunk
/// whose decoded points equal the union of the inputs, sorted by timestamp.
#[test]
fn compact_merges_chunks_to_identical_rows() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));

    // Ten points per source chunk. Each set lies inside its own chunk's
    // [start, start + DAY_NS) window so the fixture metadata is self-consistent.
    let pts_a: Vec<(u64, f64)> = (0..10).map(|i| (i * 1_000, i as f64)).collect();
    let pts_b: Vec<(u64, f64)> = (10..20).map(|i| (DAY_NS + i * 1_000, i as f64)).collect();
    let pts_c: Vec<(u64, f64)> = (20..30)
        .map(|i| (2 * DAY_NS + i * 1_000, i as f64))
        .collect();

    let id_a = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &pts_a, 1);
    let id_b = seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &pts_b, 1);
    let id_c = seal_columnar_chunk_into(&reg, "m", 2 * DAY_NS, 3 * DAY_NS, &pts_c, 1);

    let merged_id = reg
        .compact_columnar_chunks("m", &[id_a, id_b, id_c], 0, 1, DEFAULT_GRANULE_SIZE)
        .expect("compaction failed");

    let chunks = reg.show_chunks("m");
    assert_eq!(chunks.len(), 1, "three source chunks must collapse to one");
    let merged_meta = &chunks[0];
    assert_eq!(merged_meta.id.start_ns, merged_id.start_ns);
    assert_eq!(merged_meta.row_count, 30);
    assert!(merged_meta.sealed);
    assert!(merged_meta.is_columnar());

    let block = reg
        .columnar_block(&merged_id)
        .expect("merged block must be RAM-resident");
    let got = points_from_column_block(&block).expect("decode merged block");

    let mut expected: Vec<(u64, f64)> =
        pts_a.iter().chain(&pts_b).chain(&pts_c).copied().collect();
    expected.sort_by_key(|(ts, _)| *ts);

    assert_eq!(got.len(), expected.len());
    for (point, (exp_ts, exp_val)) in got.iter().zip(&expected) {
        assert_eq!(point.timestamp_ns, *exp_ts);
        assert!(
            (point.value - exp_val).abs() < 1e-9,
            "value mismatch at ts {}: got {}, expected {}",
            exp_ts,
            point.value,
            exp_val
        );
    }
}
1516
/// Compacting five 50-point chunks produces a single chunk holding all 250
/// points, and the merged block is smaller than the raw `(u64, f64)` payload.
#[test]
fn compact_reduces_chunk_count_and_recompresses() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));

    // Five one-day chunks, each with 50 slowly increasing values inside its window.
    let ids: Vec<ChunkId> = (0..5u64)
        .map(|i| {
            let pts: Vec<(u64, f64)> = (0..50u64)
                .map(|j| (i * DAY_NS + j * 1_000, 1.0 + j as f64 * 0.01))
                .collect();
            seal_columnar_chunk_into(&reg, "m", i * DAY_NS, (i + 1) * DAY_NS, &pts, 1)
        })
        .collect();

    assert_eq!(reg.show_chunks("m").len(), 5);

    let merged_id = reg
        .compact_columnar_chunks("m", &ids, 0, 1, DEFAULT_GRANULE_SIZE)
        .expect("compaction failed");

    let chunks = reg.show_chunks("m");
    assert_eq!(chunks.len(), 1, "five chunks must compact to one");
    assert_eq!(chunks[0].row_count, 250);

    let block = reg.columnar_block(&merged_id).unwrap();
    let pts = points_from_column_block(&block).unwrap();
    assert_eq!(pts.len(), 250, "all 250 points must survive compaction");

    // Raw size = one (timestamp, value) pair per point, with no encoding.
    let raw_uncompressed = 250 * std::mem::size_of::<(u64, f64)>();
    assert!(
        block.len() < raw_uncompressed,
        "merged block ({} bytes) should be compressed (raw = {})",
        block.len(),
        raw_uncompressed
    );
}
1560
/// A compaction attempt that is rejected part-way must leave its sealed inputs
/// untouched and readable. A later valid compaction of the same inputs must
/// then still succeed and preserve every point.
///
/// The aborted merge is simulated by including an unsealed chunk in the source
/// list. `compact_rejects_unsealed_chunk` shows such a request is refused.
#[test]
fn torn_merge_leaves_inputs_intact() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));

    // Each point set lies inside its own chunk's one-day window.
    let pts_a: Vec<(u64, f64)> = (0..20).map(|i| (i * 1_000, i as f64)).collect();
    let pts_b: Vec<(u64, f64)> = (100..120).map(|i| (DAY_NS + i * 1_000, i as f64)).collect();

    let id_a = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &pts_a, 1);
    let id_b = seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &pts_b, 1);

    // Simulate a torn merge: the request includes an unsealed chunk, so it must
    // fail. The sealed inputs must survive the failure.
    let open_id = ChunkId {
        hypertable: "m".to_string(),
        start_ns: 2 * DAY_NS,
    };
    reg.restore_chunk(ChunkMeta::new(open_id.clone(), 3 * DAY_NS));
    let aborted = reg.compact_columnar_chunks(
        "m",
        &[id_a.clone(), id_b.clone(), open_id],
        0,
        1,
        DEFAULT_GRANULE_SIZE,
    );
    assert!(
        aborted.is_err(),
        "compaction including an unsealed chunk must be rejected"
    );

    {
        // Scope the guard so the lock is released before compacting again.
        let guard = reg.inner.lock().unwrap();
        let block_a = guard
            .columnar_blocks
            .get(&("m".to_string(), 0))
            .expect("block_a must be present after the aborted merge");
        let block_b = guard
            .columnar_blocks
            .get(&("m".to_string(), DAY_NS))
            .expect("block_b must be present after the aborted merge");
        let pts_decoded_a = points_from_column_block(block_a).unwrap();
        let pts_decoded_b = points_from_column_block(block_b).unwrap();
        assert_eq!(pts_decoded_a.len(), 20, "source A readable after aborted merge");
        assert_eq!(pts_decoded_b.len(), 20, "source B readable after aborted merge");

        assert!(
            guard.chunks.contains_key(&("m".to_string(), 0)),
            "source A must remain intact after torn merge"
        );
        assert!(
            guard.chunks.contains_key(&("m".to_string(), DAY_NS)),
            "source B must remain intact after torn merge"
        );
    }

    let merged_id = reg
        .compact_columnar_chunks("m", &[id_a, id_b], 0, 1, DEFAULT_GRANULE_SIZE)
        .expect("compaction after torn-merge simulation must succeed");

    // The unsealed chunk is not part of this compaction, so it must remain
    // listed alongside the single merged columnar chunk.
    let chunks = reg.show_chunks("m");
    let columnar: Vec<_> = chunks.iter().filter(|c| c.is_columnar()).collect();
    assert_eq!(columnar.len(), 1, "only the merged columnar chunk must remain");
    assert_eq!(columnar[0].id.start_ns, merged_id.start_ns);
    assert!(
        chunks
            .iter()
            .any(|c| c.id.start_ns == 2 * DAY_NS && !c.is_columnar()),
        "the unsealed chunk must be left untouched"
    );

    let block = reg.columnar_block(&merged_id).unwrap();
    let pts = points_from_column_block(&block).unwrap();
    assert_eq!(pts.len(), 40, "all 40 points (20+20) must survive");
}
1625
/// Compaction needs at least two source chunks; a single source is rejected
/// with `CompactionError::InsufficientSources`.
#[test]
fn compact_rejects_single_source() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));
    let id = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &[(1_000, 1.0)], 1);
    let err = reg
        .compact_columnar_chunks("m", &[id], 0, 1, DEFAULT_GRANULE_SIZE)
        .unwrap_err();
    assert_eq!(err, CompactionError::InsufficientSources);
}
1638
/// A compaction whose sources include an unsealed chunk is rejected with
/// `CompactionError::ChunkNotSealed`.
#[test]
fn compact_rejects_unsealed_chunk() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));

    // Restored straight from `ChunkMeta::new` without marking it sealed.
    let open_id = ChunkId {
        hypertable: "m".to_string(),
        start_ns: 0,
    };
    reg.restore_chunk(ChunkMeta::new(open_id.clone(), DAY_NS));

    let sealed_id =
        seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &[(DAY_NS + 1, 1.0)], 1);

    let err = reg
        .compact_columnar_chunks("m", &[open_id, sealed_id], 0, 1, DEFAULT_GRANULE_SIZE)
        .unwrap_err();
    assert!(matches!(err, CompactionError::ChunkNotSealed(_)));
}
1660
/// Candidate selection only returns groups of at least two chunks, and their
/// combined row count never exceeds the row budget. A group threshold above the
/// number of available chunks yields no groups.
#[test]
fn select_candidates_respects_budget_and_threshold() {
    let reg = HypertableRegistry::new();
    reg.register(HypertableSpec::new("m", "ts", DAY_NS));

    // Five sealed chunks of 100 rows each.
    for i in 0..5u64 {
        let pts: Vec<(u64, f64)> = (0..100u64).map(|j| (i * DAY_NS + j, j as f64)).collect();
        seal_columnar_chunk_into(&reg, "m", i * DAY_NS, (i + 1) * DAY_NS, &pts, 1);
    }

    let groups = reg.select_compaction_candidates("m", 250, 2);
    assert!(
        !groups.is_empty(),
        "must find at least one compaction group"
    );

    // Snapshot the chunk list once instead of re-listing it for every id.
    let chunks = reg.show_chunks("m");
    for group in &groups {
        assert!(group.len() >= 2, "each group must have at least 2 chunks");
        // A candidate referencing a missing chunk is a bug; do not count it as 0.
        let total: u64 = group
            .iter()
            .map(|id| {
                chunks
                    .iter()
                    .find(|c| c.id.start_ns == id.start_ns)
                    .map(|c| c.row_count)
                    .expect("candidate must reference an existing chunk")
            })
            .sum();
        assert!(
            total <= 250,
            "group total rows {total} must not exceed budget 250"
        );
    }

    let groups_high = reg.select_compaction_candidates("m", 250, 10);
    assert!(
        groups_high.is_empty(),
        "threshold of 10 must yield no groups when max is 5 chunks"
    );
}
1706}