
reddb_server/storage/timeseries/hypertable.rs

//! Hypertables — time-range-partitioned tables à la TimescaleDB.
//!
//! A hypertable is a logical collection that auto-partitions writes
//! into child chunks, each covering a fixed time interval
//! (`chunk_interval_ns`). Queries that filter by the time column
//! see the partition pruner eliminate child chunks whose bounds
//! fall outside the predicate; drops happen per-chunk so operators
//! can retain the last N days without a full-table scan.
//!
//! This module defines the **metadata + router**. The physical
//! chunk is the same [`super::chunk::TimeSeriesChunk`] the standalone
//! time-series path already uses — the hypertable layer just tracks
//! which chunk a row goes into.
//!
//! SQL surface (parsed elsewhere in the sprint):
//!
//! ```sql
//! CREATE HYPERTABLE metrics (
//!   ts    BIGINT,
//!   host  TEXT,
//!   value DOUBLE
//! ) CHUNK INTERVAL '1 day';
//!
//! SELECT drop_chunks('metrics', INTERVAL '90 days');
//! SELECT show_chunks('metrics');
//! ```
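//!
//! A minimal sketch of the Rust-side flow, assuming the `"1d"` /
//! `"90d"` literals are accepted by the retention duration grammar:
//!
//! ```ignore
//! let registry = HypertableRegistry::new();
//! let spec = HypertableSpec::from_interval_string("metrics", "ts", "1d")
//!     .and_then(|s| s.with_ttl("90d"))
//!     .expect("valid durations");
//! registry.register(spec);
//!
//! // Route a write; the target chunk is allocated on first touch.
//! let chunk = registry.route("metrics", 1_700_000_000_000_000_000).unwrap();
//!
//! // One retention-daemon cycle: reclaim every expired chunk.
//! let dropped = registry.sweep_all_expired(now_ns); // now_ns: caller's epoch-ns clock
//! ```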

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use super::retention::parse_duration_ns;

/// Spec declared by `CREATE HYPERTABLE`.
#[derive(Debug, Clone)]
pub struct HypertableSpec {
    pub name: String,
    /// Column name that carries the time axis (must be unix-ns BIGINT
    /// or parseable to one).
    pub time_column: String,
    /// Fixed width of a single chunk, in nanoseconds.
    pub chunk_interval_ns: u64,
    /// Default TTL applied to every new chunk when the DDL didn't
    /// request an explicit override. `None` means "no TTL — chunks
    /// live until explicit `drop_chunks` / retention policy fires".
    ///
    /// The effective expiry of a chunk is `max_ts_ns + ttl_ns`, so
    /// a chunk is safely droppable once `now_ns ≥ expiry`. That
    /// matches the contract callers already learnt from the
    /// retention daemon — partition TTL is the **declarative** way
    /// to say the same thing at CREATE time without a separate
    /// `add_retention_policy` call.
    pub default_ttl_ns: Option<u64>,
}

impl HypertableSpec {
    pub fn new(
        name: impl Into<String>,
        time_column: impl Into<String>,
        chunk_interval_ns: u64,
    ) -> Self {
        Self {
            name: name.into(),
            time_column: time_column.into(),
            chunk_interval_ns: chunk_interval_ns.max(1),
            default_ttl_ns: None,
        }
    }

    /// Convenience: construct from a Timescale-style duration string
    /// (`"1d"`, `"1h"`, `"30m"`…).
    pub fn from_interval_string(
        name: impl Into<String>,
        time_column: impl Into<String>,
        interval: &str,
    ) -> Option<Self> {
        let ns = parse_duration_ns(interval)?;
        if ns == 0 {
            return None;
        }
        Some(Self::new(name, time_column, ns))
    }

    /// Builder-style: attach a default TTL. Uses the same duration
    /// grammar as `chunk_interval` (`"90d"`, `"30s"`, …).
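    ///
    /// A sketch of the builder chain (the `"90d"` literal assumes
    /// the duration grammar accepts it; `DAY_NS` as in the tests
    /// below):
    ///
    /// ```ignore
    /// let spec = HypertableSpec::new("metrics", "ts", DAY_NS)
    ///     .with_ttl("90d")
    ///     .expect("valid duration");
    /// assert!(spec.default_ttl_ns.is_some());
    /// ```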
    pub fn with_ttl(mut self, ttl: &str) -> Option<Self> {
        let ns = parse_duration_ns(ttl)?;
        if ns == 0 {
            return None;
        }
        self.default_ttl_ns = Some(ns);
        Some(self)
    }

    /// Direct setter when the TTL is already computed in ns.
    pub fn with_ttl_ns(mut self, ttl_ns: u64) -> Self {
        self.default_ttl_ns = if ttl_ns == 0 { None } else { Some(ttl_ns) };
        self
    }

    /// Align `timestamp_ns` to the chunk's floor — the chunk that
    /// row belongs to starts at this timestamp and covers
    /// `[start, start + chunk_interval_ns)`.
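    ///
    /// A small worked example (`DAY_NS` as in the tests below):
    ///
    /// ```ignore
    /// let spec = HypertableSpec::new("m", "ts", DAY_NS);
    /// assert_eq!(spec.chunk_start(DAY_NS + 5), DAY_NS);
    /// assert_eq!(spec.chunk_end_exclusive(DAY_NS + 5), 2 * DAY_NS);
    /// ```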
    pub fn chunk_start(&self, timestamp_ns: u64) -> u64 {
        (timestamp_ns / self.chunk_interval_ns) * self.chunk_interval_ns
    }

    pub fn chunk_end_exclusive(&self, timestamp_ns: u64) -> u64 {
        self.chunk_start(timestamp_ns)
            .saturating_add(self.chunk_interval_ns)
    }
}

/// Identifier of a single child chunk. Stable across restart so
/// catalog + retention can reference it unambiguously.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkId {
    pub hypertable: String,
    /// Chunk start (inclusive), aligned to `chunk_interval_ns`.
    pub start_ns: u64,
}

/// Metadata tracked per child chunk. Physical storage lives in
/// `TimeSeriesChunk` keyed by `(hypertable, start_ns)`.
#[derive(Debug, Clone)]
pub struct ChunkMeta {
    pub id: ChunkId,
    pub end_ns_exclusive: u64,
    pub row_count: u64,
    pub min_ts_ns: u64,
    pub max_ts_ns: u64,
    pub sealed: bool,
    /// Optional per-chunk TTL override. `None` means "fall back to
    /// the hypertable's default TTL". Setting this lets mixed-TTL
    /// policies live inside the same hypertable — e.g. keep the
    /// current month of data forever but expire everything older
    /// than 90 days.
    pub ttl_override_ns: Option<u64>,
}

impl ChunkMeta {
    pub fn new(id: ChunkId, end_ns_exclusive: u64) -> Self {
        Self {
            id,
            end_ns_exclusive,
            row_count: 0,
            min_ts_ns: u64::MAX,
            max_ts_ns: 0,
            sealed: false,
            ttl_override_ns: None,
        }
    }

    pub fn observe(&mut self, ts_ns: u64) {
        self.row_count += 1;
        if ts_ns < self.min_ts_ns {
            self.min_ts_ns = ts_ns;
        }
        if ts_ns > self.max_ts_ns {
            self.max_ts_ns = ts_ns;
        }
    }

    /// Effective TTL = per-chunk override if present, otherwise the
    /// hypertable default. `None` = the chunk has no automatic
    /// expiry.
    pub fn effective_ttl_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
        self.ttl_override_ns.or(default_ttl_ns)
    }

    /// Absolute epoch-ns at which the chunk becomes droppable. Uses
    /// `max_ts_ns` as the baseline — the newest row the chunk has
    /// ever accepted — so an empty chunk (no rows yet) never
    /// expires until at least one row lands.
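    ///
    /// A worked example: with `max_ts_ns = 500` and an effective TTL
    /// of `1_000`, the chunk becomes droppable at `now_ns = 1_500`:
    ///
    /// ```ignore
    /// // meta: a ChunkMeta that has observed one row at ts = 500.
    /// assert_eq!(meta.expiry_ns(Some(1_000)), Some(1_500));
    /// assert!(meta.is_expired_at(1_500, Some(1_000)));
    /// assert!(!meta.is_expired_at(1_499, Some(1_000)));
    /// ```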
    pub fn expiry_ns(&self, default_ttl_ns: Option<u64>) -> Option<u64> {
        let ttl = self.effective_ttl_ns(default_ttl_ns)?;
        if self.row_count == 0 {
            return None;
        }
        Some(self.max_ts_ns.saturating_add(ttl))
    }

    pub fn is_expired_at(&self, now_ns: u64, default_ttl_ns: Option<u64>) -> bool {
        match self.expiry_ns(default_ttl_ns) {
            Some(expiry) => now_ns >= expiry,
            None => false,
        }
    }
}

/// In-memory catalog of hypertables and their chunks. Thread-safe
/// because INSERTs can land from multiple writers simultaneously.
#[derive(Clone, Default)]
pub struct HypertableRegistry {
    inner: Arc<Mutex<RegistryInner>>,
}

#[derive(Default)]
struct RegistryInner {
    specs: BTreeMap<String, HypertableSpec>,
    /// `(hypertable, start_ns)` → chunk meta. `BTreeMap` so lookups
    /// by name produce an ordered view (show_chunks must be
    /// deterministic).
    chunks: BTreeMap<(String, u64), ChunkMeta>,
}

impl std::fmt::Debug for HypertableRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        f.debug_struct("HypertableRegistry")
            .field("hypertables", &guard.specs.len())
            .field("chunks", &guard.chunks.len())
            .finish()
    }
}

impl HypertableRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a new hypertable. Replaces the previous spec if one
    /// existed with the same name; chunks for that name are kept —
    /// the operator is assumed to know what they're doing when they
    /// redefine (e.g. widening `chunk_interval_ns`).
    pub fn register(&self, spec: HypertableSpec) {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.insert(spec.name.clone(), spec);
    }

    pub fn get(&self, name: &str) -> Option<HypertableSpec> {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.get(name).cloned()
    }

    pub fn list(&self) -> Vec<HypertableSpec> {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.values().cloned().collect()
    }

    /// Drop a hypertable from the registry. Returns the removed spec
    /// when present, or `None` for unknown names. Callers drop the
    /// backing collection separately — this is registry housekeeping
    /// only.
    pub fn unregister(&self, name: &str) -> Option<HypertableSpec> {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.remove(name)
    }

    /// Route a write: returns the `ChunkId` the row belongs in,
    /// allocating the chunk on first write. `None` when the
    /// hypertable is unknown.
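    ///
    /// A boundary sketch (`reg` / `DAY_NS` as in the tests below):
    ///
    /// ```ignore
    /// // Writes a nanosecond apart on either side of a chunk
    /// // boundary land in different chunks.
    /// let a = reg.route("metrics", DAY_NS - 1).unwrap();
    /// let b = reg.route("metrics", DAY_NS).unwrap();
    /// assert_ne!(a.start_ns, b.start_ns);
    /// ```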
    pub fn route(&self, hypertable: &str, timestamp_ns: u64) -> Option<ChunkId> {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        let spec = guard.specs.get(hypertable)?.clone();
        let start = spec.chunk_start(timestamp_ns);
        let end = spec.chunk_end_exclusive(timestamp_ns);
        let id = ChunkId {
            hypertable: spec.name.clone(),
            start_ns: start,
        };
        let key = (spec.name.clone(), start);
        let meta = guard
            .chunks
            .entry(key)
            .or_insert_with(|| ChunkMeta::new(id.clone(), end));
        meta.observe(timestamp_ns);
        Some(id)
    }

    /// Return every chunk for `hypertable`, oldest-first.
    pub fn show_chunks(&self, hypertable: &str) -> Vec<ChunkMeta> {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard
            .chunks
            .iter()
            .filter(|((name, _), _)| name == hypertable)
            .map(|(_, meta)| meta.clone())
            .collect()
    }

    /// Drop every chunk of `hypertable` whose `max_ts_ns` is at or
    /// below `cutoff_ns`. Returns the metadata of every dropped
    /// chunk — the physical storage release is the caller's
    /// responsibility (this module only owns the metadata).
    pub fn drop_chunks_before(&self, hypertable: &str, cutoff_ns: u64) -> Vec<ChunkMeta> {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        let mut dropped = Vec::new();
        let keys: Vec<(String, u64)> = guard
            .chunks
            .iter()
            .filter(|((name, _), meta)| name == hypertable && meta.max_ts_ns <= cutoff_ns)
            .map(|(k, _)| k.clone())
            .collect();
        for key in keys {
            if let Some(meta) = guard.chunks.remove(&key) {
                dropped.push(meta);
            }
        }
        dropped
    }

    /// Sweep chunks whose effective TTL has fired. A chunk is
    /// droppable when `now_ns ≥ max_ts_ns + effective_ttl_ns` — the
    /// registry hands back every removed `ChunkMeta` so the
    /// physical-storage callback can release bytes + indexes. Chunks
    /// without an effective TTL (neither per-chunk override nor
    /// hypertable default) are never touched.
    ///
    /// This is the "TTL applied at the partition level" primitive:
    /// one metadata-only sweep — linear in the number of chunks, not
    /// rows — reclaims every row of every expired chunk, instead of
    /// scanning rows individually like an entity-level TTL would.
    /// Empty hypertables stay empty.
    pub fn sweep_expired(&self, hypertable: &str, now_ns: u64) -> Vec<ChunkMeta> {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        let Some(spec) = guard.specs.get(hypertable).cloned() else {
            return Vec::new();
        };
        let expired_keys: Vec<(String, u64)> = guard
            .chunks
            .iter()
            .filter(|((name, _), meta)| {
                name == hypertable && meta.is_expired_at(now_ns, spec.default_ttl_ns)
            })
            .map(|(k, _)| k.clone())
            .collect();
        let mut dropped = Vec::with_capacity(expired_keys.len());
        for key in expired_keys {
            if let Some(meta) = guard.chunks.remove(&key) {
                dropped.push(meta);
            }
        }
        dropped
    }

    /// Sweep every registered hypertable in one shot — the loop the
    /// retention daemon runs every cycle. Returns a flat list of
    /// `(hypertable_name, chunk_dropped)` pairs.
    pub fn sweep_all_expired(&self, now_ns: u64) -> Vec<(String, ChunkMeta)> {
        let names: Vec<String> = {
            let guard = match self.inner.lock() {
                Ok(g) => g,
                Err(p) => p.into_inner(),
            };
            guard.specs.keys().cloned().collect()
        };
        let mut out = Vec::new();
        for name in names {
            for meta in self.sweep_expired(&name, now_ns) {
                out.push((name.clone(), meta));
            }
        }
        out
    }

    /// Install / replace the hypertable-wide default TTL. `None`
    /// disables automatic expiry — chunks live until explicit
    /// `drop_chunks` / per-chunk override fires.
    pub fn set_default_ttl_ns(&self, hypertable: &str, ttl_ns: Option<u64>) -> bool {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        match guard.specs.get_mut(hypertable) {
            Some(spec) => {
                spec.default_ttl_ns = match ttl_ns {
                    Some(0) | None => None,
                    Some(v) => Some(v),
                };
                true
            }
            None => false,
        }
    }

    /// Override the TTL for a single chunk. Useful for "keep this
    /// specific chunk longer because it contains an incident
    /// replay" or "expire this one faster because it was filled
    /// from a backfill we're about to redo". `None` removes the
    /// override and falls back to the hypertable default.
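    ///
    /// A sketch of both directions (`incident_chunk_id` is a
    /// hypothetical `ChunkId` held by the caller):
    ///
    /// ```ignore
    /// // Pin the incident chunk for 100 days, whatever the default.
    /// reg.set_chunk_ttl_ns(&incident_chunk_id, Some(100 * DAY_NS));
    /// // Later, drop the override and fall back to the default TTL.
    /// reg.set_chunk_ttl_ns(&incident_chunk_id, None);
    /// ```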
    pub fn set_chunk_ttl_ns(&self, id: &ChunkId, ttl_ns: Option<u64>) -> bool {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
            meta.ttl_override_ns = ttl_ns;
            true
        } else {
            false
        }
    }

    /// Inspect the list of chunks that are *about* to expire within
    /// `horizon_ns`. Powers preview endpoints ("what will the next
    /// sweep drop?") without actually dropping anything.
    pub fn chunks_expiring_within(
        &self,
        hypertable: &str,
        now_ns: u64,
        horizon_ns: u64,
    ) -> Vec<ChunkMeta> {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        let Some(spec) = guard.specs.get(hypertable).cloned() else {
            return Vec::new();
        };
        let cutoff = now_ns.saturating_add(horizon_ns);
        guard
            .chunks
            .iter()
            .filter(|((name, _), _)| name == hypertable)
            .filter_map(|(_, meta)| {
                let expiry = meta.expiry_ns(spec.default_ttl_ns)?;
                if expiry <= cutoff {
                    Some(meta.clone())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Seal a chunk — future writes to the same `start_ns` bucket
    /// will still land (open-ended), but the `sealed` flag signals
    /// the maintenance layer that the chunk can now be compressed /
    /// uploaded / migrated. Returns `true` if the chunk existed.
    pub fn seal_chunk(&self, id: &ChunkId) -> bool {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        if let Some(meta) = guard.chunks.get_mut(&(id.hypertable.clone(), id.start_ns)) {
            meta.sealed = true;
            true
        } else {
            false
        }
    }

    /// Total row count across every chunk of `hypertable`. Used by
    /// catalog views + benchmark harnesses.
    pub fn total_rows(&self, hypertable: &str) -> u64 {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard
            .chunks
            .iter()
            .filter(|((name, _), _)| name == hypertable)
            .map(|(_, meta)| meta.row_count)
            .sum()
    }

    /// List the hypertables the retention daemon should sweep.
    pub fn names(&self) -> Vec<String> {
        let guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.keys().cloned().collect()
    }

    /// Drop the whole hypertable (spec + every chunk). Returns the
    /// number of chunks removed.
    pub fn drop_hypertable(&self, name: &str) -> usize {
        let mut guard = match self.inner.lock() {
            Ok(g) => g,
            Err(p) => p.into_inner(),
        };
        guard.specs.remove(name);
        let keys: Vec<(String, u64)> = guard
            .chunks
            .keys()
            .filter(|(n, _)| n == name)
            .cloned()
            .collect();
        for key in &keys {
            guard.chunks.remove(key);
        }
        keys.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const DAY_NS: u64 = 86_400_000_000_000;
    const HOUR_NS: u64 = 3_600_000_000_000;

    #[test]
    fn chunk_start_aligns_to_interval_floor() {
        let spec = HypertableSpec::new("m", "ts", DAY_NS);
        assert_eq!(spec.chunk_start(0), 0);
        assert_eq!(spec.chunk_start(DAY_NS - 1), 0);
        assert_eq!(spec.chunk_start(DAY_NS), DAY_NS);
        assert_eq!(spec.chunk_start(3 * DAY_NS + 123), 3 * DAY_NS);
    }

    #[test]
    fn interval_string_accepts_duration_units() {
        let s = HypertableSpec::from_interval_string("m", "ts", "1d").unwrap();
        assert_eq!(s.chunk_interval_ns, DAY_NS);
        let s = HypertableSpec::from_interval_string("m", "ts", "1h").unwrap();
        assert_eq!(s.chunk_interval_ns, HOUR_NS);
        assert!(HypertableSpec::from_interval_string("m", "ts", "raw").is_none());
        assert!(HypertableSpec::from_interval_string("m", "ts", "garbage").is_none());
    }

    #[test]
    fn route_allocates_chunk_on_first_write() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("metrics", "ts", DAY_NS));
        let id = reg.route("metrics", DAY_NS + 100).unwrap();
        assert_eq!(id.hypertable, "metrics");
        assert_eq!(id.start_ns, DAY_NS);
        let chunks = reg.show_chunks("metrics");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 1);
        assert_eq!(chunks[0].min_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].max_ts_ns, DAY_NS + 100);
        assert_eq!(chunks[0].end_ns_exclusive, 2 * DAY_NS);
    }

    #[test]
    fn route_groups_writes_within_same_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for offset in [10u64, 100, 1_000, DAY_NS - 1] {
            let id = reg.route("m", offset).unwrap();
            assert_eq!(id.start_ns, 0);
        }
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].row_count, 4);
    }

    #[test]
    fn route_splits_writes_across_adjacent_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", DAY_NS - 1).unwrap();
        reg.route("m", DAY_NS).unwrap();
        reg.route("m", 2 * DAY_NS).unwrap();
        let chunks = reg.show_chunks("m");
        assert_eq!(chunks.len(), 3);
        assert!(chunks[0].id.start_ns <= chunks[1].id.start_ns);
        assert!(chunks[1].id.start_ns <= chunks[2].id.start_ns);
    }

    #[test]
    fn route_returns_none_for_unknown_hypertable() {
        let reg = HypertableRegistry::new();
        assert!(reg.route("nope", 0).is_none());
    }

    #[test]
    fn drop_chunks_before_removes_matching_chunks() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap(); // chunk start 0, max=0
        reg.route("m", DAY_NS).unwrap(); // chunk start DAY_NS, max=DAY_NS
        reg.route("m", 2 * DAY_NS + 5).unwrap(); // chunk start 2*DAY_NS

        let dropped = reg.drop_chunks_before("m", DAY_NS);
        // max_ts_ns of first chunk is 0, of second chunk is DAY_NS.
        // cutoff = DAY_NS, so both are "<= cutoff" → both dropped.
        assert_eq!(dropped.len(), 2);
        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    #[test]
    fn show_chunks_is_ordered_by_start() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in [5 * DAY_NS, 2 * DAY_NS, 7 * DAY_NS, 1 * DAY_NS] {
            reg.route("m", ts).unwrap();
        }
        let starts: Vec<u64> = reg.show_chunks("m").iter().map(|c| c.id.start_ns).collect();
        assert_eq!(starts, vec![DAY_NS, 2 * DAY_NS, 5 * DAY_NS, 7 * DAY_NS]);
    }

    #[test]
    fn seal_chunk_flips_flag() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        let id = reg.route("m", 0).unwrap();
        assert!(reg.seal_chunk(&id));
        assert!(reg.show_chunks("m")[0].sealed);
    }

    #[test]
    fn drop_hypertable_removes_everything() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        reg.route("m", 0).unwrap();
        reg.route("m", DAY_NS).unwrap();
        assert_eq!(reg.drop_hypertable("m"), 2);
        assert!(reg.get("m").is_none());
        assert!(reg.show_chunks("m").is_empty());
    }

    #[test]
    fn total_rows_sums_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS));
        for ts in 0..1000 {
            reg.route("m", ts).unwrap();
        }
        for ts in DAY_NS..DAY_NS + 500 {
            reg.route("m", ts).unwrap();
        }
        assert_eq!(reg.total_rows("m"), 1500);
    }

    #[test]
    fn names_lists_registered_hypertables() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("a", "ts", DAY_NS));
        reg.register(HypertableSpec::new("b", "ts", HOUR_NS));
        let mut names = reg.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    // -----------------------------------------------------------------
    // Partition-level TTL
    // -----------------------------------------------------------------

    #[test]
    fn with_ttl_parses_duration_and_sets_default() {
        let s = HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("7d")
            .unwrap();
        assert_eq!(s.default_ttl_ns, Some(7 * DAY_NS));
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("raw")
            .is_none());
        assert!(HypertableSpec::new("m", "ts", DAY_NS)
            .with_ttl("garbage")
            .is_none());
    }

    #[test]
    fn chunk_with_no_rows_never_expires() {
        let meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        assert!(!meta.is_expired_at(u64::MAX, Some(1)));
    }

    #[test]
    fn chunk_expires_when_now_crosses_max_ts_plus_ttl() {
        let mut meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        meta.observe(500);
        // TTL = 1000, max_ts = 500 → expires at 1500.
        assert!(!meta.is_expired_at(1000, Some(1000)));
        assert!(!meta.is_expired_at(1499, Some(1000)));
        assert!(meta.is_expired_at(1500, Some(1000)));
    }

    #[test]
    fn per_chunk_override_wins_over_hypertable_default() {
        let mut meta = ChunkMeta::new(
            ChunkId {
                hypertable: "m".into(),
                start_ns: 0,
            },
            DAY_NS,
        );
        meta.observe(500);
        // Default would say "expire at 500+1000 = 1500"; override
        // narrows to 500+100 = 600.
        meta.ttl_override_ns = Some(100);
        assert!(meta.is_expired_at(600, Some(1000)));
        assert!(!meta.is_expired_at(599, Some(1000)));
    }

    #[test]
    fn sweep_expired_drops_chunks_past_ttl_and_returns_them() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(2 * DAY_NS));
        // 3 chunks — at 0, DAY, 2*DAY.
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        // now = 3 * DAY + 1 → chunks with max_ts in {0, DAY_NS} both
        // expired (max + 2d ≤ now), chunk at 2*DAY_NS still alive.
        let dropped = reg.sweep_expired("m", 3 * DAY_NS + 1);
        let mut starts: Vec<u64> = dropped.iter().map(|m| m.id.start_ns).collect();
        starts.sort();
        assert_eq!(starts, vec![0, DAY_NS]);
        let remaining = reg.show_chunks("m");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].id.start_ns, 2 * DAY_NS);
    }

    #[test]
    fn sweep_without_ttl_keeps_every_chunk() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS)); // no TTL
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        let dropped = reg.sweep_expired("m", 10_000 * DAY_NS);
        assert!(dropped.is_empty());
        assert_eq!(reg.show_chunks("m").len(), 3);
    }

    #[test]
    fn sweep_all_expired_iterates_every_hypertable() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("fast", "ts", HOUR_NS).with_ttl_ns(HOUR_NS));
        reg.register(HypertableSpec::new("slow", "ts", DAY_NS).with_ttl_ns(7 * DAY_NS));
        // fast: chunk at 0 expires fast; slow: chunk at 0 still
        // within its 7-day TTL.
        reg.route("fast", 0).unwrap();
        reg.route("slow", 0).unwrap();
        let dropped = reg.sweep_all_expired(2 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
        assert_eq!(dropped[0].0, "fast");
        assert_eq!(reg.show_chunks("slow").len(), 1);
    }

    #[test]
    fn set_chunk_ttl_ns_lets_caller_pin_or_shorten() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        let id = reg.route("m", 0).unwrap();
        // Raise TTL to 100 days — chunk should survive the sweep.
        assert!(reg.set_chunk_ttl_ns(&id, Some(100 * DAY_NS)));
        let dropped = reg.sweep_expired("m", 10 * DAY_NS);
        assert!(dropped.is_empty());
        // Now shorten to 1 hour and sweep.
        reg.set_chunk_ttl_ns(&id, Some(HOUR_NS));
        let dropped = reg.sweep_expired("m", 10 * HOUR_NS);
        assert_eq!(dropped.len(), 1);
    }

    #[test]
    fn chunks_expiring_within_previews_without_dropping() {
        let reg = HypertableRegistry::new();
        reg.register(HypertableSpec::new("m", "ts", DAY_NS).with_ttl_ns(DAY_NS));
        // Chunks with max_ts at 0, 1d, 2d → expiries at 1d, 2d, 3d.
        for t in [0, DAY_NS, 2 * DAY_NS] {
            reg.route("m", t).unwrap();
        }
        // now=0, horizon=1.5d → only the first chunk (expiry 1d)
        // fits. Tight horizon proves the cutoff math.
        let preview = reg.chunks_expiring_within("m", 0, DAY_NS + DAY_NS / 2);
        assert_eq!(preview.len(), 1);
        assert_eq!(preview[0].id.start_ns, 0);
        // Wider horizon pulls in the second chunk too.
        let preview2 = reg.chunks_expiring_within("m", 0, 2 * DAY_NS);
        assert_eq!(preview2.len(), 2);
        // Registry still has every chunk — preview never drops.
        assert_eq!(reg.show_chunks("m").len(), 3);
    }
}