Skip to main content

reddb_server/storage/timeseries/
hypertable.rs

1//! Hypertables — time-range-partitioned tables à la TimescaleDB.
2//!
3//! A hypertable is a logical collection that auto-partitions writes
4//! into child chunks, each covering a fixed time interval
5//! (`chunk_interval_ns`). Queries that filter by the time column
6//! see the partition pruner eliminate child chunks whose bounds
7//! fall outside the predicate; drops happen per-chunk so operators
8//! can retain the last N days without a full-table scan.
9//!
10//! This module defines the **metadata + router**. The physical
11//! chunk is the same [`super::chunk::TimeSeriesChunk`] the standalone
12//! time-series path already uses — the hypertable layer just tracks
13//! which chunk a row goes into.
14//!
15//! SQL surface (parsed elsewhere in the sprint):
16//!
17//! ```sql
18//! CREATE HYPERTABLE metrics (
19//!   ts    BIGINT,
20//!   host  TEXT,
21//!   value DOUBLE
22//! ) CHUNK INTERVAL '1 day';
23//!
24//! SELECT drop_chunks('metrics', INTERVAL '90 days');
25//! SELECT show_chunks('metrics');
26//! ```
27
28use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::retention::parse_duration_ns;
32use crate::storage::engine::PageLocation;
33
34/// Spec declared by `CREATE HYPERTABLE`.
35#[derive(Debug, Clone)]
36pub struct HypertableSpec {
37    pub name: String,
38    /// Column name that carries the time axis (must be unix-ns BIGINT
39    /// or parseable to one).
40    pub time_column: String,
41    /// Fixed width of a single chunk, in nanoseconds.
42    pub chunk_interval_ns: u64,
43    /// Default TTL applied to every new chunk when the DDL didn't
44    /// request an explicit override. `None` means "no TTL — chunks
45    /// live until explicit `drop_chunks` / retention policy fires".
46    ///
47    /// The effective expiry of a chunk is `max_ts_ns + ttl_ns`, so
48    /// a chunk is safely droppable once `now_ns ≥ expiry`. That
49    /// matches the contract callers already learnt from the
50    /// retention daemon — partition TTL is the **declarative** way
51    /// to say the same thing at CREATE time without a separate
52    /// `add_retention_policy` call.
53    pub default_ttl_ns: Option<u64>,
54}
55
56impl HypertableSpec {
57    pub fn new(
58        name: impl Into<String>,
59        time_column: impl Into<String>,
60        chunk_interval_ns: u64,
61    ) -> Self {
62        Self {
63            name: name.into(),
64            time_column: time_column.into(),
65            chunk_interval_ns: chunk_interval_ns.max(1),
66            default_ttl_ns: None,
67        }
68    }
69
70    /// Convenience: construct from a Timescale-style duration string
71    /// (`"1d"`, `"1h"`, `"30m"`…).
72    pub fn from_interval_string(
73        name: impl Into<String>,
74        time_column: impl Into<String>,
75        interval: &str,
76    ) -> Option<Self> {
77        let ns = parse_duration_ns(interval)?;
78        if ns == 0 {
79            return None;
80        }
81        Some(Self::new(name, time_column, ns))
82    }
83
84    /// Builder-style: attach a default TTL. Uses the same duration
85    /// grammar as `chunk_interval` (`"90d"`, `"30s"`, …).
86    pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
87        let ns = parse_duration_ns(ttl)?;
88        if ns == 0 {
89            return None;
90        }
91        self.default_ttl_ns = Some(ns);
92        Some(self)
93    }
94
95    /// Direct setter when the TTL is already computed in ns.
96    pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
97        self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
98        self
99    }
100
101    /// Align `timestamp_ns` to the chunk's floor — the chunk that
102    /// row belongs to starts at this timestamp and covers
103    /// `[start, start + chunk_interval_ns)`.
104    pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
105        (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
106    }
107
108    pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
109        self.chunk_start(timestamp_ns)
110            .saturating_add(self.chunk_interval_ns)
111    }
112}
113
/// Identity of one child chunk: the owning hypertable plus the chunk's
/// aligned start. Stable across restart, so the catalog and retention
/// can both refer to a chunk without ambiguity.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    /// Name of the hypertable the chunk belongs to.
    pub hypertable: String,
    /// Inclusive start of the chunk's time range, aligned to
    /// `chunk_interval_ns`.
    pub start_ns: u64,
}
122
/// How a chunk is stored on disk — the **read-bridge dispatch key**
/// (PRD #850 Phase 1, #861).
///
/// Enabling `COLUMNAR` on a collection that already holds row data does
/// not rewrite anything: chunks that existed before stay `Row`, chunks
/// sealed afterwards become `ColumnarV1`, and both kinds live side by
/// side in the same collection. A read picks its decoder from this
/// discriminant — the entity/row reader for `Row`, the RDCC
/// column-block reader for `ColumnarV1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkFormat {
    /// Legacy row-stored chunk (`columnar_page == None`). Either it
    /// predates the columnar seal or it was sealed while the collection
    /// was non-columnar; its rows come from the entity/row path.
    Row,
    /// Columnar `RDCC` chunk, format version 1
    /// (`columnar_page == Some`). Rows decode from the recorded
    /// `ColumnBlock`.
    ColumnarV1,
}
139
140/// Metadata tracked per child chunk. Physical storage lives in
141/// `TimeSeriesChunk` keyed by `(hypertable, start_ns)`.
142#[derive(Debug, Clone)]
143pub struct ChunkMeta {
144    pub id: ChunkId,
145    pub end_ns_exclusive: u64,
146    pub row_count: u64,
147    pub min_ts_ns: u64,
148    pub max_ts_ns: u64,
149    pub sealed: bool,
150    /// Optional per-chunk TTL override. `None` means "fall back to
151    /// the hypertable's default TTL". Setting this lets mixed-TTL
152    /// policies live inside the same hypertable — e.g. keep the
153    /// current month of data forever but expire everything older
154    /// than 90 days.
155    pub ttl_override_ns: Option<u64>,
156    /// Columnar-vs-row **migration discriminant** (PRD #850, Phase 1).
157    /// `Some(loc)` → this chunk was sealed columnar; its `RDCC`
158    /// [`ColumnBlock`](crate::storage::engine::PageType::ColumnBlock) lives
159    /// at `loc` and reads decode the columnar form. `None` → a legacy
160    /// row-stored chunk served by the entity path (read-bridge lands in
161    /// #861). This MUST persist so pre-existing row-stored data is never
162    /// mis-read as columnar after a restart.
163    pub columnar_page: Option<PageLocation>,
164}
165
166impl ChunkMeta {
167    pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
168        Self {
169            id,
170            end_ns_exclusive,
171            row_count: 0,
172            min_ts_ns: u64::MAX,
173            max_ts_ns: 0,
174            sealed: false,
175            ttl_override_ns: None,
176            columnar_page: None,
177        }
178    }
179
180    /// The chunk's storage format — the read-bridge dispatch key (#861).
181    /// Derived from the migration discriminant `columnar_page`: a recorded
182    /// RDCC `ColumnBlock` location means [`ChunkFormat::ColumnarV1`], its
183    /// absence means the legacy [`ChunkFormat::Row`] form. This is the
184    /// format-version gate that lets old row chunks and new columnar chunks
185    /// coexist in one collection without a rewrite.
186    pub fn format(&self) -> ChunkFormat {
187        match self.columnar_page {
188            Some(_) => ChunkFormat::ColumnarV1,
189            None => ChunkFormat::Row,
190        }
191    }
192
193    /// True when this chunk is stored in the columnar `RDCC` form.
194    pub fn is_columnar(&self) -> bool {
195        matches!(self.format(), ChunkFormat::ColumnarV1)
196    }
197
198    pub fn observe(&mut self, ts_ns: u64) {
199        self.row_count += 1;
200        if ts_ns < self.min_ts_ns {
201            self.min_ts_ns = ts_ns;
202        }
203        if ts_ns > self.max_ts_ns {
204            self.max_ts_ns = ts_ns;
205        }
206    }
207
208    /// Effective TTL = per-chunk override if present, otherwise the
209    /// hypertable default. `None` = the chunk has no automatic
210    /// expiry.
211    pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
212        self.ttl_override_ns.or(default_ttl_ns)
213    }
214
215    /// Absolute epoch-ns at which the chunk becomes droppable. Uses
216    /// `max_ts_ns` as the baseline — the newest row the chunk has
217    /// ever accepted — so an empty chunk (no rows yet) never
218    /// expires until at least one row lands.
219    pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
220        let ttl = self.effective_ttl_ns(default_ttl_ns)?;
221        if self.row_count == 0 {
222            return None;
223        }
224        Some(self.max_ts_ns.saturating_add(ttl))
225    }
226
227    pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
228        match self.expiry_ns(default_ttl_ns) {
229            Some(expiry) => now_ns >= expiry,
230            None => false,
231        }
232    }
233}
234
235/// In-memory catalog of hypertables and their chunks. Thread-safe
236/// because INSERTs can land from multiple writers simultaneously.
237#[derive(Clone, Default)]
238pub struct HypertableRegistry {
239    inner: Arc<Mutex<RegistryInner>>,
240}
241
242#[derive(Default)]
243struct RegistryInner {
244    specs: BTreeMap<String, HypertableSpec>,
245    /// `(hypertable, start_ns)` → chunk meta. `BTreeMap` so lookups
246    /// by name produce an ordered view (show_chunks must be
247    /// deterministic).
248    chunks: BTreeMap<(String, u64), ChunkMeta>,
249    /// `(hypertable, start_ns)` → the sealed chunk's RDCC `ColumnBlock`
250    /// bytes, populated by [`seal_chunk_columnar`](HypertableRegistry::seal_chunk_columnar)
251    /// (PRD #850, #911). RAM-resident: the migration discriminant
252    /// `ChunkMeta.columnar_page` persists across restart, but durable
253    /// engine-page persistence of these bytes is a follow-up — until the
254    /// page-write bridge lands, a columnar chunk's block lives only here.
255    columnar_blocks: BTreeMap<(String, u64), Vec<u8>>,
256}
257
258impl std::fmt::Debug for HypertableRegistry {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        let guard = match self.inner.lock() {
261            Ok(g) => g,
262            Err(p) => p.into_inner(),
263        };
264        f.debug_struct("HypertableRegistry")
265            .field("hypertables", &guard.specs.len())
266            .field("chunks", &guard.chunks.len())
267            .finish()
268    }
269}
270
271impl HypertableRegistry {
272    pub fn new() -> Self {
273        Self::default()
274    }
275
276    /// Register a new hypertable. Replaces the previous spec if one
277    /// existed with the same name; chunks for that name are kept —
278    /// the operator is assumed to know what they're doing when they
279    /// redefine (e.g. widening `chunk_interval_ns`).
280    pub fn register(&self, spec: HypertableSpec) {
281        let mut guard = match self.inner.lock() {
282            Ok(g) => g,
283            Err(p) => p.into_inner(),
284        };
285        guard.specs.insert(spec.name.clone(), spec);
286    }
287
288    pub fn get(&self, name: &str) -> Option<HypertableSpec> {
289        let guard = match self.inner.lock() {
290            Ok(g) => g,
291            Err(p) => p.into_inner(),
292        };
293        guard.specs.get(name).cloned()
294    }
295
296    pub fn list(&self) -> Vec<HypertableSpec> {
297        let guard = match self.inner.lock() {
298            Ok(g) => g,
299            Err(p) => p.into_inner(),
300        };
301        guard.specs.values().cloned().collect()
302    }
303
304    /// Drop a hypertable from the registry. Returns the removed spec
305    /// when present, or `None` for unknown names. Callers drop the
306    /// backing collection separately — this is registry housekeeping
307    /// only.
308    pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
309        let mut guard = match self.inner.lock() {
310            Ok(g) => g,
311            Err(p) => p.into_inner(),
312        };
313        guard.specs.remove(name)
314    }
315
316    /// Route a write: returns the `ChunkId` the row belongs in,
317    /// allocating the chunk on first write. `None` when the
318    /// hypertable is unknown.
319    pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
320        let mut guard = match self.inner.lock() {
321            Ok(g) => g,
322            Err(p) => p.into_inner(),
323        };
324        let spec = guard.specs.get(hypertable)?.clone();
325        let start = spec.chunk_start(timestamp_ns);
326        let end = spec.chunk_end_exclusive(timestamp_ns);
327        let id = ChunkId {
328            hypertable: spec.name.clone(),
329            start_ns: start,
330        };
331        let key = (spec.name.clone(), start);
332        let meta = guard
333            .chunks
334            .entry(key)
335            .or_insert_with(|| ChunkMeta::new(id.clone(), end));
336        meta.observe(timestamp_ns);
337        Some(id)
338    }
339
340    /// Return every chunk for `hypertable`, oldest-first.
341    pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
342        let guard = match self.inner.lock() {
343            Ok(g) => g,
344            Err(p) => p.into_inner(),
345        };
346        guard
347            .chunks
348            .iter()
349            .filter(|((name, _), _)| name == hypertable)
350            .map(|(_, meta)| meta.clone())
351            .collect()
352    }
353
354    /// Drop every chunk of `hypertable` whose `max_ts_ns` is at or
355    /// below `cutoff_ns`. Returns the count dropped — the physical
356    /// storage release is the caller's responsibility (this module
357    /// only owns the metadata).
358    pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
359        let mut guard = match self.inner.lock() {
360            Ok(g) => g,
361            Err(p) => p.into_inner(),
362        };
363        let mut dropped = Vec::new();
364        let keys: Vec<(String, u64)> = guard
365            .chunks
366            .iter()
367            .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
368            .map(|(k, _)| k.clone())
369            .collect();
370        for key in keys {
371            if let Some(meta) = guard.chunks.remove(&key) {
372                dropped.push(meta);
373            }
374        }
375        dropped
376    }
377
378    /// Sweep chunks whose effective TTL has fired. A chunk is
379    /// droppable when `now_ns ≥ max_ts_ns + effective_ttl_ns` — the
380    /// registry hands back every removed `ChunkMeta` so the
381    /// physical-storage callback can release bytes + indexes. Chunks
382    /// without an effective TTL (neither per-chunk override nor
383    /// hypertable default) are never touched.
384    ///
385    /// This is the "TTL applied at the partition level" primitive:
386    /// one O(1) metadata sweep reclaims every row of every expired
387    /// chunk, instead of scanning rows individually like an
388    /// entity-level TTL would. Empty hypertables stay empty.
389    pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
390        let mut guard = match self.inner.lock() {
391            Ok(g) => g,
392            Err(p) => p.into_inner(),
393        };
394        let Some(spec) = guard.specs.get(hypertable).cloned() else {
395            return Vec::new();
396        };
397        let expired_keys: Vec<(String, u64)> = guard
398            .chunks
399            .iter()
400            .filter(|((name, _), meta)| {
401                name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
402            })
403            .map(|(k, _)| k.clone())
404            .collect();
405        let mut dropped = Vec::with_capacity(expired_keys.len());
406        for key in expired_keys {
407            if let Some(meta) = guard.chunks.remove(&key) {
408                dropped.push(meta);
409            }
410        }
411        dropped
412    }
413
414    /// Sweep every registered hypertable in one shot — the loop the
415    /// retention daemon runs every cycle. Returns a flat list of
416    /// `(hypertable_name, chunk_dropped)` pairs.
417    pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
418        let names: Vec<String> = {
419            let guard = match self.inner.lock() {
420                Ok(g) => g,
421                Err(p) => p.into_inner(),
422            };
423            guard.specs.keys().cloned().collect()
424        };
425        let mut out = Vec::new();
426        for name in names {
427            for meta in self.sweep_expired(&name, now_ns) {
428                out.push((name.clone(), meta));
429            }
430        }
431        out
432    }
433
434    /// Install / replace the hypertable-wide default TTL. `None`
435    /// disables automatic expiry — chunks live until explicit
436    /// `drop_chunks` / per-chunk override fires.
437    pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
438        let mut guard = match self.inner.lock() {
439            Ok(g) => g,
440            Err(p) => p.into_inner(),
441        };
442        match guard.specs.get_mut(hypertable) {
443            Some(spec) => {
444                spec.default_ttl_ns = match ttl_ns {
445                    Some(0) | None => None,
446                    Some(v) => Some(v),
447                };
448                true
449            }
450            None => false,
451        }
452    }
453
454    /// Override the TTL for a single chunk. Useful for "keep this
455    /// specific chunk longer because it contains an incident
456    /// replay" or "expire this one faster because it was filled
457    /// from a backfill we're about to redo". `None` removes the
458    /// override and falls back to the hypertable default.
459    pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
460        let mut guard = match self.inner.lock() {
461            Ok(g) => g,
462            Err(p) => p.into_inner(),
463        };
464        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
465            meta.ttl_override_ns = ttl_ns;
466            true
467        } else {
468            false
469        }
470    }
471
472    /// Inspect the list of chunks that are *about* to expire within
473    /// `horizon_ns`. Powers preview endpoints ("what will the next
474    /// sweep drop?") without actually dropping anything.
475    pub fn chunks_expiring_within(
476        &self,
477        hypertable: &str,
478        now_ns: u64,
479        horizon_ns: u64,
480    ) -> Vec<ChunkMeta> {
481        let guard = match self.inner.lock() {
482            Ok(g) => g,
483            Err(p) => p.into_inner(),
484        };
485        let Some(spec) = guard.specs.get(hypertable).cloned() else {
486            return Vec::new();
487        };
488        let cutoff = now_ns.saturating_add(horizon_ns);
489        guard
490            .chunks
491            .iter()
492            .filter(|((name, _), _)| name == hypertable)
493            .filter_map(|(_, meta)| {
494                let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
495                if expiry <= cutoff {
496                    Some(meta.clone())
497                } else {
498                    None
499                }
500            })
501            .collect()
502    }
503
504    /// Seal a chunk — future writes to the same `start_ns` bucket
505    /// will still land (open-ended), but the `sealed` flag signals
506    /// the maintenance layer that the chunk can now be compressed /
507    /// uploaded / migrated. Returns `true` if the chunk existed.
508    pub fn seal_chunk(&self, id: &ChunkId) -> bool {
509        let mut guard = match self.inner.lock() {
510            Ok(g) => g,
511            Err(p) => p.into_inner(),
512        };
513        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
514            meta.sealed = true;
515            true
516        } else {
517            false
518        }
519    }
520
521    /// Seal a chunk **columnar** (PRD #850, #911): mark it sealed, record
522    /// where its RDCC `ColumnBlock` lives in `ChunkMeta.columnar_page`,
523    /// and stash the block `bytes` so the columnar read path can decode
524    /// them. The columnar counterpart of [`seal_chunk`](Self::seal_chunk)
525    /// — the production caller (`seal_chunk_with_config`'s columnar arm)
526    /// hands the sealed bytes here. Returns `true` if the chunk existed.
527    pub fn seal_chunk_columnar(&self, id: &ChunkId, page: PageLocation, bytes: Vec<u8>) -> bool {
528        let mut guard = match self.inner.lock() {
529            Ok(g) => g,
530            Err(p) => p.into_inner(),
531        };
532        let key = (id.hypertable.clone(), id.start_ns);
533        if let Some(meta) = guard.chunks.get_mut(&key) {
534            meta.sealed = true;
535            meta.columnar_page = Some(page);
536            guard.columnar_blocks.insert(key, bytes);
537            true
538        } else {
539            false
540        }
541    }
542
543    /// Fetch the RDCC `ColumnBlock` bytes recorded for a columnar-sealed
544    /// chunk by [`seal_chunk_columnar`](Self::seal_chunk_columnar). `None`
545    /// for row-sealed chunks or chunks whose bytes are not RAM-resident
546    /// (e.g. after a restart, pending the durable page-write bridge).
547    pub fn columnar_block(&self, id: &ChunkId) -> Option<Vec<u8>> {
548        let guard = match self.inner.lock() {
549            Ok(g) => g,
550            Err(p) => p.into_inner(),
551        };
552        guard
553            .columnar_blocks
554            .get(&(id.hypertable.clone(), id.start_ns))
555            .cloned()
556    }
557
558    /// Total row count across every chunk of `hypertable`. Used by
559    /// catalog views + benchmark harnesses.
560    pub fn total_rows(&self, hypertable: &str) -> u64 {
561        let guard = match self.inner.lock() {
562            Ok(g) => g,
563            Err(p) => p.into_inner(),
564        };
565        guard
566            .chunks
567            .iter()
568            .filter(|((name, _), _)| name == hypertable)
569            .map(|(_, meta)| meta.row_count)
570            .sum()
571    }
572
573    /// List the hypertables the retention daemon should sweep.
574    pub fn names(&self) -> Vec<String> {
575        let guard = match self.inner.lock() {
576            Ok(g) => g,
577            Err(p) => p.into_inner(),
578        };
579        guard.specs.keys().cloned().collect()
580    }
581
582    /// True when no hypertable is registered and no chunk is tracked.
583    /// Lets the durability layer skip the persist step entirely for
584    /// workloads that never declared a hypertable (zero overhead).
585    pub fn is_empty(&self) -> bool {
586        let guard = match self.inner.lock() {
587            Ok(g) => g,
588            Err(p) => p.into_inner(),
589        };
590        guard.specs.is_empty() && guard.chunks.is_empty()
591    }
592
593    /// Snapshot every chunk across all hypertables, ordered by
594    /// `(hypertable, start_ns)` so the persisted form is deterministic.
595    /// Pairs with [`restore_chunk`] on boot. Specs are snapshotted via
596    /// [`list`].
597    pub fn snapshot_chunks(&self) -> Vec<ChunkMeta> {
598        let guard = match self.inner.lock() {
599            Ok(g) => g,
600            Err(p) => p.into_inner(),
601        };
602        guard.chunks.values().cloned().collect()
603    }
604
605    /// Reinstate a chunk verbatim during recovery. Overwrites any
606    /// existing entry for the same `(hypertable, start_ns)`.
607    ///
608    /// Unlike [`route`], this does **not** observe a timestamp — it
609    /// restores the persisted counters (`row_count`, `min_ts_ns`,
610    /// `max_ts_ns`, `sealed`, `ttl_override_ns`) exactly so the
611    /// post-restart registry is identical to the pre-restart one. The
612    /// caller is expected to [`register`] the owning spec first; a
613    /// chunk whose hypertable has no spec is still tracked (routing
614    /// falls back to the spec once it is registered), matching the
615    /// pre-restart invariant that chunks outlive a missing spec only
616    /// transiently.
617    pub fn restore_chunk(&self, meta: ChunkMeta) {
618        let mut guard = match self.inner.lock() {
619            Ok(g) => g,
620            Err(p) => p.into_inner(),
621        };
622        let key = (meta.id.hypertable.clone(), meta.id.start_ns);
623        guard.chunks.insert(key, meta);
624    }
625
626    /// Drop the whole hypertable (spec + every chunk). Returns the
627    /// number of chunks removed.
628    pub fn drop_hypertable(&self, name: &str) -> usize {
629        let mut guard = match self.inner.lock() {
630            Ok(g) => g,
631            Err(p) => p.into_inner(),
632        };
633        guard.specs.remove(name);
634        let keys: Vec<(String, u64)> = guard
635            .chunks
636            .keys()
637            .filter(|(n, _)| n == name)
638            .cloned()
639            .collect();
640        for key in &keys {
641            guard.chunks.remove(key);
642        }
643        keys.len()
644    }
645}
646
647#[cfg(test)]
648mod tests {
649    use super::*;
650
651    const DAY_NS: u64 = 86_400_000_000_000;
652    const HOUR_NS: u64 = 3_600_000_000_000;
653
654    #[test]
655    fn chunk_start_aligns_to_interval_floor() {
656        let spec = HypertableSpec::new("m", "ts", DAY_NS);
657        assert_eq!(spec.chunk_start(0), 0);
658        assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
659        assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
660        assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
661    }
662
663    #[test]
664    fn interval_string_accepts_duration_units() {
665        let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
666        assert_eq!(s.chunk_interval_ns, DAY_NS);
667        let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
668        assert_eq!(s.chunk_interval_ns, HOUR_NS);
669        assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
670        assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
671    }
672
673    #[test]
674    fn route_allocates_chunk_on_first_write() {
675        let reg = HypertableRegistry::new();
676        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
677        let id = reg.route("metrics", DAY_NS + 100).unwrap();
678        assert_eq!(id.hypertable, "metrics");
679        assert_eq!(id.start_ns, DAY_NS);
680        let chunks = reg.show_chunks("metrics");
681        assert_eq!(chunks.len(), 1);
682        assert_eq!(chunks[0].row_count, 1);
683        assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
684        assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
685        assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
686    }
687
688    #[test]
689    fn route_groups_writes_within_same_chunk() {
690        let reg = HypertableRegistry::new();
691        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
692        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
693            let id = reg.route("m", offset).unwrap();
694            assert_eq!(id.start_ns, 0);
695        }
696        let chunks = reg.show_chunks("m");
697        assert_eq!(chunks.len(), 1);
698        assert_eq!(chunks[0].row_count, 4);
699    }
700
701    #[test]
702    fn route_splits_writes_across_adjacent_chunks() {
703        let reg = HypertableRegistry::new();
704        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
705        reg.route("m", DAY_NS - 1).unwrap();
706        reg.route("m", DAY_NS).unwrap();
707        reg.route("m", 2 * DAY_NS).unwrap();
708        let chunks = reg.show_chunks("m");
709        assert_eq!(chunks.len(), 3);
710        assert!(chunks[0].id.start_ns <= chunks[1].id.start_ns);
711        assert!(chunks[1].id.start_ns <= chunks[2].id.start_ns);
712    }
713
714    #[test]
715    fn route_returns_none_for_unknown_hypertable() {
716        let reg = HypertableRegistry::new();
717        assert!(reg.route("nope", 0).is_none());
718    }
719
720    #[test]
721    fn drop_chunks_before_removes_matching_chunks() {
722        let reg = HypertableRegistry::new();
723        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
724        reg.route("m", 0).unwrap(); // chunk start 0, max=0
725        reg.route("m", DAY_NS).unwrap(); // chunk start DAY_NS, max=DAY_NS
726        reg.route("m", 2 * DAY_NS + 5).unwrap(); // chunk start 2*DAY_NS
727
728        let dropped = reg.drop_chunks_before("m", DAY_NS);
729        // max_ts_ns of first chunk is 0, of second chunk is DAY_NS.
730        // cutoff = DAY_NS, so both are "<= cutoff" → both dropped.
731        assert_eq!(dropped.len(), 2);
732        let remaining = reg.show_chunks("m");
733        assert_eq!(remaining.len(), 1);
734        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
735    }
736
737    #[test]
738    fn show_chunks_is_ordered_by_start() {
739        let reg = HypertableRegistry::new();
740        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
741        for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, 1 * DAY_NS] {
742            reg.route("m", ts).unwrap();
743        }
744        let starts: Vec<u64> = reg.show_chunks("m").iter().map(|c| c.id.start_ns).collect();
745        assert_eq!(starts, vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]);
746    }
747
748    #[test]
749    fn seal_chunk_flips_flag() {
750        let reg = HypertableRegistry::new();
751        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
752        let id = reg.route("m", 0).unwrap();
753        assert!(reg.seal_chunk(&id));
754        assert!(reg.show_chunks("m")[0].sealed);
755    }
756
757    #[test]
758    fn drop_hypertable_removes_everything() {
759        let reg = HypertableRegistry::new();
760        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
761        reg.route("m", 0).unwrap();
762        reg.route("m", DAY_NS).unwrap();
763        assert_eq!(reg.drop_hypertable("m"), 2);
764        assert!(reg.get("m").is_none());
765        assert!(reg.show_chunks("m").is_empty());
766    }
767
768    #[test]
769    fn total_rows_sums_every_chunk() {
770        let reg = HypertableRegistry::new();
771        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
772        for ts in 0..1000 {
773            reg.route("m", ts).unwrap();
774        }
775        for ts in DAY_NS..DAY_NS + 500 {
776            reg.route("m", ts).unwrap();
777        }
778        assert_eq!(reg.total_rows("m"), 1500);
779    }
780
781    #[test]
782    fn names_lists_registered_hypertables() {
783        let reg = HypertableRegistry::new();
784        reg.register(HypertableSpec::new("a", "ts", DAY_NS));
785        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
786        let mut names = reg.names();
787        names.sort();
788        assert_eq!(names, vec!["a", "b"]);
789    }
790
791    // -----------------------------------------------------------------
792    // Partition-level TTL
793    // -----------------------------------------------------------------
794
795    #[test]
796    fn with_ttl_parses_duration_and_sets_default() {
797        let s = HypertableSpec::new("m", "ts", DAY_NS)
798            .with_ttl("7d")
799            .unwrap();
800        assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
801        assert!(HypertableSpec::new("m", "ts", DAY_NS)
802            .with_ttl("raw")
803            .is_none());
804        assert!(HypertableSpec::new("m", "ts", DAY_NS)
805            .with_ttl("garbage")
806            .is_none());
807    }
808
809    #[test]
810    fn chunk_with_no_rows_never_expires() {
811        let meta = ChunkMeta::new(
812            ChunkId {
813                hypertable: "m".into(),
814                start_ns: 0,
815            },
816            DAY_NS,
817        );
818        assert!(!meta.is_expired_at(u64::MAX, Some(1)));
819    }
820
821    #[test]
822    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
823        let mut meta = ChunkMeta::new(
824            ChunkId {
825                hypertable: "m".into(),
826                start_ns: 0,
827            },
828            DAY_NS,
829        );
830        meta.observe(500);
831        // TTL = 1000, max_ts = 500 → expires at 1500.
832        assert!(!meta.is_expired_at(1000, Some(1000)));
833        assert!(!meta.is_expired_at(1499, Some(1000)));
834        assert!(meta.is_expired_at(1500, Some(1000)));
835    }
836
837    #[test]
838    fn per_chunk_override_wins_over_hypertable_default() {
839        let mut meta = ChunkMeta::new(
840            ChunkId {
841                hypertable: "m".into(),
842                start_ns: 0,
843            },
844            DAY_NS,
845        );
846        meta.observe(500);
847        // Default would say "expire at 500+1000 = 1500"; override
848        // narrows to 500+100 = 600.
849        meta.ttl_override_ns = Some(100);
850        assert!(meta.is_expired_at(600, Some(1000)));
851        assert!(!meta.is_expired_at(599, Some(1000)));
852    }
853
854    #[test]
855    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
856        let reg = HypertableRegistry::new();
857        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
858        // 3 chunks — at 0, DAY, 2*DAY.
859        for t in [0, DAY_NS, 2 * DAY_NS] {
860            reg.route("m", t).unwrap();
861        }
862        // now = 3 * DAY + 1 → chunks with max_ts in {0, DAY_NS} both
863        // expired (max + 2d ≤ now), chunk at 2*DAY_NS still alive.
864        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
865        let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
866        starts.sort();
867        assert_eq!(starts, vec![0, DAY_NS]);
868        let remaining = reg.show_chunks("m");
869        assert_eq!(remaining.len(), 1);
870        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
871    }
872
873    #[test]
874    fn sweep_without_ttl_keeps_every_chunk() {
875        let reg = HypertableRegistry::new();
876        reg.register(HypertableSpec::new("m", "ts", DAY_NS)); // no TTL
877        for t in [0, DAY_NS, 2 * DAY_NS] {
878            reg.route("m", t).unwrap();
879        }
880        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
881        assert!(dropped.is_empty());
882        assert_eq!(reg.show_chunks("m").len(), 3);
883    }
884
885    #[test]
886    fn sweep_all_expired_iterates_every_hypertable() {
887        let reg = HypertableRegistry::new();
888        reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
889        reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
890        // fast: chunk at 0 expires fast; slow: chunk at 0 still
891        // within its 7-day TTL.
892        reg.route("fast", 0).unwrap();
893        reg.route("slow", 0).unwrap();
894        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
895        assert_eq!(dropped.len(), 1);
896        assert_eq!(dropped[0].0, "fast");
897        assert_eq!(reg.show_chunks("slow").len(), 1);
898    }
899
900    #[test]
901    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
902        let reg = HypertableRegistry::new();
903        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
904        let id = reg.route("m", 0).unwrap();
905        // Raise TTL to 100 days — chunk should survive the sweep.
906        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
907        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
908        assert!(dropped.is_empty());
909        // Now shorten to 1 hour and sweep.
910        reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
911        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
912        assert_eq!(dropped.len(), 1);
913    }
914
915    #[test]
916    fn snapshot_then_restore_reproduces_registry_identically() {
917        // Pre-restart registry: two hypertables, several chunks, with a
918        // sealed chunk and a per-chunk TTL override — the bits that are
919        // NOT derivable from row data and so must round-trip verbatim.
920        let reg = HypertableRegistry::new();
921        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
922        reg.register(HypertableSpec::new("events", "ts", HOUR_NS));
923        for t in [0, DAY_NS + 5, DAY_NS + 9, 2 * DAY_NS] {
924            reg.route("metrics", t).unwrap();
925        }
926        let id = reg.route("events", 0).unwrap();
927        reg.seal_chunk(&id);
928        reg.set_chunk_ttl_ns(&id, Some(3 * HOUR_NS));
929
930        let specs = reg.list();
931        let chunks = reg.snapshot_chunks();
932        assert!(!reg.is_empty());
933
934        // Simulated restart: rebuild a fresh registry from the snapshot.
935        let restored = HypertableRegistry::new();
936        assert!(restored.is_empty());
937        for spec in specs {
938            restored.register(spec);
939        }
940        for chunk in chunks {
941            restored.restore_chunk(chunk);
942        }
943
944        // Specs identical.
945        let before = reg.get("metrics").unwrap();
946        let after = restored.get("metrics").unwrap();
947        assert_eq!(after.chunk_interval_ns, before.chunk_interval_ns);
948        assert_eq!(after.time_column, before.time_column);
949        assert_eq!(after.default_ttl_ns, before.default_ttl_ns);
950
951        // Chunk metadata identical (bounds, counts, sealed, TTL override).
952        let m_before = reg.show_chunks("metrics");
953        let m_after = restored.show_chunks("metrics");
954        assert_eq!(m_after.len(), m_before.len());
955        for (a, b) in m_after.iter().zip(m_before.iter()) {
956            assert_eq!(a.id.start_ns, b.id.start_ns);
957            assert_eq!(a.end_ns_exclusive, b.end_ns_exclusive);
958            assert_eq!(a.row_count, b.row_count);
959            assert_eq!(a.min_ts_ns, b.min_ts_ns);
960            assert_eq!(a.max_ts_ns, b.max_ts_ns);
961        }
962        let e_after = restored.show_chunks("events");
963        assert_eq!(e_after.len(), 1);
964        assert!(e_after[0].sealed, "sealed flag must survive restore");
965        assert_eq!(e_after[0].ttl_override_ns, Some(3 * HOUR_NS));
966
967        // A post-restart write routes to the EXISTING chunk — no
968        // duplicate allocation.
969        let routed = restored.route("metrics", DAY_NS + 1).unwrap();
970        assert_eq!(routed.start_ns, DAY_NS);
971        assert_eq!(
972            restored.show_chunks("metrics").len(),
973            m_before.len(),
974            "write after restore must not allocate a new chunk"
975        );
976    }
977
978    // -----------------------------------------------------------------
979    // Columnar chunk eviction via the EXISTING TTL/drop path (issue #859)
980    //
981    // A sealed *columnar* chunk is one whose `ChunkMeta.columnar_page` is
982    // `Some(..)` — the RDCC `ColumnBlock` discriminant (PRD #850 Phase 1).
983    // These guards prove the retention path is storage-form agnostic: the
984    // SAME `sweep_expired` / `drop_chunks_before` metadata sweep that drops
985    // row chunks drops columnar chunks too, in O(1) metadata work (no
986    // per-row delete), and hands the `columnar_page` back on the dropped
987    // meta so the physical-storage callback can release the RDCC block.
988    // No separate columnar TTL/partition subsystem exists or is needed.
989    // -----------------------------------------------------------------
990
991    /// Build a sealed columnar chunk meta directly — mirrors what the
992    /// boot/seal path restores: a chunk carrying its RDCC `columnar_page`.
993    fn columnar_chunk(hypertable: &str, start_ns: u64, max_ts_ns: u64) -> ChunkMeta {
994        let mut meta = ChunkMeta::new(
995            ChunkId {
996                hypertable: hypertable.into(),
997                start_ns,
998            },
999            start_ns + DAY_NS,
1000        );
1001        meta.row_count = 1;
1002        meta.min_ts_ns = max_ts_ns;
1003        meta.max_ts_ns = max_ts_ns;
1004        meta.sealed = true;
1005        meta.columnar_page = Some(PageLocation::new(7, 0, 1234));
1006        meta
1007    }
1008
1009    #[test]
1010    fn columnar_chunk_evicts_via_sweep_expired_carrying_its_page() {
1011        let reg = HypertableRegistry::new();
1012        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1013        // Inject a sealed columnar chunk (max_ts=0 → expiry at 1d).
1014        reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1015        assert!(reg.show_chunks("metrics")[0].columnar_page.is_some());
1016
1017        // now = 3d → past the 1d TTL. The existing partition sweep drops it.
1018        let dropped = reg.sweep_expired("metrics", 3 * DAY_NS);
1019        assert_eq!(dropped.len(), 1, "columnar chunk must evict via TTL sweep");
1020        assert_eq!(
1021            dropped[0].columnar_page,
1022            Some(PageLocation::new(7, 0, 1234)),
1023            "dropped meta must carry columnar_page so physical release frees the RDCC block"
1024        );
1025        assert!(reg.show_chunks("metrics").is_empty());
1026    }
1027
1028    #[test]
1029    fn columnar_chunk_evicts_via_drop_chunks_before() {
1030        let reg = HypertableRegistry::new();
1031        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
1032        reg.restore_chunk(columnar_chunk("metrics", 0, 0));
1033
1034        let dropped = reg.drop_chunks_before("metrics", DAY_NS);
1035        assert_eq!(dropped.len(), 1);
1036        assert!(
1037            dropped[0].columnar_page.is_some(),
1038            "drop_chunks_before is metadata-only and carries columnar_page through"
1039        );
1040        assert!(reg.show_chunks("metrics").is_empty());
1041    }
1042
1043    #[test]
1044    fn columnar_and_row_chunks_share_one_eviction_path() {
1045        // Regression guard (acceptance #3): a row chunk and a columnar
1046        // chunk with identical bounds + TTL must produce identical sweep
1047        // outcomes. If a separate columnar TTL subsystem were ever
1048        // introduced, the two would diverge here.
1049        let mk = |columnar: bool| {
1050            let reg = HypertableRegistry::new();
1051            reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1052            if columnar {
1053                reg.restore_chunk(columnar_chunk("m", 0, 0));
1054            } else {
1055                reg.route("m", 0).unwrap(); // row chunk, max_ts=0
1056            }
1057            reg.sweep_expired("m", 3 * DAY_NS).len()
1058        };
1059        assert_eq!(mk(false), 1, "row chunk evicts");
1060        assert_eq!(mk(true), 1, "columnar chunk evicts the same way");
1061    }
1062
1063    #[test]
1064    fn columnar_chunk_prunes_by_time_bounds_like_row_chunk() {
1065        // Acceptance #2: the partition pruner selects chunks by their
1066        // [start_ns, end_ns_exclusive) bounds (surfaced via show_chunks) —
1067        // it never inspects columnar_page, so a columnar chunk outside the
1068        // query window is eliminated identically to a row chunk. We assert
1069        // the bounds the pruner consumes survive verbatim for a columnar
1070        // chunk.
1071        let reg = HypertableRegistry::new();
1072        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1073        reg.restore_chunk(columnar_chunk("m", 0, 0));
1074        reg.restore_chunk(columnar_chunk("m", 2 * DAY_NS, 2 * DAY_NS));
1075        let chunks = reg.show_chunks("m");
1076        // A window of [2d, 3d) overlaps only the second chunk — same
1077        // arithmetic the RangeChild pruner runs against these bounds.
1078        let lo = 2 * DAY_NS;
1079        let hi = 3 * DAY_NS;
1080        let overlapping: Vec<u64> = chunks
1081            .iter()
1082            .filter(|c| c.id.start_ns < hi && c.end_ns_exclusive > lo)
1083            .map(|c| c.id.start_ns)
1084            .collect();
1085        assert_eq!(
1086            overlapping,
1087            vec![2 * DAY_NS],
1088            "only in-window columnar chunk kept"
1089        );
1090        assert!(chunks.iter().all(|c| c.columnar_page.is_some()));
1091    }
1092
1093    /// Read-bridge dispatch key (#861): `ChunkMeta::format()` classifies a
1094    /// chunk purely from the `columnar_page` migration discriminant, so a
1095    /// pre-existing row chunk and a newly columnar-sealed chunk in the same
1096    /// registry are disambiguated by format version — the gate the read
1097    /// path dispatches on.
1098    #[test]
1099    fn chunk_format_dispatches_on_columnar_page_discriminant() {
1100        let reg = HypertableRegistry::new();
1101        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
1102        // A row chunk (allocated by a write) and a columnar chunk coexist.
1103        reg.route("m", 0).unwrap();
1104        reg.restore_chunk(columnar_chunk("m", DAY_NS, DAY_NS));
1105
1106        let chunks = reg.show_chunks("m");
1107        let row = chunks.iter().find(|c| c.id.start_ns == 0).unwrap();
1108        let col = chunks.iter().find(|c| c.id.start_ns == DAY_NS).unwrap();
1109
1110        assert_eq!(row.format(), ChunkFormat::Row);
1111        assert!(!row.is_columnar());
1112        assert_eq!(col.format(), ChunkFormat::ColumnarV1);
1113        assert!(col.is_columnar());
1114    }
1115
1116    #[test]
1117    fn chunks_expiring_within_previews_without_dropping() {
1118        let reg = HypertableRegistry::new();
1119        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
1120        // Chunks with max_ts at 0, 1d, 2d → expiries at 1d, 2d, 3d.
1121        for t in [0, DAY_NS, 2 * DAY_NS] {
1122            reg.route("m", t).unwrap();
1123        }
1124        // now=0, horizon=1.5d → only the first chunk (expiry 1d)
1125        // fits. Tight horizon proves the cutoff math.
1126        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
1127        assert_eq!(preview.len(), 1);
1128        assert_eq!(preview[0].id.start_ns, 0);
1129        // Wider horizon pulls in the second chunk too.
1130        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
1131        assert_eq!(preview2.len(), 2);
1132        // Registry still has every chunk — preview never drops.
1133        assert_eq!(reg.show_chunks("m").len(), 3);
1134    }
1135}