Skip to main content

reddb_server/storage/timeseries/
hypertable.rs

1//! Hypertables — time-range-partitioned tables à la TimescaleDB.
2//!
3//! A hypertable is a logical collection that auto-partitions writes
4//! into child chunks, each covering a fixed time interval
5//! (`chunk_interval_ns`). Queries that filter by the time column
6//! see the partition pruner eliminate child chunks whose bounds
7//! fall outside the predicate; drops happen per-chunk so operators
8//! can retain the last N days without a full-table scan.
9//!
10//! This module defines the **metadata + router**. The physical
11//! chunk is the same [`super::chunk::TimeSeriesChunk`] the standalone
12//! time-series path already uses — the hypertable layer just tracks
13//! which chunk a row goes into.
14//!
//! SQL surface (parsing lives outside this module; shown for reference):
16//!
17//! ```sql
18//! CREATE HYPERTABLE metrics (
19//!   ts    BIGINT,
20//!   host  TEXT,
21//!   value DOUBLE
22//! ) CHUNK INTERVAL '1 day';
23//!
24//! SELECT drop_chunks('metrics', INTERVAL '90 days');
25//! SELECT show_chunks('metrics');
26//! ```
27
28use std::collections::BTreeMap;
29use std::sync::{Arc, Mutex};
30
31use super::retention::parse_duration_ns;
32
33/// Spec declared by `CREATE HYPERTABLE`.
34#[derive(Debug, Clone)]
35pub struct HypertableSpec {
36    pub name: String,
37    /// Column name that carries the time axis (must be unix-ns BIGINT
38    /// or parseable to one).
39    pub time_column: String,
40    /// Fixed width of a single chunk, in nanoseconds.
41    pub chunk_interval_ns: u64,
42    /// Default TTL applied to every new chunk when the DDL didn't
43    /// request an explicit override. `None` means "no TTL — chunks
44    /// live until explicit `drop_chunks` / retention policy fires".
45    ///
46    /// The effective expiry of a chunk is `max_ts_ns + ttl_ns`, so
47    /// a chunk is safely droppable once `now_ns ≥ expiry`. That
48    /// matches the contract callers already learnt from the
49    /// retention daemon — partition TTL is the **declarative** way
50    /// to say the same thing at CREATE time without a separate
51    /// `add_retention_policy` call.
52    pub default_ttl_ns: Option<u64>,
53}
54
55impl HypertableSpec {
56    pub fn new(
57        name: impl Into<String>,
58        time_column: impl Into<String>,
59        chunk_interval_ns: u64,
60    ) -> Self {
61        Self {
62            name: name.into(),
63            time_column: time_column.into(),
64            chunk_interval_ns: chunk_interval_ns.max(1),
65            default_ttl_ns: None,
66        }
67    }
68
69    /// Convenience: construct from a Timescale-style duration string
70    /// (`"1d"`, `"1h"`, `"30m"`…).
71    pub fn from_interval_string(
72        name: impl Into<String>,
73        time_column: impl Into<String>,
74        interval: &str,
75    ) -> Option<Self> {
76        let ns = parse_duration_ns(interval)?;
77        if ns == 0 {
78            return None;
79        }
80        Some(Self::new(name, time_column, ns))
81    }
82
83    /// Builder-style: attach a default TTL. Uses the same duration
84    /// grammar as `chunk_interval` (`"90d"`, `"30s"`, …).
85    pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
86        let ns = parse_duration_ns(ttl)?;
87        if ns == 0 {
88            return None;
89        }
90        self.default_ttl_ns = Some(ns);
91        Some(self)
92    }
93
94    /// Direct setter when the TTL is already computed in ns.
95    pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
96        self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
97        self
98    }
99
100    /// Align `timestamp_ns` to the chunk's floor — the chunk that
101    /// row belongs to starts at this timestamp and covers
102    /// `[start, start + chunk_interval_ns)`.
103    pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
104        (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
105    }
106
107    pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
108        self.chunk_start(timestamp_ns)
109            .saturating_add(self.chunk_interval_ns)
110    }
111}
112
/// Identifier of a single child chunk. Stable across restart so
/// catalog + retention can reference it unambiguously.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    /// Name of the owning hypertable.
    pub hypertable: String,
    /// Chunk start (inclusive), aligned to `chunk_interval_ns`.
    pub start_ns: u64,
}

/// Metadata tracked per child chunk. Physical storage lives in
/// `TimeSeriesChunk` keyed by `(hypertable, start_ns)`.
///
/// Derives `PartialEq`/`Eq` so a snapshotted chunk can be compared
/// against its restored copy in a single assertion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkMeta {
    pub id: ChunkId,
    /// Exclusive upper bound of the chunk's time range.
    pub end_ns_exclusive: u64,
    /// Number of rows observed so far.
    pub row_count: u64,
    /// Smallest observed timestamp; `u64::MAX` while the chunk is empty.
    pub min_ts_ns: u64,
    /// Largest observed timestamp; `0` while the chunk is empty.
    pub max_ts_ns: u64,
    /// Maintenance hint set by sealing; it does not reject new writes.
    pub sealed: bool,
    /// Optional per-chunk TTL override. `None` means "fall back to
    /// the hypertable's default TTL". Setting this lets mixed-TTL
    /// policies live inside the same hypertable — e.g. keep the
    /// current month of data forever but expire everything older
    /// than 90 days.
    pub ttl_override_ns: Option<u64>,
}

impl ChunkMeta {
    /// Fresh, empty chunk covering `[id.start_ns, end_ns_exclusive)`.
    ///
    /// `min_ts_ns` / `max_ts_ns` start at `u64::MAX` / `0` so the first
    /// [`ChunkMeta::observe`] call overwrites both.
    pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
        Self {
            id,
            end_ns_exclusive,
            row_count: 0,
            min_ts_ns: u64::MAX,
            max_ts_ns: 0,
            sealed: false,
            ttl_override_ns: None,
        }
    }

    /// Record one row stamped `ts_ns`: bumps `row_count` and widens the
    /// observed `[min_ts_ns, max_ts_ns]` range. The timestamp is not
    /// checked against the chunk's bounds.
    pub fn observe(&mut self, ts_ns: u64) {
        self.row_count += 1;
        self.min_ts_ns = self.min_ts_ns.min(ts_ns);
        self.max_ts_ns = self.max_ts_ns.max(ts_ns);
    }

    /// Effective TTL = per-chunk override if present, otherwise the
    /// hypertable default. `None` = the chunk has no automatic
    /// expiry.
    pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
        self.ttl_override_ns.or(default_ttl_ns)
    }

    /// Absolute epoch-ns at which the chunk becomes droppable. Uses
    /// `max_ts_ns` as the baseline — the newest row the chunk has
    /// ever accepted — so an empty chunk (no rows yet) never
    /// expires until at least one row lands. Saturates at `u64::MAX`.
    pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
        // An empty chunk has no meaningful `max_ts_ns` baseline.
        if self.row_count == 0 {
            return None;
        }
        self.effective_ttl_ns(default_ttl_ns)
            .map(|ttl| self.max_ts_ns.saturating_add(ttl))
    }

    /// `true` once `now_ns` has reached the chunk's expiry. Chunks with
    /// no expiry (no effective TTL, or no rows) never report expired.
    pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
        matches!(self.expiry_ns(default_ttl_ns), Some(expiry) if now_ns >= expiry)
    }
}
189
190/// In-memory catalog of hypertables and their chunks. Thread-safe
191/// because INSERTs can land from multiple writers simultaneously.
192#[derive(Clone, Default)]
193pub struct HypertableRegistry {
194    inner: Arc<Mutex<RegistryInner>>,
195}
196
197#[derive(Default)]
198struct RegistryInner {
199    specs: BTreeMap<String, HypertableSpec>,
200    /// `(hypertable, start_ns)` → chunk meta. `BTreeMap` so lookups
201    /// by name produce an ordered view (show_chunks must be
202    /// deterministic).
203    chunks: BTreeMap<(String, u64), ChunkMeta>,
204}
205
206impl std::fmt::Debug for HypertableRegistry {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        let guard = match self.inner.lock() {
209            Ok(g) => g,
210            Err(p) => p.into_inner(),
211        };
212        f.debug_struct("HypertableRegistry")
213            .field("hypertables", &guard.specs.len())
214            .field("chunks", &guard.chunks.len())
215            .finish()
216    }
217}
218
219impl HypertableRegistry {
220    pub fn new() -> Self {
221        Self::default()
222    }
223
224    /// Register a new hypertable. Replaces the previous spec if one
225    /// existed with the same name; chunks for that name are kept —
226    /// the operator is assumed to know what they're doing when they
227    /// redefine (e.g. widening `chunk_interval_ns`).
228    pub fn register(&self, spec: HypertableSpec) {
229        let mut guard = match self.inner.lock() {
230            Ok(g) => g,
231            Err(p) => p.into_inner(),
232        };
233        guard.specs.insert(spec.name.clone(), spec);
234    }
235
236    pub fn get(&self, name: &str) -> Option<HypertableSpec> {
237        let guard = match self.inner.lock() {
238            Ok(g) => g,
239            Err(p) => p.into_inner(),
240        };
241        guard.specs.get(name).cloned()
242    }
243
244    pub fn list(&self) -> Vec<HypertableSpec> {
245        let guard = match self.inner.lock() {
246            Ok(g) => g,
247            Err(p) => p.into_inner(),
248        };
249        guard.specs.values().cloned().collect()
250    }
251
252    /// Drop a hypertable from the registry. Returns the removed spec
253    /// when present, or `None` for unknown names. Callers drop the
254    /// backing collection separately — this is registry housekeeping
255    /// only.
256    pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
257        let mut guard = match self.inner.lock() {
258            Ok(g) => g,
259            Err(p) => p.into_inner(),
260        };
261        guard.specs.remove(name)
262    }
263
264    /// Route a write: returns the `ChunkId` the row belongs in,
265    /// allocating the chunk on first write. `None` when the
266    /// hypertable is unknown.
267    pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
268        let mut guard = match self.inner.lock() {
269            Ok(g) => g,
270            Err(p) => p.into_inner(),
271        };
272        let spec = guard.specs.get(hypertable)?.clone();
273        let start = spec.chunk_start(timestamp_ns);
274        let end = spec.chunk_end_exclusive(timestamp_ns);
275        let id = ChunkId {
276            hypertable: spec.name.clone(),
277            start_ns: start,
278        };
279        let key = (spec.name.clone(), start);
280        let meta = guard
281            .chunks
282            .entry(key)
283            .or_insert_with(|| ChunkMeta::new(id.clone(), end));
284        meta.observe(timestamp_ns);
285        Some(id)
286    }
287
288    /// Return every chunk for `hypertable`, oldest-first.
289    pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
290        let guard = match self.inner.lock() {
291            Ok(g) => g,
292            Err(p) => p.into_inner(),
293        };
294        guard
295            .chunks
296            .iter()
297            .filter(|((name, _), _)| name == hypertable)
298            .map(|(_, meta)| meta.clone())
299            .collect()
300    }
301
302    /// Drop every chunk of `hypertable` whose `max_ts_ns` is at or
303    /// below `cutoff_ns`. Returns the count dropped — the physical
304    /// storage release is the caller's responsibility (this module
305    /// only owns the metadata).
306    pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
307        let mut guard = match self.inner.lock() {
308            Ok(g) => g,
309            Err(p) => p.into_inner(),
310        };
311        let mut dropped = Vec::new();
312        let keys: Vec<(String, u64)> = guard
313            .chunks
314            .iter()
315            .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
316            .map(|(k, _)| k.clone())
317            .collect();
318        for key in keys {
319            if let Some(meta) = guard.chunks.remove(&key) {
320                dropped.push(meta);
321            }
322        }
323        dropped
324    }
325
326    /// Sweep chunks whose effective TTL has fired. A chunk is
327    /// droppable when `now_ns ≥ max_ts_ns + effective_ttl_ns` — the
328    /// registry hands back every removed `ChunkMeta` so the
329    /// physical-storage callback can release bytes + indexes. Chunks
330    /// without an effective TTL (neither per-chunk override nor
331    /// hypertable default) are never touched.
332    ///
333    /// This is the "TTL applied at the partition level" primitive:
334    /// one O(1) metadata sweep reclaims every row of every expired
335    /// chunk, instead of scanning rows individually like an
336    /// entity-level TTL would. Empty hypertables stay empty.
337    pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
338        let mut guard = match self.inner.lock() {
339            Ok(g) => g,
340            Err(p) => p.into_inner(),
341        };
342        let Some(spec) = guard.specs.get(hypertable).cloned() else {
343            return Vec::new();
344        };
345        let expired_keys: Vec<(String, u64)> = guard
346            .chunks
347            .iter()
348            .filter(|((name, _), meta)| {
349                name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
350            })
351            .map(|(k, _)| k.clone())
352            .collect();
353        let mut dropped = Vec::with_capacity(expired_keys.len());
354        for key in expired_keys {
355            if let Some(meta) = guard.chunks.remove(&key) {
356                dropped.push(meta);
357            }
358        }
359        dropped
360    }
361
362    /// Sweep every registered hypertable in one shot — the loop the
363    /// retention daemon runs every cycle. Returns a flat list of
364    /// `(hypertable_name, chunk_dropped)` pairs.
365    pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
366        let names: Vec<String> = {
367            let guard = match self.inner.lock() {
368                Ok(g) => g,
369                Err(p) => p.into_inner(),
370            };
371            guard.specs.keys().cloned().collect()
372        };
373        let mut out = Vec::new();
374        for name in names {
375            for meta in self.sweep_expired(&name, now_ns) {
376                out.push((name.clone(), meta));
377            }
378        }
379        out
380    }
381
382    /// Install / replace the hypertable-wide default TTL. `None`
383    /// disables automatic expiry — chunks live until explicit
384    /// `drop_chunks` / per-chunk override fires.
385    pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
386        let mut guard = match self.inner.lock() {
387            Ok(g) => g,
388            Err(p) => p.into_inner(),
389        };
390        match guard.specs.get_mut(hypertable) {
391            Some(spec) => {
392                spec.default_ttl_ns = match ttl_ns {
393                    Some(0) | None => None,
394                    Some(v) => Some(v),
395                };
396                true
397            }
398            None => false,
399        }
400    }
401
402    /// Override the TTL for a single chunk. Useful for "keep this
403    /// specific chunk longer because it contains an incident
404    /// replay" or "expire this one faster because it was filled
405    /// from a backfill we're about to redo". `None` removes the
406    /// override and falls back to the hypertable default.
407    pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
408        let mut guard = match self.inner.lock() {
409            Ok(g) => g,
410            Err(p) => p.into_inner(),
411        };
412        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
413            meta.ttl_override_ns = ttl_ns;
414            true
415        } else {
416            false
417        }
418    }
419
420    /// Inspect the list of chunks that are *about* to expire within
421    /// `horizon_ns`. Powers preview endpoints ("what will the next
422    /// sweep drop?") without actually dropping anything.
423    pub fn chunks_expiring_within(
424        &self,
425        hypertable: &str,
426        now_ns: u64,
427        horizon_ns: u64,
428    ) -> Vec<ChunkMeta> {
429        let guard = match self.inner.lock() {
430            Ok(g) => g,
431            Err(p) => p.into_inner(),
432        };
433        let Some(spec) = guard.specs.get(hypertable).cloned() else {
434            return Vec::new();
435        };
436        let cutoff = now_ns.saturating_add(horizon_ns);
437        guard
438            .chunks
439            .iter()
440            .filter(|((name, _), _)| name == hypertable)
441            .filter_map(|(_, meta)| {
442                let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
443                if expiry <= cutoff {
444                    Some(meta.clone())
445                } else {
446                    None
447                }
448            })
449            .collect()
450    }
451
452    /// Seal a chunk — future writes to the same `start_ns` bucket
453    /// will still land (open-ended), but the `sealed` flag signals
454    /// the maintenance layer that the chunk can now be compressed /
455    /// uploaded / migrated. Returns `true` if the chunk existed.
456    pub fn seal_chunk(&self, id: &ChunkId) -> bool {
457        let mut guard = match self.inner.lock() {
458            Ok(g) => g,
459            Err(p) => p.into_inner(),
460        };
461        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
462            meta.sealed = true;
463            true
464        } else {
465            false
466        }
467    }
468
469    /// Total row count across every chunk of `hypertable`. Used by
470    /// catalog views + benchmark harnesses.
471    pub fn total_rows(&self, hypertable: &str) -> u64 {
472        let guard = match self.inner.lock() {
473            Ok(g) => g,
474            Err(p) => p.into_inner(),
475        };
476        guard
477            .chunks
478            .iter()
479            .filter(|((name, _), _)| name == hypertable)
480            .map(|(_, meta)| meta.row_count)
481            .sum()
482    }
483
484    /// List the hypertables the retention daemon should sweep.
485    pub fn names(&self) -> Vec<String> {
486        let guard = match self.inner.lock() {
487            Ok(g) => g,
488            Err(p) => p.into_inner(),
489        };
490        guard.specs.keys().cloned().collect()
491    }
492
493    /// True when no hypertable is registered and no chunk is tracked.
494    /// Lets the durability layer skip the persist step entirely for
495    /// workloads that never declared a hypertable (zero overhead).
496    pub fn is_empty(&self) -> bool {
497        let guard = match self.inner.lock() {
498            Ok(g) => g,
499            Err(p) => p.into_inner(),
500        };
501        guard.specs.is_empty() && guard.chunks.is_empty()
502    }
503
504    /// Snapshot every chunk across all hypertables, ordered by
505    /// `(hypertable, start_ns)` so the persisted form is deterministic.
506    /// Pairs with [`restore_chunk`] on boot. Specs are snapshotted via
507    /// [`list`].
508    pub fn snapshot_chunks(&self) -> Vec<ChunkMeta> {
509        let guard = match self.inner.lock() {
510            Ok(g) => g,
511            Err(p) => p.into_inner(),
512        };
513        guard.chunks.values().cloned().collect()
514    }
515
516    /// Reinstate a chunk verbatim during recovery. Overwrites any
517    /// existing entry for the same `(hypertable, start_ns)`.
518    ///
519    /// Unlike [`route`], this does **not** observe a timestamp — it
520    /// restores the persisted counters (`row_count`, `min_ts_ns`,
521    /// `max_ts_ns`, `sealed`, `ttl_override_ns`) exactly so the
522    /// post-restart registry is identical to the pre-restart one. The
523    /// caller is expected to [`register`] the owning spec first; a
524    /// chunk whose hypertable has no spec is still tracked (routing
525    /// falls back to the spec once it is registered), matching the
526    /// pre-restart invariant that chunks outlive a missing spec only
527    /// transiently.
528    pub fn restore_chunk(&self, meta: ChunkMeta) {
529        let mut guard = match self.inner.lock() {
530            Ok(g) => g,
531            Err(p) => p.into_inner(),
532        };
533        let key = (meta.id.hypertable.clone(), meta.id.start_ns);
534        guard.chunks.insert(key, meta);
535    }
536
537    /// Drop the whole hypertable (spec + every chunk). Returns the
538    /// number of chunks removed.
539    pub fn drop_hypertable(&self, name: &str) -> usize {
540        let mut guard = match self.inner.lock() {
541            Ok(g) => g,
542            Err(p) => p.into_inner(),
543        };
544        guard.specs.remove(name);
545        let keys: Vec<(String, u64)> = guard
546            .chunks
547            .keys()
548            .filter(|(n, _)| n == name)
549            .cloned()
550            .collect();
551        for key in &keys {
552            guard.chunks.remove(key);
553        }
554        keys.len()
555    }
556}
557
#[cfg(test)]
mod tests {
    use super::*;

    const DAY_NS: u64 = 86_400_000_000_000;
    const HOUR_NS: u64 = 3_600_000_000_000;

    /// Chunk start offsets reported by `show_chunks`, in the order returned.
    fn chunk_starts(reg: &HypertableRegistry, hypertable: &str) -> Vec<u64> {
        reg.show_chunks(hypertable)
            .iter()
            .map(|c| c.id.start_ns)
            .collect()
    }

    /// An empty chunk of hypertable `m` covering `[0, DAY_NS)`.
    fn empty_chunk() -> ChunkMeta {
        ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        )
    }

    #[test]
    fn chunk_start_aligns_to_interval_floor() {
        let spec = HypertableSpec::new("m", "ts", DAY_NS);
        assert_eq!(spec.chunk_start(0), 0);
        assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
        assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
        assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
    }

    #[test]
    fn zero_interval_is_clamped_to_one_ns() {
        let spec = HypertableSpec::new("m", "ts", 0);
        assert_eq!(spec.chunk_interval_ns, 1);
        assert_eq!(spec.chunk_start(42), 42);
        assert_eq!(spec.chunk_end_exclusive(42), 43);
    }

    #[test]
    fn interval_string_accepts_duration_units() {
        let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
        assert_eq!(s.chunk_interval_ns, DAY_NS);
        let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
        assert_eq!(s.chunk_interval_ns, HOUR_NS);
        assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
        assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
    }

    #[test]
    fn route_allocates_chunk_on_first_write() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
        let id = reg.route("metrics", DAY_NS + 100).unwrap();
        assert_eq!(id.hypertable, "metrics");
        assert_eq!(id.start_ns, DAY_NS);
        let chunks = reg.show_chunks("metrics");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 1);
        assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
    }

    #[test]
    fn route_groups_writes_within_same_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
            let id = reg.route("m", offset).unwrap();
            assert_eq!(id.start_ns, 0);
        }
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 4);
    }

    #[test]
    fn route_splits_writes_across_adjacent_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", DAY_NS - 1).unwrap();
        reg.route("m", DAY_NS).unwrap();
        reg.route("m", 2 * DAY_NS).unwrap();
        // Last ns of day 0, first ns of day 1, first ns of day 2 — three
        // distinct chunks at exact interval boundaries.
        assert_eq!(chunk_starts(&reg, "m"), vec![0, DAY_NS, 2 * DAY_NS]);
    }

    #[test]
    fn route_returns_none_for_unknown_hypertable() {
        let reg = HypertableRegistry::new();
        assert!(reg.route("nope", 0).is_none());
    }

    #[test]
    fn drop_chunks_before_removes_matching_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap(); // chunk start 0, max=0
        reg.route("m", DAY_NS).unwrap(); // chunk start DAY_NS, max=DAY_NS
        reg.route("m", 2 * DAY_NS + 5).unwrap(); // chunk start 2*DAY_NS

        let dropped = reg.drop_chunks_before("m", DAY_NS);
        // max_ts_ns of first chunk is 0, of second chunk is DAY_NS.
        // cutoff = DAY_NS, so both are "<= cutoff" → both dropped.
        assert_eq!(dropped.len(), 2);
        assert_eq!(chunk_starts(&reg, "m"), vec![2 * DAY_NS]);
    }

    #[test]
    fn show_chunks_is_ordered_by_start() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, DAY_NS] {
            reg.route("m", ts).unwrap();
        }
        assert_eq!(
            chunk_starts(&reg, "m"),
            vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]
        );
    }

    #[test]
    fn chunk_queries_are_scoped_to_one_hypertable() {
        // Names that share a prefix sort next to each other in the chunk
        // map; per-hypertable operations must never bleed into neighbours.
        let reg = HypertableRegistry::new();
        for name in ["m", "m2", "ma"] {
            reg.register(HypertableSpec::new(name, "ts", DAY_NS).with_ttl_ns(DAY_NS));
            reg.route(name, 0).unwrap();
            reg.route(name, 3 * DAY_NS).unwrap();
        }
        assert_eq!(chunk_starts(&reg, "m"), vec![0, 3 * DAY_NS]);
        assert_eq!(reg.total_rows("m"), 2);
        assert_eq!(reg.sweep_expired("m", 2 * DAY_NS).len(), 1);
        assert_eq!(reg.show_chunks("m2").len(), 2);
        assert_eq!(reg.show_chunks("ma").len(), 2);
        assert_eq!(reg.drop_chunks_before("m2", u64::MAX).len(), 2);
        assert_eq!(reg.drop_hypertable("ma"), 2);
        assert_eq!(reg.total_rows("m"), 1);
        assert!(reg.show_chunks("unknown").is_empty());
        assert_eq!(reg.total_rows("unknown"), 0);
    }

    #[test]
    fn seal_chunk_flips_flag() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        let id = reg.route("m", 0).unwrap();
        assert!(reg.seal_chunk(&id));
        assert!(reg.show_chunks("m")[0].sealed);
        let missing = ChunkId {
            hypertable: "m".into(),
            start_ns: DAY_NS,
        };
        assert!(!reg.seal_chunk(&missing));
    }

    #[test]
    fn drop_hypertable_removes_everything() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.route("m", DAY_NS).unwrap();
        assert_eq!(reg.drop_hypertable("m"), 2);
        assert!(reg.get("m").is_none());
        assert!(reg.show_chunks("m").is_empty());
        assert!(reg.is_empty());
    }

    #[test]
    fn total_rows_sums_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in 0..1000 {
            reg.route("m", ts).unwrap();
        }
        for ts in DAY_NS..DAY_NS + 500 {
            reg.route("m", ts).unwrap();
        }
        assert_eq!(reg.total_rows("m"), 1500);
    }

    #[test]
    fn names_lists_registered_hypertables() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("a", "ts", DAY_NS));
        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
        let mut names = reg.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    #[test]
    fn debug_reports_counts() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.route("m", DAY_NS).unwrap();
        assert_eq!(
            format!("{reg:?}"),
            "HypertableRegistry { hypertables: 1, chunks: 2 }"
        );
    }

    // -----------------------------------------------------------------
    // Partition-level TTL
    // -----------------------------------------------------------------

    #[test]
    fn with_ttl_parses_duration_and_sets_default() {
        let s = HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("7d")
            .unwrap();
        assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("raw")
            .is_none());
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("garbage")
            .is_none());
    }

    #[test]
    fn chunk_with_no_rows_never_expires() {
        let meta = empty_chunk();
        assert!(!meta.is_expired_at(u64::MAX, Some(1)));
    }

    #[test]
    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
        let mut meta = empty_chunk();
        meta.observe(500);
        // TTL = 1000, max_ts = 500 → expires at 1500.
        assert!(!meta.is_expired_at(1000, Some(1000)));
        assert!(!meta.is_expired_at(1499, Some(1000)));
        assert!(meta.is_expired_at(1500, Some(1000)));
    }

    #[test]
    fn per_chunk_override_wins_over_hypertable_default() {
        let mut meta = empty_chunk();
        meta.observe(500);
        // Default would say "expire at 500+1000 = 1500"; override
        // narrows to 500+100 = 600.
        meta.ttl_override_ns = Some(100);
        assert!(meta.is_expired_at(600, Some(1000)));
        assert!(!meta.is_expired_at(599, Some(1000)));
    }

    #[test]
    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
        // 3 chunks — at 0, DAY, 2*DAY.
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        // now = 3 * DAY + 1 → chunks with max_ts in {0, DAY_NS} both
        // expired (max + 2d ≤ now), chunk at 2*DAY_NS still alive.
        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
        let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
        starts.sort();
        assert_eq!(starts, vec![0, DAY_NS]);
        assert_eq!(chunk_starts(&reg, "m"), vec![2 * DAY_NS]);
    }

    #[test]
    fn sweep_without_ttl_keeps_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS)); // no TTL
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
        assert!(dropped.is_empty());
        assert_eq!(reg.show_chunks("m").len(), 3);
    }

    #[test]
    fn sweep_all_expired_iterates_every_hypertable() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
        reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
        // fast: chunk at 0 expires fast; slow: chunk at 0 still
        // within its 7-day TTL.
        reg.route("fast", 0).unwrap();
        reg.route("slow", 0).unwrap();
        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
        assert_eq!(dropped[0].0, "fast");
        assert_eq!(reg.show_chunks("slow").len(), 1);
    }

    #[test]
    fn set_default_ttl_ns_treats_zero_as_disabled() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        assert!(reg.set_default_ttl_ns("m", Some(DAY_NS)));
        assert_eq!(reg.get("m").unwrap().default_ttl_ns, Some(DAY_NS));
        assert!(reg.set_default_ttl_ns("m", Some(0)));
        assert_eq!(reg.get("m").unwrap().default_ttl_ns, None);
        assert!(!reg.set_default_ttl_ns("missing", Some(DAY_NS)));
    }

    #[test]
    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        let id = reg.route("m", 0).unwrap();
        // Raise TTL to 100 days — chunk should survive the sweep.
        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
        assert!(dropped.is_empty());
        // Now shorten to 1 hour and sweep.
        assert!(reg.set_chunk_ttl_ns(&id, Some(HOUR_NS)));
        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
        // The chunk is gone, so further overrides report "unknown".
        assert!(!reg.set_chunk_ttl_ns(&id, None));
    }

    #[test]
    fn snapshot_then_restore_reproduces_registry_identically() {
        // Pre-restart registry: two hypertables, several chunks, with a
        // sealed chunk and a per-chunk TTL override — the bits that are
        // NOT derivable from row data and so must round-trip verbatim.
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
        reg.register(HypertableSpec::new("events", "ts", HOUR_NS));
        for t in [0, DAY_NS + 5, DAY_NS + 9, 2 * DAY_NS] {
            reg.route("metrics", t).unwrap();
        }
        let id = reg.route("events", 0).unwrap();
        assert!(reg.seal_chunk(&id));
        assert!(reg.set_chunk_ttl_ns(&id, Some(3 * HOUR_NS)));

        let specs = reg.list();
        let chunks = reg.snapshot_chunks();
        assert!(!reg.is_empty());

        // Simulated restart: rebuild a fresh registry from the snapshot.
        let restored = HypertableRegistry::new();
        assert!(restored.is_empty());
        for spec in specs {
            restored.register(spec);
        }
        for chunk in chunks {
            restored.restore_chunk(chunk);
        }

        // Specs identical.
        let before = reg.get("metrics").unwrap();
        let after = restored.get("metrics").unwrap();
        assert_eq!(after.chunk_interval_ns, before.chunk_interval_ns);
        assert_eq!(after.time_column, before.time_column);
        assert_eq!(after.default_ttl_ns, before.default_ttl_ns);

        // Chunk metadata identical (ids, bounds, counts, sealed, TTL override).
        let m_before = reg.show_chunks("metrics");
        let m_after = restored.show_chunks("metrics");
        assert_eq!(m_after.len(), m_before.len());
        for (a, b) in m_after.iter().zip(m_before.iter()) {
            assert_eq!(a.id, b.id);
            assert_eq!(a.end_ns_exclusive, b.end_ns_exclusive);
            assert_eq!(a.row_count, b.row_count);
            assert_eq!(a.min_ts_ns, b.min_ts_ns);
            assert_eq!(a.max_ts_ns, b.max_ts_ns);
            assert_eq!(a.sealed, b.sealed);
            assert_eq!(a.ttl_override_ns, b.ttl_override_ns);
        }
        let e_after = restored.show_chunks("events");
        assert_eq!(e_after.len(), 1);
        assert!(e_after[0].sealed, "sealed flag must survive restore");
        assert_eq!(e_after[0].ttl_override_ns, Some(3 * HOUR_NS));

        // A post-restart write routes to the EXISTING chunk — no
        // duplicate allocation.
        let routed = restored.route("metrics", DAY_NS + 1).unwrap();
        assert_eq!(routed.start_ns, DAY_NS);
        assert_eq!(
            restored.show_chunks("metrics").len(),
            m_before.len(),
            "write after restore must not allocate a new chunk"
        );
    }

    #[test]
    fn chunks_expiring_within_previews_without_dropping() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        // Chunks with max_ts at 0, 1d, 2d → expiries at 1d, 2d, 3d.
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        // now=0, horizon=1.5d → only the first chunk (expiry 1d)
        // fits. Tight horizon proves the cutoff math.
        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
        assert_eq!(preview.len(), 1);
        assert_eq!(preview[0].id.start_ns, 0);
        // Wider horizon pulls in the second chunk too.
        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
        assert_eq!(preview2.len(), 2);
        // Registry still has every chunk — preview never drops.
        assert_eq!(reg.show_chunks("m").len(), 3);
    }
}
908}