Skip to main content

reddb_server/storage/timeseries/
hypertable.rs

1//! Hypertables — time-range-partitioned tables à la TimescaleDB.
2//!
3//! A hypertable is a logical collection that auto-partitions writes
4//! into child chunks, each covering a fixed time interval
5//! (`chunk_interval_ns`). Queries that filter by the time column
6//! see the partition pruner eliminate child chunks whose bounds
7//! fall outside the predicate; drops happen per-chunk so operators
8//! can retain the last N days without a full-table scan.
9//!
10//! This module defines the **metadata + router**. The physical
11//! chunk is the same [`super::chunk::TimeSeriesChunk`] the standalone
12//! time-series path already uses — the hypertable layer just tracks
13//! which chunk a row goes into.
14//!
15//! SQL surface (parsed elsewhere in the sprint):
16//!
17//! ```sql
18//! CREATE HYPERTABLE metrics (
19//!   ts    BIGINT,
20//!   host  TEXT,
21//!   value DOUBLE
22//! ) CHUNK INTERVAL '1 day';
23//!
24//! SELECT drop_chunks('metrics', INTERVAL '90 days');
25//! SELECT show_chunks('metrics');
26//! ```
27
28use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::chunk::{
32    points_from_column_block, COLUMNAR_TS_COLUMN_ID, COLUMNAR_VALUE_COLUMN_ID, DEFAULT_GRANULE_SIZE,
33};
34use super::retention::parse_duration_ns;
35use crate::storage::engine::PageLocation;
36use crate::storage::schema::types::DataType;
37use crate::storage::unified::column_block::{write_column_block, ColumnBlockError, ColumnInput};
38use crate::storage::unified::segment_codec::ColumnSemantics;
39
/// Declarative description of a hypertable, as produced by
/// `CREATE HYPERTABLE`.
#[derive(Clone, Debug)]
pub struct HypertableSpec {
    /// Logical name of the hypertable; the registry stores specs under it.
    pub name: String,
    /// Name of the column holding the time axis. Expected to be a
    /// unix-nanosecond BIGINT, or something that parses into one.
    pub time_column: String,
    /// Width of every child chunk, in nanoseconds. The constructors clamp
    /// this to at least 1.
    pub chunk_interval_ns: u64,
    /// TTL applied to chunks that carry no per-chunk override. `None`
    /// disables automatic expiry: chunks stay until an explicit
    /// `drop_chunks` or a retention policy removes them.
    ///
    /// A chunk expires at `max_ts_ns + ttl_ns` and may be dropped once
    /// `now_ns >= expiry`, which is the same contract the retention daemon
    /// uses. Declaring it here is the CREATE-time, declarative equivalent
    /// of a separate `add_retention_policy` call.
    pub default_ttl_ns: Option<u64>,
}
61
62impl HypertableSpec {
63    pub fn new(
64        name: impl Into<String>,
65        time_column: impl Into<String>,
66        chunk_interval_ns: u64,
67    ) -> Self {
68        Self {
69            name: name.into(),
70            time_column: time_column.into(),
71            chunk_interval_ns: chunk_interval_ns.max(1),
72            default_ttl_ns: None,
73        }
74    }
75
76    /// Convenience: construct from a Timescale-style duration string
77    /// (`"1d"`, `"1h"`, `"30m"`…).
78    pub fn from_interval_string(
79        name: impl Into<String>,
80        time_column: impl Into<String>,
81        interval: &str,
82    ) -> Option<Self> {
83        let ns = parse_duration_ns(interval)?;
84        if ns == 0 {
85            return None;
86        }
87        Some(Self::new(name, time_column, ns))
88    }
89
90    /// Builder-style: attach a default TTL. Uses the same duration
91    /// grammar as `chunk_interval` (`"90d"`, `"30s"`, …).
92    pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
93        let ns = parse_duration_ns(ttl)?;
94        if ns == 0 {
95            return None;
96        }
97        self.default_ttl_ns = Some(ns);
98        Some(self)
99    }
100
101    /// Direct setter when the TTL is already computed in ns.
102    pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
103        self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
104        self
105    }
106
107    /// Align `timestamp_ns` to the chunk's floor — the chunk that
108    /// row belongs to starts at this timestamp and covers
109    /// `[start, start + chunk_interval_ns)`.
110    pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
111        (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
112    }
113
114    pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
115        self.chunk_start(timestamp_ns)
116            .saturating_add(self.chunk_interval_ns)
117    }
118}
119
/// Identity of one child chunk: the owning hypertable plus the aligned
/// start of the time bucket it covers. It stays the same across restarts,
/// so the catalog and retention can refer to a chunk unambiguously.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ChunkId {
    /// Name of the hypertable this chunk belongs to.
    pub hypertable: String,
    /// Inclusive lower bound of the chunk, aligned to
    /// `chunk_interval_ns`.
    pub start_ns: u64,
}
128
/// Physical layout of a chunk, used as the dispatch key of the read bridge
/// (PRD #850 Phase 1, #861).
///
/// Enabling `COLUMNAR` on a collection that already holds row data rewrites
/// nothing. Chunks that existed before stay [`ChunkFormat::Row`], chunks
/// sealed afterwards become [`ChunkFormat::ColumnarV1`], and both kinds live
/// side by side in the same collection. Readers branch on this value: row
/// chunks go to the entity/row reader, columnar chunks to the RDCC
/// column-block reader.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChunkFormat {
    /// Legacy row-stored chunk (`columnar_page == None`). Either it predates
    /// the columnar seal or it was sealed while the collection was not
    /// columnar; its rows come from the entity/row path.
    Row,
    /// `RDCC` columnar chunk, format version 1 (`columnar_page == Some`);
    /// its rows are decoded from the recorded `ColumnBlock`.
    ColumnarV1,
}
145
146/// Metadata tracked per child chunk. Physical storage lives in
147/// `TimeSeriesChunk` keyed by `(hypertable, start_ns)`.
148#[derive(Debug, Clone)]
149pub struct ChunkMeta {
150    pub id: ChunkId,
151    pub end_ns_exclusive: u64,
152    pub row_count: u64,
153    pub min_ts_ns: u64,
154    pub max_ts_ns: u64,
155    pub sealed: bool,
156    /// Optional per-chunk TTL override. `None` means "fall back to
157    /// the hypertable's default TTL". Setting this lets mixed-TTL
158    /// policies live inside the same hypertable — e.g. keep the
159    /// current month of data forever but expire everything older
160    /// than 90 days.
161    pub ttl_override_ns: Option<u64>,
162    /// Columnar-vs-row **migration discriminant** (PRD #850, Phase 1).
163    /// `Some(loc)` → this chunk was sealed columnar; its `RDCC`
164    /// [`ColumnBlock`](crate::storage::engine::PageType::ColumnBlock) lives
165    /// at `loc` and reads decode the columnar form. `None` → a legacy
166    /// row-stored chunk served by the entity path (read-bridge lands in
167    /// #861). This MUST persist so pre-existing row-stored data is never
168    /// mis-read as columnar after a restart.
169    pub columnar_page: Option<PageLocation>,
170}
171
172impl ChunkMeta {
173    pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
174        Self {
175            id,
176            end_ns_exclusive,
177            row_count: 0,
178            min_ts_ns: u64::MAX,
179            max_ts_ns: 0,
180            sealed: false,
181            ttl_override_ns: None,
182            columnar_page: None,
183        }
184    }
185
186    /// The chunk's storage format — the read-bridge dispatch key (#861).
187    /// Derived from the migration discriminant `columnar_page`: a recorded
188    /// RDCC `ColumnBlock` location means [`ChunkFormat::ColumnarV1`], its
189    /// absence means the legacy [`ChunkFormat::Row`] form. This is the
190    /// format-version gate that lets old row chunks and new columnar chunks
191    /// coexist in one collection without a rewrite.
192    pub fn format(&self) -> ChunkFormat {
193        match self.columnar_page {
194            Some(_) => ChunkFormat::ColumnarV1,
195            None => ChunkFormat::Row,
196        }
197    }
198
199    /// True when this chunk is stored in the columnar `RDCC` form.
200    pub fn is_columnar(&self) -> bool {
201        matches!(self.format(), ChunkFormat::ColumnarV1)
202    }
203
204    pub fn observe(&mut self, ts_ns: u64) {
205        self.row_count += 1;
206        if ts_ns < self.min_ts_ns {
207            self.min_ts_ns = ts_ns;
208        }
209        if ts_ns > self.max_ts_ns {
210            self.max_ts_ns = ts_ns;
211        }
212    }
213
214    /// Effective TTL = per-chunk override if present, otherwise the
215    /// hypertable default. `None` = the chunk has no automatic
216    /// expiry.
217    pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
218        self.ttl_override_ns.or(default_ttl_ns)
219    }
220
221    /// Absolute epoch-ns at which the chunk becomes droppable. Uses
222    /// `max_ts_ns` as the baseline — the newest row the chunk has
223    /// ever accepted — so an empty chunk (no rows yet) never
224    /// expires until at least one row lands.
225    pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
226        let ttl = self.effective_ttl_ns(default_ttl_ns)?;
227        if self.row_count == 0 {
228            return None;
229        }
230        Some(self.max_ts_ns.saturating_add(ttl))
231    }
232
233    pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
234        match self.expiry_ns(default_ttl_ns) {
235            Some(expiry) => now_ns >= expiry,
236            None => false,
237        }
238    }
239}
240
241/// In-memory catalog of hypertables and their chunks. Thread-safe
242/// because INSERTs can land from multiple writers simultaneously.
243#[derive(Clone, Default)]
244pub struct HypertableRegistry {
245    inner: Arc<Mutex<RegistryInner>>,
246}
247
248#[derive(Default)]
249struct RegistryInner {
250    specs: BTreeMap<String, HypertableSpec>,
251    /// `(hypertable, start_ns)` → chunk meta. `BTreeMap` so lookups
252    /// by name produce an ordered view (show_chunks must be
253    /// deterministic).
254    chunks: BTreeMap<(String, u64), ChunkMeta>,
255    /// `(hypertable, start_ns)` → the sealed chunk's RDCC `ColumnBlock`
256    /// bytes, populated by [`seal_chunk_columnar`](HypertableRegistry::seal_chunk_columnar)
257    /// (PRD #850, #911). RAM-resident: the migration discriminant
258    /// `ChunkMeta.columnar_page` persists across restart, but durable
259    /// engine-page persistence of these bytes is a follow-up — until the
260    /// page-write bridge lands, a columnar chunk's block lives only here.
261    columnar_blocks: BTreeMap<(String, u64), Vec<u8>>,
262}
263
264impl std::fmt::Debug for HypertableRegistry {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        let guard = match self.inner.lock() {
267            Ok(g) => g,
268            Err(p) => p.into_inner(),
269        };
270        f.debug_struct("HypertableRegistry")
271            .field("hypertables", &guard.specs.len())
272            .field("chunks", &guard.chunks.len())
273            .finish()
274    }
275}
276
277impl HypertableRegistry {
278    pub fn new() -> Self {
279        Self::default()
280    }
281
282    /// Register a new hypertable. Replaces the previous spec if one
283    /// existed with the same name; chunks for that name are kept —
284    /// the operator is assumed to know what they're doing when they
285    /// redefine (e.g. widening `chunk_interval_ns`).
286    pub fn register(&self, spec: HypertableSpec) {
287        let mut guard = match self.inner.lock() {
288            Ok(g) => g,
289            Err(p) => p.into_inner(),
290        };
291        guard.specs.insert(spec.name.clone(), spec);
292    }
293
294    pub fn get(&self, name: &str) -> Option<HypertableSpec> {
295        let guard = match self.inner.lock() {
296            Ok(g) => g,
297            Err(p) => p.into_inner(),
298        };
299        guard.specs.get(name).cloned()
300    }
301
302    pub fn list(&self) -> Vec<HypertableSpec> {
303        let guard = match self.inner.lock() {
304            Ok(g) => g,
305            Err(p) => p.into_inner(),
306        };
307        guard.specs.values().cloned().collect()
308    }
309
310    /// Drop a hypertable from the registry. Returns the removed spec
311    /// when present, or `None` for unknown names. Callers drop the
312    /// backing collection separately — this is registry housekeeping
313    /// only.
314    pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
315        let mut guard = match self.inner.lock() {
316            Ok(g) => g,
317            Err(p) => p.into_inner(),
318        };
319        guard.specs.remove(name)
320    }
321
322    /// Route a write: returns the `ChunkId` the row belongs in,
323    /// allocating the chunk on first write. `None` when the
324    /// hypertable is unknown.
325    pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
326        let mut guard = match self.inner.lock() {
327            Ok(g) => g,
328            Err(p) => p.into_inner(),
329        };
330        let spec = guard.specs.get(hypertable)?.clone();
331        let start = spec.chunk_start(timestamp_ns);
332        let end = spec.chunk_end_exclusive(timestamp_ns);
333        let id = ChunkId {
334            hypertable: spec.name.clone(),
335            start_ns: start,
336        };
337        let key = (spec.name.clone(), start);
338        let meta = guard
339            .chunks
340            .entry(key)
341            .or_insert_with(|| ChunkMeta::new(id.clone(), end));
342        meta.observe(timestamp_ns);
343        Some(id)
344    }
345
346    /// Return every chunk for `hypertable`, oldest-first.
347    pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
348        let guard = match self.inner.lock() {
349            Ok(g) => g,
350            Err(p) => p.into_inner(),
351        };
352        guard
353            .chunks
354            .iter()
355            .filter(|((name, _), _)| name == hypertable)
356            .map(|(_, meta)| meta.clone())
357            .collect()
358    }
359
360    /// Drop every chunk of `hypertable` whose `max_ts_ns` is at or
361    /// below `cutoff_ns`. Returns the count dropped — the physical
362    /// storage release is the caller's responsibility (this module
363    /// only owns the metadata).
364    pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
365        let mut guard = match self.inner.lock() {
366            Ok(g) => g,
367            Err(p) => p.into_inner(),
368        };
369        let mut dropped = Vec::new();
370        let keys: Vec<(String, u64)> = guard
371            .chunks
372            .iter()
373            .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
374            .map(|(k, _)| k.clone())
375            .collect();
376        for key in keys {
377            if let Some(meta) = guard.chunks.remove(&key) {
378                dropped.push(meta);
379            }
380        }
381        dropped
382    }
383
384    /// Sweep chunks whose effective TTL has fired. A chunk is
385    /// droppable when `now_ns ≥ max_ts_ns + effective_ttl_ns` — the
386    /// registry hands back every removed `ChunkMeta` so the
387    /// physical-storage callback can release bytes + indexes. Chunks
388    /// without an effective TTL (neither per-chunk override nor
389    /// hypertable default) are never touched.
390    ///
391    /// This is the "TTL applied at the partition level" primitive:
392    /// one O(1) metadata sweep reclaims every row of every expired
393    /// chunk, instead of scanning rows individually like an
394    /// entity-level TTL would. Empty hypertables stay empty.
395    pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
396        let mut guard = match self.inner.lock() {
397            Ok(g) => g,
398            Err(p) => p.into_inner(),
399        };
400        let Some(spec) = guard.specs.get(hypertable).cloned() else {
401            return Vec::new();
402        };
403        let expired_keys: Vec<(String, u64)> = guard
404            .chunks
405            .iter()
406            .filter(|((name, _), meta)| {
407                name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
408            })
409            .map(|(k, _)| k.clone())
410            .collect();
411        let mut dropped = Vec::with_capacity(expired_keys.len());
412        for key in expired_keys {
413            if let Some(meta) = guard.chunks.remove(&key) {
414                dropped.push(meta);
415            }
416        }
417        dropped
418    }
419
420    /// Sweep every registered hypertable in one shot — the loop the
421    /// retention daemon runs every cycle. Returns a flat list of
422    /// `(hypertable_name, chunk_dropped)` pairs.
423    pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
424        let names: Vec<String> = {
425            let guard = match self.inner.lock() {
426                Ok(g) => g,
427                Err(p) => p.into_inner(),
428            };
429            guard.specs.keys().cloned().collect()
430        };
431        let mut out = Vec::new();
432        for name in names {
433            for meta in self.sweep_expired(&name, now_ns) {
434                out.push((name.clone(), meta));
435            }
436        }
437        out
438    }
439
440    /// Install / replace the hypertable-wide default TTL. `None`
441    /// disables automatic expiry — chunks live until explicit
442    /// `drop_chunks` / per-chunk override fires.
443    pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
444        let mut guard = match self.inner.lock() {
445            Ok(g) => g,
446            Err(p) => p.into_inner(),
447        };
448        match guard.specs.get_mut(hypertable) {
449            Some(spec) => {
450                spec.default_ttl_ns = match ttl_ns {
451                    Some(0) | None => None,
452                    Some(v) => Some(v),
453                };
454                true
455            }
456            None => false,
457        }
458    }
459
460    /// Override the TTL for a single chunk. Useful for "keep this
461    /// specific chunk longer because it contains an incident
462    /// replay" or "expire this one faster because it was filled
463    /// from a backfill we're about to redo". `None` removes the
464    /// override and falls back to the hypertable default.
465    pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
466        let mut guard = match self.inner.lock() {
467            Ok(g) => g,
468            Err(p) => p.into_inner(),
469        };
470        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
471            meta.ttl_override_ns = ttl_ns;
472            true
473        } else {
474            false
475        }
476    }
477
478    /// Inspect the list of chunks that are *about* to expire within
479    /// `horizon_ns`. Powers preview endpoints ("what will the next
480    /// sweep drop?") without actually dropping anything.
481    pub fn chunks_expiring_within(
482        &self,
483        hypertable: &str,
484        now_ns: u64,
485        horizon_ns: u64,
486    ) -> Vec<ChunkMeta> {
487        let guard = match self.inner.lock() {
488            Ok(g) => g,
489            Err(p) => p.into_inner(),
490        };
491        let Some(spec) = guard.specs.get(hypertable).cloned() else {
492            return Vec::new();
493        };
494        let cutoff = now_ns.saturating_add(horizon_ns);
495        guard
496            .chunks
497            .iter()
498            .filter(|((name, _), _)| name == hypertable)
499            .filter_map(|(_, meta)| {
500                let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
501                if expiry <= cutoff {
502                    Some(meta.clone())
503                } else {
504                    None
505                }
506            })
507            .collect()
508    }
509
510    /// Seal a chunk — future writes to the same `start_ns` bucket
511    /// will still land (open-ended), but the `sealed` flag signals
512    /// the maintenance layer that the chunk can now be compressed /
513    /// uploaded / migrated. Returns `true` if the chunk existed.
514    pub fn seal_chunk(&self, id: &ChunkId) -> bool {
515        let mut guard = match self.inner.lock() {
516            Ok(g) => g,
517            Err(p) => p.into_inner(),
518        };
519        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
520            meta.sealed = true;
521            true
522        } else {
523            false
524        }
525    }
526
527    /// Seal a chunk **columnar** (PRD #850, #911): mark it sealed, record
528    /// where its RDCC `ColumnBlock` lives in `ChunkMeta.columnar_page`,
529    /// and stash the block `bytes` so the columnar read path can decode
530    /// them. The columnar counterpart of [`seal_chunk`](Self::seal_chunk)
531    /// — the production caller (`seal_chunk_with_config`'s columnar arm)
532    /// hands the sealed bytes here. Returns `true` if the chunk existed.
533    pub fn seal_chunk_columnar(&self, id: &ChunkId, page: PageLocation, bytes: Vec<u8>) -> bool {
534        let mut guard = match self.inner.lock() {
535            Ok(g) => g,
536            Err(p) => p.into_inner(),
537        };
538        let key = (id.hypertable.clone(), id.start_ns);
539        if let Some(meta) = guard.chunks.get_mut(&key) {
540            meta.sealed = true;
541            meta.columnar_page = Some(page);
542            guard.columnar_blocks.insert(key, bytes);
543            true
544        } else {
545            false
546        }
547    }
548
549    /// Fetch the RDCC `ColumnBlock` bytes recorded for a columnar-sealed
550    /// chunk by [`seal_chunk_columnar`](Self::seal_chunk_columnar). `None`
551    /// for row-sealed chunks or chunks whose bytes are not RAM-resident
552    /// (e.g. after a restart, pending the durable page-write bridge).
553    pub fn columnar_block(&self, id: &ChunkId) -> Option<Vec<u8>> {
554        let guard = match self.inner.lock() {
555            Ok(g) => g,
556            Err(p) => p.into_inner(),
557        };
558        guard
559            .columnar_blocks
560            .get(&(id.hypertable.clone(), id.start_ns))
561            .cloned()
562    }
563
564    /// Total row count across every chunk of `hypertable`. Used by
565    /// catalog views + benchmark harnesses.
566    pub fn total_rows(&self, hypertable: &str) -> u64 {
567        let guard = match self.inner.lock() {
568            Ok(g) => g,
569            Err(p) => p.into_inner(),
570        };
571        guard
572            .chunks
573            .iter()
574            .filter(|((name, _), _)| name == hypertable)
575            .map(|(_, meta)| meta.row_count)
576            .sum()
577    }
578
579    /// List the hypertables the retention daemon should sweep.
580    pub fn names(&self) -> Vec<String> {
581        let guard = match self.inner.lock() {
582            Ok(g) => g,
583            Err(p) => p.into_inner(),
584        };
585        guard.specs.keys().cloned().collect()
586    }
587
588    /// True when no hypertable is registered and no chunk is tracked.
589    /// Lets the durability layer skip the persist step entirely for
590    /// workloads that never declared a hypertable (zero overhead).
591    pub fn is_empty(&self) -> bool {
592        let guard = match self.inner.lock() {
593            Ok(g) => g,
594            Err(p) => p.into_inner(),
595        };
596        guard.specs.is_empty() && guard.chunks.is_empty()
597    }
598
599    /// Snapshot every chunk across all hypertables, ordered by
600    /// `(hypertable, start_ns)` so the persisted form is deterministic.
601    /// Pairs with [`restore_chunk`] on boot. Specs are snapshotted via
602    /// [`list`].
603    pub fn snapshot_chunks(&self) -> Vec<ChunkMeta> {
604        let guard = match self.inner.lock() {
605            Ok(g) => g,
606            Err(p) => p.into_inner(),
607        };
608        guard.chunks.values().cloned().collect()
609    }
610
611    /// Reinstate a chunk verbatim during recovery. Overwrites any
612    /// existing entry for the same `(hypertable, start_ns)`.
613    ///
614    /// Unlike [`route`], this does **not** observe a timestamp — it
615    /// restores the persisted counters (`row_count`, `min_ts_ns`,
616    /// `max_ts_ns`, `sealed`, `ttl_override_ns`) exactly so the
617    /// post-restart registry is identical to the pre-restart one. The
618    /// caller is expected to [`register`] the owning spec first; a
619    /// chunk whose hypertable has no spec is still tracked (routing
620    /// falls back to the spec once it is registered), matching the
621    /// pre-restart invariant that chunks outlive a missing spec only
622    /// transiently.
623    pub fn restore_chunk(&self, meta: ChunkMeta) {
624        let mut guard = match self.inner.lock() {
625            Ok(g) => g,
626            Err(p) => p.into_inner(),
627        };
628        let key = (meta.id.hypertable.clone(), meta.id.start_ns);
629        guard.chunks.insert(key, meta);
630    }
631
632    /// Drop the whole hypertable (spec + every chunk). Returns the
633    /// number of chunks removed.
634    pub fn drop_hypertable(&self, name: &str) -> usize {
635        let mut guard = match self.inner.lock() {
636            Ok(g) => g,
637            Err(p) => p.into_inner(),
638        };
639        guard.specs.remove(name);
640        let keys: Vec<(String, u64)> = guard
641            .chunks
642            .keys()
643            .filter(|(n, _)| n == name)
644            .cloned()
645            .collect();
646        for key in &keys {
647            guard.chunks.remove(key);
648        }
649        keys.len()
650    }
651
652    /// Select groups of small sealed columnar chunks that are candidates
653    /// for compaction. Returns a list of groups; each group is a list of
654    /// `ChunkId`s that together hold at most `max_rows_total` rows. Groups
655    /// with fewer than `min_chunks` chunks are not returned (no-op merges).
656    ///
657    /// Chunks are considered in oldest-first order within each hypertable.
658    /// The caller decides the merge policy (threshold sizes, timing).
659    pub fn select_compaction_candidates(
660        &self,
661        hypertable: &str,
662        max_rows_per_group: u64,
663        min_chunks: usize,
664    ) -> Vec<Vec<ChunkId>> {
665        let guard = match self.inner.lock() {
666            Ok(g) => g,
667            Err(p) => p.into_inner(),
668        };
669        // Collect sealed columnar chunks oldest-first.
670        let candidates: Vec<&ChunkMeta> = guard
671            .chunks
672            .iter()
673            .filter(|((name, _), meta)| name == hypertable && meta.sealed && meta.is_columnar())
674            .map(|(_, meta)| meta)
675            .collect();
676
677        // Greedy bin-packing: accumulate into a group until adding the next
678        // chunk would exceed the row budget, then close the group.
679        let mut groups: Vec<Vec<ChunkId>> = Vec::new();
680        let mut current: Vec<ChunkId> = Vec::new();
681        let mut current_rows: u64 = 0;
682
683        for meta in candidates {
684            if !current.is_empty() && current_rows + meta.row_count > max_rows_per_group {
685                if current.len() >= min_chunks {
686                    groups.push(std::mem::take(&mut current));
687                } else {
688                    current.clear();
689                }
690                current_rows = 0;
691            }
692            current.push(meta.id.clone());
693            current_rows += meta.row_count;
694        }
695        if current.len() >= min_chunks {
696            groups.push(current);
697        }
698
699        groups
700    }
701
702    /// Merge N sealed columnar chunks into one larger sealed columnar chunk.
703    ///
704    /// # Crash safety
705    ///
706    /// The merge is computed entirely in memory before any registry state is
707    /// touched. The final registry update (insert merged entry + remove source
708    /// entries) happens atomically under one `Mutex` lock acquisition. If the
709    /// process is killed before the lock is taken, all source chunks remain
710    /// intact — no committed data is lost. A torn merge (crash mid-computation,
711    /// before the atomic commit) is safe because the registry never saw the
712    /// partial output.
713    ///
714    /// # Arguments
715    ///
716    /// * `hypertable` — name of the owning hypertable.
717    /// * `source_ids` — the `ChunkId`s of the chunks to merge (must all be
718    ///   sealed, columnar, and have their blocks RAM-resident).
719    /// * `merged_chunk_id` — the `chunk_id` header field for the output RDCC
720    ///   block; the caller assigns this (e.g. `min(source start_ns)` cast).
721    /// * `schema_ref` — the catalog schema id written into the output header.
722    /// * `granule_size` — sparse-granule-index stride for the merged block.
723    ///   Pass `DEFAULT_GRANULE_SIZE` for the standard 8 192-row marks.
724    ///
725    /// Returns the `ChunkId` of the newly inserted merged chunk on success.
726    pub fn compact_columnar_chunks(
727        &self,
728        hypertable: &str,
729        source_ids: &[ChunkId],
730        merged_chunk_id: u64,
731        schema_ref: u64,
732        granule_size: u32,
733    ) -> Result<ChunkId, CompactionError> {
734        if source_ids.len() < 2 {
735            return Err(CompactionError::InsufficientSources);
736        }
737
738        let mut guard = match self.inner.lock() {
739            Ok(g) => g,
740            Err(p) => p.into_inner(),
741        };
742
743        // --- Validate and collect source data (still holding the lock) ---
744
745        for id in source_ids {
746            let key = (id.hypertable.clone(), id.start_ns);
747            let meta = guard
748                .chunks
749                .get(&key)
750                .ok_or_else(|| CompactionError::ChunkNotFound(id.clone()))?;
751            if !meta.sealed {
752                return Err(CompactionError::ChunkNotSealed(id.clone()));
753            }
754            if !meta.is_columnar() {
755                return Err(CompactionError::ChunkNotColumnar(id.clone()));
756            }
757            if !guard.columnar_blocks.contains_key(&key) {
758                return Err(CompactionError::BlockNotResident(id.clone()));
759            }
760        }
761
762        // --- Decode all source blocks into points ---
763
764        let mut all_points = Vec::new();
765        for id in source_ids {
766            let key = (id.hypertable.clone(), id.start_ns);
767            let bytes = guard.columnar_blocks.get(&key).unwrap();
768            let pts = points_from_column_block(bytes).map_err(CompactionError::Decode)?;
769            all_points.extend(pts);
770        }
771
772        // Sort by timestamp — chunks are time-partitioned so they should not
773        // overlap, but we sort anyway to guarantee the output is ordered.
774        all_points.sort_by_key(|p| p.timestamp_ns);
775
776        // --- Build the merged RDCC block ---
777
778        let row_count = all_points.len() as u64;
779        let min_ts_ns = all_points.first().map(|p| p.timestamp_ns).unwrap_or(0);
780        let max_ts_ns = all_points.last().map(|p| p.timestamp_ns).unwrap_or(0);
781
782        let ts_bytes: Vec<u8> = all_points
783            .iter()
784            .flat_map(|p| p.timestamp_ns.to_le_bytes())
785            .collect();
786        let val_bytes: Vec<u8> = all_points
787            .iter()
788            .flat_map(|p| p.value.to_le_bytes())
789            .collect();
790
791        let merged_bytes = write_column_block(
792            merged_chunk_id,
793            schema_ref,
794            row_count,
795            min_ts_ns,
796            max_ts_ns,
797            granule_size,
798            &[
799                ColumnInput {
800                    column_id: COLUMNAR_TS_COLUMN_ID,
801                    logical_type: DataType::UnsignedInteger.to_byte(),
802                    semantics: ColumnSemantics::Timestamp,
803                    data: &ts_bytes,
804                },
805                ColumnInput {
806                    column_id: COLUMNAR_VALUE_COLUMN_ID,
807                    logical_type: DataType::Float.to_byte(),
808                    semantics: ColumnSemantics::Gauge,
809                    data: &val_bytes,
810                },
811            ],
812        )
813        .map_err(CompactionError::Encode)?;
814
815        // --- Derive merged chunk metadata ---
816
817        // The merged chunk spans the time range of all sources.
818        let merged_start_ns = source_ids.iter().map(|id| id.start_ns).min().unwrap(); // safe: source_ids.len() >= 2
819        let merged_end_ns_exclusive = source_ids
820            .iter()
821            .map(|id| {
822                guard
823                    .chunks
824                    .get(&(id.hypertable.clone(), id.start_ns))
825                    .map(|m| m.end_ns_exclusive)
826                    .unwrap_or(merged_start_ns)
827            })
828            .max()
829            .unwrap();
830
831        let merged_id = ChunkId {
832            hypertable: hypertable.to_string(),
833            start_ns: merged_start_ns,
834        };
835
836        // Synthetic page location: page_id=0, offset=0, length=block_len.
837        // Callers that write through a real Pager should instead call
838        // `seal_chunk_columnar` with the returned PageLocation from the pager
839        // write; this sentinel is correct for the RAM-only path used here and
840        // in tests.
841        let merged_page = PageLocation::new(0, 0, merged_bytes.len() as u32);
842
843        let mut merged_meta = ChunkMeta::new(merged_id.clone(), merged_end_ns_exclusive);
844        merged_meta.row_count = row_count;
845        merged_meta.min_ts_ns = min_ts_ns;
846        merged_meta.max_ts_ns = max_ts_ns;
847        merged_meta.sealed = true;
848        merged_meta.columnar_page = Some(merged_page);
849
850        // --- Atomic commit: insert merged, remove sources ---
851
852        let merged_key = (hypertable.to_string(), merged_start_ns);
853        guard.chunks.insert(merged_key.clone(), merged_meta);
854        guard.columnar_blocks.insert(merged_key, merged_bytes);
855
856        for id in source_ids {
857            // Skip if source_id == merged_id (idempotent when the first source
858            // shares start_ns with the merged chunk).
859            if id.start_ns == merged_start_ns && id.hypertable == hypertable {
860                continue;
861            }
862            let key = (id.hypertable.clone(), id.start_ns);
863            guard.chunks.remove(&key);
864            guard.columnar_blocks.remove(&key);
865        }
866
867        Ok(merged_id)
868    }
869}
870
871/// Errors returned by [`HypertableRegistry::compact_columnar_chunks`].
872#[derive(Debug, Clone, PartialEq)]
873pub enum CompactionError {
874    /// Need at least 2 source chunks — merging 0 or 1 is a no-op.
875    InsufficientSources,
876    /// One of the source chunks does not exist in this hypertable.
877    ChunkNotFound(ChunkId),
878    /// A source chunk is not yet sealed — only sealed chunks can be compacted.
879    ChunkNotSealed(ChunkId),
880    /// A source chunk is not in columnar form.
881    ChunkNotColumnar(ChunkId),
882    /// A source chunk's RDCC block bytes are not RAM-resident.
883    BlockNotResident(ChunkId),
884    /// The RDCC block could not be decoded.
885    Decode(ColumnBlockError),
886    /// The merged RDCC block could not be encoded.
887    Encode(ColumnBlockError),
888}
889
890impl std::fmt::Display for CompactionError {
891    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
892        match self {
893            Self::InsufficientSources => {
894                write!(f, "compaction requires at least 2 source chunks")
895            }
896            Self::ChunkNotFound(id) => write!(
897                f,
898                "source chunk {}@{} not found",
899                id.hypertable, id.start_ns
900            ),
901            Self::ChunkNotSealed(id) => write!(
902                f,
903                "source chunk {}@{} is not sealed",
904                id.hypertable, id.start_ns
905            ),
906            Self::ChunkNotColumnar(id) => write!(
907                f,
908                "source chunk {}@{} is not columnar",
909                id.hypertable, id.start_ns
910            ),
911            Self::BlockNotResident(id) => write!(
912                f,
913                "source chunk {}@{} block bytes are not RAM-resident",
914                id.hypertable, id.start_ns
915            ),
916            Self::Decode(e) => write!(f, "RDCC decode error: {e}"),
917            Self::Encode(e) => write!(f, "RDCC encode error: {e}"),
918        }
919    }
920}
921
922impl std::error::Error for CompactionError {}
923
924#[cfg(test)]
925mod tests {
926    use super::*;
927
928    const DAY_NS: u64 = 86_400_000_000_000;
929    const HOUR_NS: u64 = 3_600_000_000_000;
930
931    #[test]
932    fn chunk_start_aligns_to_interval_floor() {
933        let spec = HypertableSpec::new("m", "ts", DAY_NS);
934        assert_eq!(spec.chunk_start(0), 0);
935        assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
936        assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
937        assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
938    }
939
940    #[test]
941    fn interval_string_accepts_duration_units() {
942        let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
943        assert_eq!(s.chunk_interval_ns, DAY_NS);
944        let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
945        assert_eq!(s.chunk_interval_ns, HOUR_NS);
946        assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
947        assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
948    }
949
950    #[test]
951    fn route_allocates_chunk_on_first_write() {
952        let reg = HypertableRegistry::new();
953        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
954        let id = reg.route("metrics", DAY_NS + 100).unwrap();
955        assert_eq!(id.hypertable, "metrics");
956        assert_eq!(id.start_ns, DAY_NS);
957        let chunks = reg.show_chunks("metrics");
958        assert_eq!(chunks.len(), 1);
959        assert_eq!(chunks[0].row_count, 1);
960        assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
961        assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
962        assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
963    }
964
965    #[test]
966    fn route_groups_writes_within_same_chunk() {
967        let reg = HypertableRegistry::new();
968        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
969        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
970            let id = reg.route("m", offset).unwrap();
971            assert_eq!(id.start_ns, 0);
972        }
973        let chunks = reg.show_chunks("m");
974        assert_eq!(chunks.len(), 1);
975        assert_eq!(chunks[0].row_count, 4);
976    }
977
978    #[test]
979    fn route_splits_writes_across_adjacent_chunks() {
980        let reg = HypertableRegistry::new();
981        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
982        reg.route("m", DAY_NS - 1).unwrap();
983        reg.route("m", DAY_NS).unwrap();
984        reg.route("m", 2 * DAY_NS).unwrap();
985        let chunks = reg.show_chunks("m");
986        assert_eq!(chunks.len(), 3);
987        assert!(chunks[0].id.start_ns <= chunks[1].id.start_ns);
988        assert!(chunks[1].id.start_ns <= chunks[2].id.start_ns);
989    }
990
991    #[test]
992    fn route_returns_none_for_unknown_hypertable() {
993        let reg = HypertableRegistry::new();
994        assert!(reg.route("nope", 0).is_none());
995    }
996
997    #[test]
998    fn drop_chunks_before_removes_matching_chunks() {
999        let reg = HypertableRegistry::new();
1000        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1001        reg.route("m", 0).unwrap(); // chunk start 0, max=0
1002        reg.route("m", DAY_NS).unwrap(); // chunk start DAY_NS, max=DAY_NS
1003        reg.route("m", 2 * DAY_NS + 5).unwrap(); // chunk start 2*DAY_NS
1004
1005        let dropped = reg.drop_chunks_before("m", DAY_NS);
1006        // max_ts_ns of first chunk is 0, of second chunk is DAY_NS.
1007        // cutoff = DAY_NS, so both are "<= cutoff" → both dropped.
1008        assert_eq!(dropped.len(), 2);
1009        let remaining = reg.show_chunks("m");
1010        assert_eq!(remaining.len(), 1);
1011        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
1012    }
1013
1014    #[test]
1015    fn show_chunks_is_ordered_by_start() {
1016        let reg = HypertableRegistry::new();
1017        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1018        for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, 1 * DAY_NS] {
1019            reg.route("m", ts).unwrap();
1020        }
1021        let starts: Vec<u64> = reg.show_chunks("m").iter().map(|c| c.id.start_ns).collect();
1022        assert_eq!(starts, vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]);
1023    }
1024
1025    #[test]
1026    fn seal_chunk_flips_flag() {
1027        let reg = HypertableRegistry::new();
1028        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1029        let id = reg.route("m", 0).unwrap();
1030        assert!(reg.seal_chunk(&id));
1031        assert!(reg.show_chunks("m")[0].sealed);
1032    }
1033
1034    #[test]
1035    fn drop_hypertable_removes_everything() {
1036        let reg = HypertableRegistry::new();
1037        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1038        reg.route("m", 0).unwrap();
1039        reg.route("m", DAY_NS).unwrap();
1040        assert_eq!(reg.drop_hypertable("m"), 2);
1041        assert!(reg.get("m").is_none());
1042        assert!(reg.show_chunks("m").is_empty());
1043    }
1044
1045    #[test]
1046    fn total_rows_sums_every_chunk() {
1047        let reg = HypertableRegistry::new();
1048        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1049        for ts in 0..1000 {
1050            reg.route("m", ts).unwrap();
1051        }
1052        for ts in DAY_NS..DAY_NS + 500 {
1053            reg.route("m", ts).unwrap();
1054        }
1055        assert_eq!(reg.total_rows("m"), 1500);
1056    }
1057
1058    #[test]
1059    fn names_lists_registered_hypertables() {
1060        let reg = HypertableRegistry::new();
1061        reg.register(HypertableSpec::new("a", "ts", DAY_NS));
1062        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
1063        let mut names = reg.names();
1064        names.sort();
1065        assert_eq!(names, vec!["a", "b"]);
1066    }
1067
1068    // -----------------------------------------------------------------
1069    // Partition-level TTL
1070    // -----------------------------------------------------------------
1071
1072    #[test]
1073    fn with_ttl_parses_duration_and_sets_default() {
1074        let s = HypertableSpec::new("m", "ts", DAY_NS)
1075            .with_ttl("7d")
1076            .unwrap();
1077        assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
1078        assert!(HypertableSpec::new("m", "ts", DAY_NS)
1079            .with_ttl("raw")
1080            .is_none());
1081        assert!(HypertableSpec::new("m", "ts", DAY_NS)
1082            .with_ttl("garbage")
1083            .is_none());
1084    }
1085
1086    #[test]
1087    fn chunk_with_no_rows_never_expires() {
1088        let meta = ChunkMeta::new(
1089            ChunkId {
1090                hypertable: "m".into(),
1091                start_ns: 0,
1092            },
1093            DAY_NS,
1094        );
1095        assert!(!meta.is_expired_at(u64::MAX, Some(1)));
1096    }
1097
1098    #[test]
1099    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
1100        let mut meta = ChunkMeta::new(
1101            ChunkId {
1102                hypertable: "m".into(),
1103                start_ns: 0,
1104            },
1105            DAY_NS,
1106        );
1107        meta.observe(500);
1108        // TTL = 1000, max_ts = 500 → expires at 1500.
1109        assert!(!meta.is_expired_at(1000, Some(1000)));
1110        assert!(!meta.is_expired_at(1499, Some(1000)));
1111        assert!(meta.is_expired_at(1500, Some(1000)));
1112    }
1113
1114    #[test]
1115    fn per_chunk_override_wins_over_hypertable_default() {
1116        let mut meta = ChunkMeta::new(
1117            ChunkId {
1118                hypertable: "m".into(),
1119                start_ns: 0,
1120            },
1121            DAY_NS,
1122        );
1123        meta.observe(500);
1124        // Default would say "expire at 500+1000 = 1500"; override
1125        // narrows to 500+100 = 600.
1126        meta.ttl_override_ns = Some(100);
1127        assert!(meta.is_expired_at(600, Some(1000)));
1128        assert!(!meta.is_expired_at(599, Some(1000)));
1129    }
1130
1131    #[test]
1132    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
1133        let reg = HypertableRegistry::new();
1134        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
1135        // 3 chunks — at 0, DAY, 2*DAY.
1136        for t in [0, DAY_NS, 2 * DAY_NS] {
1137            reg.route("m", t).unwrap();
1138        }
1139        // now = 3 * DAY + 1 → chunks with max_ts in {0, DAY_NS} both
1140        // expired (max + 2d ≤ now), chunk at 2*DAY_NS still alive.
1141        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
1142        let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
1143        starts.sort();
1144        assert_eq!(starts, vec![0, DAY_NS]);
1145        let remaining = reg.show_chunks("m");
1146        assert_eq!(remaining.len(), 1);
1147        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
1148    }
1149
1150    #[test]
1151    fn sweep_without_ttl_keeps_every_chunk() {
1152        let reg = HypertableRegistry::new();
1153        reg.register(HypertableSpec::new("m", "ts", DAY_NS)); // no TTL
1154        for t in [0, DAY_NS, 2 * DAY_NS] {
1155            reg.route("m", t).unwrap();
1156        }
1157        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
1158        assert!(dropped.is_empty());
1159        assert_eq!(reg.show_chunks("m").len(), 3);
1160    }
1161
1162    #[test]
1163    fn sweep_all_expired_iterates_every_hypertable() {
1164        let reg = HypertableRegistry::new();
1165        reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
1166        reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
1167        // fast: chunk at 0 expires fast; slow: chunk at 0 still
1168        // within its 7-day TTL.
1169        reg.route("fast", 0).unwrap();
1170        reg.route("slow", 0).unwrap();
1171        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
1172        assert_eq!(dropped.len(), 1);
1173        assert_eq!(dropped[0].0, "fast");
1174        assert_eq!(reg.show_chunks("slow").len(), 1);
1175    }
1176
1177    #[test]
1178    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
1179        let reg = HypertableRegistry::new();
1180        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1181        let id = reg.route("m", 0).unwrap();
1182        // Raise TTL to 100 days — chunk should survive the sweep.
1183        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
1184        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
1185        assert!(dropped.is_empty());
1186        // Now shorten to 1 hour and sweep.
1187        reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
1188        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
1189        assert_eq!(dropped.len(), 1);
1190    }
1191
1192    #[test]
1193    fn snapshot_then_restore_reproduces_registry_identically() {
1194        // Pre-restart registry: two hypertables, several chunks, with a
1195        // sealed chunk and a per-chunk TTL override — the bits that are
1196        // NOT derivable from row data and so must round-trip verbatim.
1197        let reg = HypertableRegistry::new();
1198        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
1199        reg.register(HypertableSpec::new("events", "ts", HOUR_NS));
1200        for t in [0, DAY_NS + 5, DAY_NS + 9, 2 * DAY_NS] {
1201            reg.route("metrics", t).unwrap();
1202        }
1203        let id = reg.route("events", 0).unwrap();
1204        reg.seal_chunk(&id);
1205        reg.set_chunk_ttl_ns(&id, Some(3 * HOUR_NS));
1206
1207        let specs = reg.list();
1208        let chunks = reg.snapshot_chunks();
1209        assert!(!reg.is_empty());
1210
1211        // Simulated restart: rebuild a fresh registry from the snapshot.
1212        let restored = HypertableRegistry::new();
1213        assert!(restored.is_empty());
1214        for spec in specs {
1215            restored.register(spec);
1216        }
1217        for chunk in chunks {
1218            restored.restore_chunk(chunk);
1219        }
1220
1221        // Specs identical.
1222        let before = reg.get("metrics").unwrap();
1223        let after = restored.get("metrics").unwrap();
1224        assert_eq!(after.chunk_interval_ns, before.chunk_interval_ns);
1225        assert_eq!(after.time_column, before.time_column);
1226        assert_eq!(after.default_ttl_ns, before.default_ttl_ns);
1227
1228        // Chunk metadata identical (bounds, counts, sealed, TTL override).
1229        let m_before = reg.show_chunks("metrics");
1230        let m_after = restored.show_chunks("metrics");
1231        assert_eq!(m_after.len(), m_before.len());
1232        for (a, b) in m_after.iter().zip(m_before.iter()) {
1233            assert_eq!(a.id.start_ns, b.id.start_ns);
1234            assert_eq!(a.end_ns_exclusive, b.end_ns_exclusive);
1235            assert_eq!(a.row_count, b.row_count);
1236            assert_eq!(a.min_ts_ns, b.min_ts_ns);
1237            assert_eq!(a.max_ts_ns, b.max_ts_ns);
1238        }
1239        let e_after = restored.show_chunks("events");
1240        assert_eq!(e_after.len(), 1);
1241        assert!(e_after[0].sealed, "sealed flag must survive restore");
1242        assert_eq!(e_after[0].ttl_override_ns, Some(3 * HOUR_NS));
1243
1244        // A post-restart write routes to the EXISTING chunk — no
1245        // duplicate allocation.
1246        let routed = restored.route("metrics", DAY_NS + 1).unwrap();
1247        assert_eq!(routed.start_ns, DAY_NS);
1248        assert_eq!(
1249            restored.show_chunks("metrics").len(),
1250            m_before.len(),
1251            "write after restore must not allocate a new chunk"
1252        );
1253    }
1254
1255    // -----------------------------------------------------------------
1256    // Columnar chunk eviction via the EXISTING TTL/drop path (issue #859)
1257    //
1258    // A sealed *columnar* chunk is one whose `ChunkMeta.columnar_page` is
1259    // `Some(..)` — the RDCC `ColumnBlock` discriminant (PRD #850 Phase 1).
1260    // These guards prove the retention path is storage-form agnostic: the
1261    // SAME `sweep_expired` / `drop_chunks_before` metadata sweep that drops
1262    // row chunks drops columnar chunks too, in O(1) metadata work (no
1263    // per-row delete), and hands the `columnar_page` back on the dropped
1264    // meta so the physical-storage callback can release the RDCC block.
1265    // No separate columnar TTL/partition subsystem exists or is needed.
1266    // -----------------------------------------------------------------
1267
1268    /// Build a sealed columnar chunk meta directly — mirrors what the
1269    /// boot/seal path restores: a chunk carrying its RDCC `columnar_page`.
1270    fn columnar_chunk(hypertable: &str, start_ns: u64, max_ts_ns: u64) -> ChunkMeta {
1271        let mut meta = ChunkMeta::new(
1272            ChunkId {
1273                hypertable: hypertable.into(),
1274                start_ns,
1275            },
1276            start_ns + DAY_NS,
1277        );
1278        meta.row_count = 1;
1279        meta.min_ts_ns = max_ts_ns;
1280        meta.max_ts_ns = max_ts_ns;
1281        meta.sealed = true;
1282        meta.columnar_page = Some(PageLocation::new(7, 0, 1234));
1283        meta
1284    }
1285
1286    #[test]
1287    fn columnar_chunk_evicts_via_sweep_expired_carrying_its_page() {
1288        let reg = HypertableRegistry::new();
1289        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1290        // Inject a sealed columnar chunk (max_ts=0 → expiry at 1d).
1291        reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1292        assert!(reg.show_chunks("metrics")[0].columnar_page.is_some());
1293
1294        // now = 3d → past the 1d TTL. The existing partition sweep drops it.
1295        let dropped = reg.sweep_expired("metrics", 3 * DAY_NS);
1296        assert_eq!(dropped.len(), 1, "columnar chunk must evict via TTL sweep");
1297        assert_eq!(
1298            dropped[0].columnar_page,
1299            Some(PageLocation::new(7, 0, 1234)),
1300            "dropped meta must carry columnar_page so physical release frees the RDCC block"
1301        );
1302        assert!(reg.show_chunks("metrics").is_empty());
1303    }
1304
1305    #[test]
1306    fn columnar_chunk_evicts_via_drop_chunks_before() {
1307        let reg = HypertableRegistry::new();
1308        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
1309        reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1310
1311        let dropped = reg.drop_chunks_before("metrics", DAY_NS);
1312        assert_eq!(dropped.len(), 1);
1313        assert!(
1314            dropped[0].columnar_page.is_some(),
1315            "drop_chunks_before is metadata-only and carries columnar_page through"
1316        );
1317        assert!(reg.show_chunks("metrics").is_empty());
1318    }
1319
1320    #[test]
1321    fn columnar_and_row_chunks_share_one_eviction_path() {
1322        // Regression guard (acceptance #3): a row chunk and a columnar
1323        // chunk with identical bounds + TTL must produce identical sweep
1324        // outcomes. If a separate columnar TTL subsystem were ever
1325        // introduced, the two would diverge here.
1326        let mk = |columnar: bool| {
1327            let reg = HypertableRegistry::new();
1328            reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1329            if columnar {
1330                reg.restore_chunk(columnar_chunk("m", 0, 0));
1331            } else {
1332                reg.route("m", 0).unwrap(); // row chunk, max_ts=0
1333            }
1334            reg.sweep_expired("m", 3 * DAY_NS).len()
1335        };
1336        assert_eq!(mk(false), 1, "row chunk evicts");
1337        assert_eq!(mk(true), 1, "columnar chunk evicts the same way");
1338    }
1339
1340    #[test]
1341    fn columnar_chunk_prunes_by_time_bounds_like_row_chunk() {
1342        // Acceptance #2: the partition pruner selects chunks by their
1343        // [start_ns, end_ns_exclusive) bounds (surfaced via show_chunks) —
1344        // it never inspects columnar_page, so a columnar chunk outside the
1345        // query window is eliminated identically to a row chunk. We assert
1346        // the bounds the pruner consumes survive verbatim for a columnar
1347        // chunk.
1348        let reg = HypertableRegistry::new();
1349        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1350        reg.restore_chunk(columnar_chunk("m", 0, 0));
1351        reg.restore_chunk(columnar_chunk("m", 2 * DAY_NS, 2 * DAY_NS));
1352        let chunks = reg.show_chunks("m");
1353        // A window of [2d, 3d) overlaps only the second chunk — same
1354        // arithmetic the RangeChild pruner runs against these bounds.
1355        let lo = 2 * DAY_NS;
1356        let hi = 3 * DAY_NS;
1357        let overlapping: Vec<u64> = chunks
1358            .iter()
1359            .filter(|c| c.id.start_ns < hi && c.end_ns_exclusive > lo)
1360            .map(|c| c.id.start_ns)
1361            .collect();
1362        assert_eq!(
1363            overlapping,
1364            vec![2 * DAY_NS],
1365            "only in-window columnar chunk kept"
1366        );
1367        assert!(chunks.iter().all(|c| c.columnar_page.is_some()));
1368    }
1369
1370    /// Read-bridge dispatch key (#861): `ChunkMeta::format()` classifies a
1371    /// chunk purely from the `columnar_page` migration discriminant, so a
1372    /// pre-existing row chunk and a newly columnar-sealed chunk in the same
1373    /// registry are disambiguated by format version — the gate the read
1374    /// path dispatches on.
1375    #[test]
1376    fn chunk_format_dispatches_on_columnar_page_discriminant() {
1377        let reg = HypertableRegistry::new();
1378        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1379        // A row chunk (allocated by a write) and a columnar chunk coexist.
1380        reg.route("m", 0).unwrap();
1381        reg.restore_chunk(columnar_chunk("m", DAY_NS, DAY_NS));
1382
1383        let chunks = reg.show_chunks("m");
1384        let row = chunks.iter().find(|c| c.id.start_ns == 0).unwrap();
1385        let col = chunks.iter().find(|c| c.id.start_ns == DAY_NS).unwrap();
1386
1387        assert_eq!(row.format(), ChunkFormat::Row);
1388        assert!(!row.is_columnar());
1389        assert_eq!(col.format(), ChunkFormat::ColumnarV1);
1390        assert!(col.is_columnar());
1391    }
1392
1393    #[test]
1394    fn chunks_expiring_within_previews_without_dropping() {
1395        let reg = HypertableRegistry::new();
1396        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1397        // Chunks with max_ts at 0, 1d, 2d → expiries at 1d, 2d, 3d.
1398        for t in [0, DAY_NS, 2 * DAY_NS] {
1399            reg.route("m", t).unwrap();
1400        }
1401        // now=0, horizon=1.5d → only the first chunk (expiry 1d)
1402        // fits. Tight horizon proves the cutoff math.
1403        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
1404        assert_eq!(preview.len(), 1);
1405        assert_eq!(preview[0].id.start_ns, 0);
1406        // Wider horizon pulls in the second chunk too.
1407        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
1408        assert_eq!(preview2.len(), 2);
1409        // Registry still has every chunk — preview never drops.
1410        assert_eq!(reg.show_chunks("m").len(), 3);
1411    }
1412
1413    // -----------------------------------------------------------------
1414    // Chunk compaction (#857)
1415    // -----------------------------------------------------------------
1416
1417    /// Seal a columnar chunk into the registry with real RDCC block bytes.
1418    fn seal_columnar_chunk_into(
1419        reg: &HypertableRegistry,
1420        hypertable: &str,
1421        start_ns: u64,
1422        end_ns_exclusive: u64,
1423        points: &[(u64, f64)],
1424        schema_ref: u64,
1425    ) -> ChunkId {
1426        use super::super::chunk::TimeSeriesChunk;
1427        use std::collections::HashMap;
1428
1429        let id = ChunkId {
1430            hypertable: hypertable.to_string(),
1431            start_ns,
1432        };
1433        let mut chunk = TimeSeriesChunk::new("m", HashMap::new());
1434        for &(ts, v) in points {
1435            chunk.append(ts, v);
1436        }
1437        let block = chunk
1438            .seal_columnar(start_ns, schema_ref)
1439            .expect("seal_columnar");
1440        let page = PageLocation::new(0, 0, block.len() as u32);
1441
1442        let mut meta = ChunkMeta::new(id.clone(), end_ns_exclusive);
1443        meta.sealed = true;
1444        meta.columnar_page = Some(page);
1445        meta.row_count = points.len() as u64;
1446        if let Some(&(min_ts, _)) = points.iter().min_by_key(|(ts, _)| ts) {
1447            meta.min_ts_ns = min_ts;
1448        }
1449        if let Some(&(max_ts, _)) = points.iter().max_by_key(|(ts, _)| ts) {
1450            meta.max_ts_ns = max_ts;
1451        }
1452
1453        {
1454            let mut guard = reg.inner.lock().unwrap();
1455            guard
1456                .chunks
1457                .insert((hypertable.to_string(), start_ns), meta);
1458            guard
1459                .columnar_blocks
1460                .insert((hypertable.to_string(), start_ns), block);
1461        }
1462        id
1463    }
1464
1465    /// Acceptance criterion 1: N sealed chunks merge into one with logically
1466    /// identical contents (same rows and values, in timestamp order).
1467    #[test]
1468    fn compact_merges_chunks_to_identical_rows() {
1469        let reg = HypertableRegistry::new();
1470        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1471
1472        // Three small chunks with non-overlapping time ranges.
1473        let pts_a: Vec<(u64, f64)> = (0..10).map(|i| (i * 1_000, i as f64)).collect();
1474        let pts_b: Vec<(u64, f64)> = (10..20).map(|i| (i * 1_000, i as f64)).collect();
1475        let pts_c: Vec<(u64, f64)> = (20..30).map(|i| (i * 1_000, i as f64)).collect();
1476
1477        let id_a = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &pts_a, 1);
1478        let id_b = seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &pts_b, 1);
1479        let id_c = seal_columnar_chunk_into(&reg, "m", 2 * DAY_NS, 3 * DAY_NS, &pts_c, 1);
1480
1481        let merged_id = reg
1482            .compact_columnar_chunks("m", &[id_a, id_b, id_c], 0, 1, DEFAULT_GRANULE_SIZE)
1483            .expect("compaction failed");
1484
1485        // Merged chunk exists and carries the right row count.
1486        let chunks = reg.show_chunks("m");
1487        assert_eq!(chunks.len(), 1, "three source chunks must collapse to one");
1488        let merged_meta = &chunks[0];
1489        assert_eq!(merged_meta.id.start_ns, merged_id.start_ns);
1490        assert_eq!(merged_meta.row_count, 30);
1491        assert!(merged_meta.sealed);
1492        assert!(merged_meta.is_columnar());
1493
1494        // Decode the merged block and verify every point is present.
1495        let block = reg
1496            .columnar_block(&merged_id)
1497            .expect("merged block must be RAM-resident");
1498        let got = points_from_column_block(&block).expect("decode merged block");
1499
1500        let mut expected: Vec<(u64, f64)> =
1501            pts_a.iter().chain(&pts_b).chain(&pts_c).copied().collect();
1502        expected.sort_by_key(|(ts, _)| *ts);
1503
1504        assert_eq!(got.len(), expected.len());
1505        for (point, (exp_ts, exp_val)) in got.iter().zip(&expected) {
1506            assert_eq!(point.timestamp_ns, *exp_ts);
1507            assert!(
1508                (point.value - exp_val).abs() < 1e-9,
1509                "value mismatch at ts {}: got {}, expected {}",
1510                exp_ts,
1511                point.value,
1512                exp_val
1513            );
1514        }
1515    }
1516
1517    /// Acceptance criterion 2: small-chunk count drops after compaction;
1518    /// merged block is recompressed (byte length is within reason).
1519    #[test]
1520    fn compact_reduces_chunk_count_and_recompresses() {
1521        let reg = HypertableRegistry::new();
1522        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1523
1524        // Five small chunks; each contributes 50 points.
1525        let mut ids = Vec::new();
1526        for i in 0..5u64 {
1527            let pts: Vec<(u64, f64)> = (0..50u64)
1528                .map(|j| (i * DAY_NS + j * 1_000, 1.0 + j as f64 * 0.01))
1529                .collect();
1530            let id = seal_columnar_chunk_into(&reg, "m", i * DAY_NS, (i + 1) * DAY_NS, &pts, 1);
1531            ids.push(id);
1532        }
1533
1534        assert_eq!(reg.show_chunks("m").len(), 5);
1535
1536        let merged_id = reg
1537            .compact_columnar_chunks("m", &ids, 0, 1, DEFAULT_GRANULE_SIZE)
1538            .expect("compaction failed");
1539
1540        // Count dropped from 5 to 1.
1541        let chunks = reg.show_chunks("m");
1542        assert_eq!(chunks.len(), 1, "five chunks must compact to one");
1543        assert_eq!(chunks[0].row_count, 250);
1544
1545        // Merged block is present and decodable.
1546        let block = reg.columnar_block(&merged_id).unwrap();
1547        let pts = points_from_column_block(&block).unwrap();
1548        assert_eq!(pts.len(), 250, "all 250 points must survive compaction");
1549
1550        // Compression sanity: the merged block must be smaller than 250 * 16
1551        // uncompressed bytes (timestamp 8 + value 8).
1552        let raw_uncompressed = 250 * 16;
1553        assert!(
1554            block.len() < raw_uncompressed,
1555            "merged block ({} bytes) should be compressed (raw = {})",
1556            block.len(),
1557            raw_uncompressed
1558        );
1559    }
1560
1561    /// Acceptance criterion 3: a "torn merge" (crash before commit) leaves
1562    /// the source chunks intact and loses no committed data.
1563    ///
1564    /// We simulate a torn merge by:
1565    /// 1. Reading the source blocks (the expensive "compute" phase).
1566    /// 2. Verifying sources still exist before the atomic commit fires.
1567    /// 3. Running the actual compaction and verifying sources are removed
1568    ///    only after the merged chunk is fully committed.
1569    #[test]
1570    fn torn_merge_leaves_inputs_intact() {
1571        let reg = HypertableRegistry::new();
1572        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1573
1574        let pts_a: Vec<(u64, f64)> = (0..20).map(|i| (i * 1_000, i as f64)).collect();
1575        let pts_b: Vec<(u64, f64)> = (100..120).map(|i| (i * 1_000, i as f64)).collect();
1576
1577        let id_a = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &pts_a, 1);
1578        let id_b = seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &pts_b, 1);
1579
1580        // Simulate the "compute" phase: read source blocks without committing.
1581        // This mirrors what would happen if the process died between decode and
1582        // the atomic registry update — the registry never sees any write.
1583        {
1584            let guard = reg.inner.lock().unwrap();
1585            let block_a = guard
1586                .columnar_blocks
1587                .get(&("m".to_string(), 0))
1588                .expect("block_a must be present before any merge");
1589            let block_b = guard
1590                .columnar_blocks
1591                .get(&("m".to_string(), DAY_NS))
1592                .expect("block_b must be present before any merge");
1593            let pts_decoded_a = points_from_column_block(block_a).unwrap();
1594            let pts_decoded_b = points_from_column_block(block_b).unwrap();
1595            // "Compute" completed; simulate crash by simply not committing.
1596            assert_eq!(pts_decoded_a.len(), 20, "source A readable before merge");
1597            assert_eq!(pts_decoded_b.len(), 20, "source B readable before merge");
1598
1599            // Both source chunks are still in the registry — no partial state.
1600            assert!(
1601                guard.chunks.contains_key(&("m".to_string(), 0)),
1602                "source A must remain intact after torn merge"
1603            );
1604            assert!(
1605                guard.chunks.contains_key(&("m".to_string(), DAY_NS)),
1606                "source B must remain intact after torn merge"
1607            );
1608        }
1609
1610        // Now actually run compaction — verify it commits atomically.
1611        let merged_id = reg
1612            .compact_columnar_chunks("m", &[id_a, id_b], 0, 1, DEFAULT_GRANULE_SIZE)
1613            .expect("compaction after torn-merge simulation must succeed");
1614
1615        // After commit: merged chunk present, sources gone.
1616        let chunks = reg.show_chunks("m");
1617        assert_eq!(chunks.len(), 1, "only the merged chunk must remain");
1618        assert_eq!(chunks[0].id.start_ns, merged_id.start_ns);
1619
1620        // All data survives — no rows lost.
1621        let block = reg.columnar_block(&merged_id).unwrap();
1622        let pts = points_from_column_block(&block).unwrap();
1623        assert_eq!(pts.len(), 40, "all 40 points (20+20) must survive");
1624    }
1625
1626    /// Guard: `compact_columnar_chunks` returns `InsufficientSources` when
1627    /// called with fewer than 2 source chunks (a 1-to-1 "merge" is a no-op).
1628    #[test]
1629    fn compact_rejects_single_source() {
1630        let reg = HypertableRegistry::new();
1631        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1632        let id = seal_columnar_chunk_into(&reg, "m", 0, DAY_NS, &[(1_000, 1.0)], 1);
1633        let err = reg
1634            .compact_columnar_chunks("m", &[id], 0, 1, DEFAULT_GRANULE_SIZE)
1635            .unwrap_err();
1636        assert_eq!(err, CompactionError::InsufficientSources);
1637    }
1638
1639    /// Guard: `compact_columnar_chunks` rejects unsealed source chunks.
1640    #[test]
1641    fn compact_rejects_unsealed_chunk() {
1642        let reg = HypertableRegistry::new();
1643        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1644
1645        // Manually insert an unsealed (open) chunk meta.
1646        let open_id = ChunkId {
1647            hypertable: "m".to_string(),
1648            start_ns: 0,
1649        };
1650        reg.restore_chunk(ChunkMeta::new(open_id.clone(), DAY_NS));
1651
1652        let sealed_id =
1653            seal_columnar_chunk_into(&reg, "m", DAY_NS, 2 * DAY_NS, &[(DAY_NS + 1, 1.0)], 1);
1654
1655        let err = reg
1656            .compact_columnar_chunks("m", &[open_id, sealed_id], 0, 1, DEFAULT_GRANULE_SIZE)
1657            .unwrap_err();
1658        assert!(matches!(err, CompactionError::ChunkNotSealed(_)));
1659    }
1660
1661    /// Guard: `select_compaction_candidates` returns groups respecting the
1662    /// row budget and minimum-chunks threshold.
1663    #[test]
1664    fn select_candidates_respects_budget_and_threshold() {
1665        let reg = HypertableRegistry::new();
1666        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1667
1668        // Five chunks of 100 rows each.
1669        for i in 0..5u64 {
1670            let pts: Vec<(u64, f64)> = (0..100u64).map(|j| (i * DAY_NS + j, j as f64)).collect();
1671            seal_columnar_chunk_into(&reg, "m", i * DAY_NS, (i + 1) * DAY_NS, &pts, 1);
1672        }
1673
1674        // Budget 250 rows per group, min 2 chunks → 2 groups of 2-3 chunks.
1675        let groups = reg.select_compaction_candidates("m", 250, 2);
1676        assert!(
1677            !groups.is_empty(),
1678            "must find at least one compaction group"
1679        );
1680        for group in &groups {
1681            assert!(group.len() >= 2, "each group must have at least 2 chunks");
1682            // Each group's total rows must be within budget.
1683            let total: u64 = group
1684                .iter()
1685                .map(|id| {
1686                    reg.show_chunks("m")
1687                        .iter()
1688                        .find(|c| c.id.start_ns == id.start_ns)
1689                        .map(|c| c.row_count)
1690                        .unwrap_or(0)
1691                })
1692                .sum();
1693            assert!(
1694                total <= 250,
1695                "group total rows {total} must not exceed budget 250"
1696            );
1697        }
1698
1699        // Threshold of 10 chunks → no group qualifies.
1700        let groups_high = reg.select_compaction_candidates("m", 250, 10);
1701        assert!(
1702            groups_high.is_empty(),
1703            "threshold of 10 must yield no groups when max is 5 chunks"
1704        );
1705    }
1706}