// reddb_server/storage/timeseries/temporal_index.rs
//! Global temporal index over timeseries chunks.
//!
//! Today each [`super::chunk::TimeSeriesChunk`] tracks its own
//! `min_timestamp` / `max_timestamp`, but there is no structure above the
//! chunk set that answers "which chunks overlap `[start, end]`?" efficiently.
//! Callers fall back to a linear scan over every chunk.
//!
//! This module introduces [`TemporalIndex`] — a `BTreeMap` keyed by each
//! chunk's `min_ts`, paired with a bloom filter of registered chunk start
//! timestamps. It powers:
//!
//! - `chunks_overlapping(start, end)` → O(log n + k) interval probe
//! - `chunks_at_timestamp(ts)`        → point lookup
//! - cross-structure **temporal join** (tables ↔ timeseries) once the
//!   planner can ask "give me chunks around this row's event_ts"
//!
//! The index stores opaque [`ChunkHandle`]s — callers decide what `chunk_id`
//! and `series_id` mean. That keeps this module independent of the chunk
//! lifecycle (growing/sealed/flushed) and the on-disk layout.
//!
//! Implements [`crate::storage::index::IndexBase`] and exposes a bloom via
//! [`crate::storage::index::HasBloom`] so the query planner and segment
//! layer can prune uniformly.

use std::collections::{BTreeMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::storage::index::{BloomSegment, HasBloom, IndexBase, IndexKind, IndexStats};
30
/// Number of chunks grouped into a single BRIN block range.
///
/// Coarser granularity = fewer range entries, faster pre-filter at the cost
/// of reduced precision (may read slightly more chunks than necessary).
/// Equivalent to PostgreSQL's `pages_per_range` parameter.
const BRIN_CHUNKS_PER_RANGE: usize = 128;
36
/// A BRIN block range: summarises min/max timestamp across up to
/// `BRIN_CHUNKS_PER_RANGE` consecutive chunks. Query planning skips an
/// entire range in O(1) when the range's `[min_ts, max_ts]` does not
/// intersect the query window — identical to PostgreSQL's BRIN MINMAX scan.
#[derive(Debug, Clone, Copy)]
struct BrinRange {
    /// Minimum timestamp across all chunks in this range.
    min_ts: u64,
    /// Maximum timestamp across all chunks in this range.
    max_ts: u64,
    /// How many chunk handles fall in this range (≤ BRIN_CHUNKS_PER_RANGE).
    chunk_count: usize,
}

impl BrinRange {
    /// True when `[self.min_ts, self.max_ts]` intersects `[start, end]`
    /// (all bounds inclusive).
    #[inline]
    fn overlaps(&self, start: u64, end: u64) -> bool {
        // Two closed intervals intersect unless one lies strictly before
        // the other.
        !(end < self.min_ts || start > self.max_ts)
    }
}
57
/// Opaque handle describing a chunk the index tracks.
///
/// Callers are free to interpret `series_id`/`chunk_id`. Only `min_ts` and
/// `max_ts` are consulted for query planning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChunkHandle {
    /// Opaque identifier for the series (metric + tag combination hash, for
    /// example). Passed through unchanged.
    pub series_id: u64,
    /// Opaque identifier for the chunk within its series.
    pub chunk_id: u64,
    /// Earliest timestamp present in the chunk, inclusive.
    pub min_ts: u64,
    /// Latest timestamp present in the chunk, inclusive.
    pub max_ts: u64,
}

impl ChunkHandle {
    /// Returns true iff the handle's `[min_ts, max_ts]` interval intersects
    /// `[start, end]` (all inclusive).
    #[inline]
    pub fn overlaps(&self, start: u64, end: u64) -> bool {
        self.min_ts <= end && start <= self.max_ts
    }

    /// Returns true iff `ts` falls within the handle's inclusive interval.
    #[inline]
    pub fn contains(&self, ts: u64) -> bool {
        (self.min_ts..=self.max_ts).contains(&ts)
    }
}
89
/// Internal state grouped under a single lock so `register()` takes one
/// write-lock instead of three. BTree, BRIN ranges, count, and correlation
/// stats are always updated together — splitting them only added contention.
struct IndexState {
    /// BTree keyed by `min_ts`. Multiple chunks may share the same `min_ts`,
    /// so each key maps to a `Vec`.
    entries: BTreeMap<u64, Vec<ChunkHandle>>,
    /// BRIN block-range summaries. Last entry is the "open" range being
    /// filled; earlier entries are sealed with `BRIN_CHUNKS_PER_RANGE` chunks.
    /// A just-sealed push leaves an open sentinel (`min = u64::MAX`,
    /// `max = 0`, `chunk_count = 0`) that the next register() widens to the
    /// incoming chunk's exact bounds.
    brin_ranges: Vec<BrinRange>,
    /// Number of registered handles. Decremented on `unregister`.
    count: usize,
    /// Number of monotonic insertions: `handle.min_ts >= prev_max_min_ts`.
    /// Used to compute `index_correlation` — PostgreSQL's BRIN planner cost
    /// model relies on this to decide whether BRIN is worth using.
    monotonic_inserts: u64,
    /// Total `register()` calls — denominator for correlation. Never
    /// decremented by `unregister`.
    total_inserts: u64,
    /// Highest `min_ts` seen so far (for monotonic check on next insert).
    /// Starts at 0, so the first insert always counts as monotonic.
    last_max_min_ts: u64,
}
111
/// Temporal BTree index keyed by `min_ts`, with a BRIN block-range layer.
///
/// Two-level structure mirrors PostgreSQL's BRIN architecture:
///
/// **Level 1 — BRIN block ranges**: each entry summarises the min/max
/// timestamps of up to `BRIN_CHUNKS_PER_RANGE` consecutive registered
/// chunks. A query first scans this tiny array (O(R/N) where R = chunks,
/// N = BRIN_CHUNKS_PER_RANGE) and skips entire blocks that cannot intersect
/// the query window.
///
/// **Level 2 — BTree**: keyed by `min_ts`, provides the precise
/// O(log n + k) probe for surviving blocks.
///
/// **Locking strategy:** the BTree, BRIN ranges, count, and correlation
/// stats live under a single `IndexState` lock so `register()` takes one
/// write-lock per insert. Bloom has its own lock (separable write pattern),
/// `global_max` is `AtomicU64` so unbounded range queries never block.
pub struct TemporalIndex {
    // All mutable planning state behind one RwLock — see `IndexState`.
    state: parking_lot::RwLock<IndexState>,
    /// Bloom filter over registered `min_ts` values. Cheap negative check
    /// before touching the BTree.
    bloom: parking_lot::RwLock<BloomSegment>,
    /// Highest `max_ts` seen so far. Atomic — lock-free for unbounded range
    /// queries ("everything after T").
    global_max: AtomicU64,
}
138
139impl TemporalIndex {
140    /// Create an empty index sized for `expected_chunks` entries.
141    pub fn new(expected_chunks: usize) -> Self {
142        Self {
143            state: parking_lot::RwLock::new(IndexState {
144                entries: BTreeMap::new(),
145                brin_ranges: Vec::new(),
146                count: 0,
147                monotonic_inserts: 0,
148                total_inserts: 0,
149                last_max_min_ts: 0,
150            }),
151            bloom: parking_lot::RwLock::new(BloomSegment::with_capacity(expected_chunks.max(1024))),
152            global_max: AtomicU64::new(0),
153        }
154    }
155
156    /// Register a chunk handle. Safe to call from multiple threads.
157    ///
158    /// Single state write-lock + bloom lock + atomic CAS for `global_max`.
159    /// Updates BTree, BRIN block-range summary, count, and correlation
160    /// tracking atomically.
161    pub fn register(&self, handle: ChunkHandle) {
162        // Single state write-lock — covers BTree, BRIN ranges, count, correlation.
163        {
164            let mut s = self.state.write();
165            s.entries.entry(handle.min_ts).or_default().push(handle);
166            s.count += 1;
167
168            // Correlation tracking: monotonic if min_ts didn't go backwards.
169            s.total_inserts += 1;
170            if handle.min_ts >= s.last_max_min_ts {
171                s.monotonic_inserts += 1;
172            }
173            if handle.min_ts > s.last_max_min_ts {
174                s.last_max_min_ts = handle.min_ts;
175            }
176
177            // BRIN block-range update: widen open range, seal when full.
178            if let Some(last) = s.brin_ranges.last_mut() {
179                if handle.min_ts < last.min_ts {
180                    last.min_ts = handle.min_ts;
181                }
182                if handle.max_ts > last.max_ts {
183                    last.max_ts = handle.max_ts;
184                }
185                last.chunk_count += 1;
186                if last.chunk_count >= BRIN_CHUNKS_PER_RANGE {
187                    s.brin_ranges.push(BrinRange {
188                        min_ts: u64::MAX,
189                        max_ts: 0,
190                        chunk_count: 0,
191                    });
192                }
193            } else {
194                s.brin_ranges.push(BrinRange {
195                    min_ts: handle.min_ts,
196                    max_ts: handle.max_ts,
197                    chunk_count: 1,
198                });
199            }
200        }
201
202        // Bloom: separate lock (different read/write pattern).
203        self.bloom.write().insert(&handle.min_ts.to_le_bytes());
204
205        // Global max via atomic CAS — lock-free for unbounded range queries.
206        let mut current = self.global_max.load(Ordering::Relaxed);
207        while handle.max_ts > current {
208            match self.global_max.compare_exchange_weak(
209                current,
210                handle.max_ts,
211                Ordering::Release,
212                Ordering::Relaxed,
213            ) {
214                Ok(_) => break,
215                Err(actual) => current = actual,
216            }
217        }
218    }
219
220    /// Forget every handle with the given `chunk_id`. Does not touch the
221    /// bloom (bloom filters don't support removal — stale positives cost
222    /// at most an extra BTree probe that finds no match).
223    ///
224    /// BRIN ranges are NOT reconstructed on unregister (no desummarization,
225    /// same as PostgreSQL). Ranges may become slightly over-wide after
226    /// removals — acceptable, false positives only add a cheap BTree probe.
227    pub fn unregister(&self, chunk_id: u64) -> usize {
228        let mut removed = 0usize;
229        let mut s = self.state.write();
230        s.entries.retain(|_, handles| {
231            let before = handles.len();
232            handles.retain(|h| h.chunk_id != chunk_id);
233            removed += before - handles.len();
234            !handles.is_empty()
235        });
236        if removed > 0 {
237            s.count = s.count.saturating_sub(removed);
238        }
239        removed
240    }
241
242    /// Return every handle whose interval overlaps `[start, end]` (inclusive).
243    ///
244    /// Two-phase scan — mirrors PostgreSQL's BRIN scan path:
245    ///
246    /// **Phase 1 (BRIN pre-filter):** iterate the coarse block-range array.
247    /// Skip any block whose `[min_ts, max_ts]` does not intersect `[start, end]`.
248    /// For append-only workloads this prunes the majority of blocks in O(R/N).
249    ///
250    /// **Phase 2 (BTree probe):** for each surviving block, scan BTree keys
251    /// in `[range.min_ts, min(range.max_ts, end)]` and verify `max_ts >= start`.
252    ///
253    /// **Dedup:** out-of-order inserts can produce overlapping BRIN ranges
254    /// where a BTree entry falls under two surviving windows. We dedup by
255    /// `(series_id, chunk_id)` so callers never see duplicates.
256    pub fn chunks_overlapping(&self, start: u64, end: u64) -> Vec<ChunkHandle> {
257        if start > end {
258            return Vec::new();
259        }
260
261        let s = self.state.read();
262        let mut out = Vec::new();
263
264        if s.brin_ranges.is_empty() {
265            // No BRIN ranges yet — plain BTree scan (startup / low-volume path).
266            for (_, handles) in s.entries.range((Bound::Unbounded, Bound::Included(end))) {
267                for h in handles {
268                    if h.max_ts >= start {
269                        out.push(*h);
270                    }
271                }
272            }
273            return out;
274        }
275
276        // Phase 1: collect surviving block windows.
277        let mut surviving_windows: Vec<(u64, u64)> = Vec::new();
278        for r in s.brin_ranges.iter() {
279            if r.chunk_count == 0 {
280                continue;
281            }
282            if r.overlaps(start, end) {
283                surviving_windows.push((r.min_ts, r.max_ts));
284            }
285        }
286
287        if surviving_windows.is_empty() {
288            return Vec::new();
289        }
290
291        // Phase 2: BTree probe restricted to surviving windows + dedup.
292        // Out-of-order inserts can cause overlapping BRIN ranges → same
293        // BTree entry hit by multiple probes. Dedup by (series_id, chunk_id).
294        let mut seen: HashSet<(u64, u64)> = HashSet::new();
295        for (win_min, win_max) in surviving_windows {
296            let probe_end = win_max.min(end);
297            for (_, handles) in s
298                .entries
299                .range((Bound::Included(win_min), Bound::Included(probe_end)))
300            {
301                for h in handles {
302                    if h.max_ts >= start && seen.insert((h.series_id, h.chunk_id)) {
303                        out.push(*h);
304                    }
305                }
306            }
307        }
308        out
309    }
310
311    /// Return every handle whose interval contains `ts`.
312    pub fn chunks_at_timestamp(&self, ts: u64) -> Vec<ChunkHandle> {
313        self.chunks_overlapping(ts, ts)
314    }
315
316    /// Bloom-backed fast path: returns `false` iff no chunk with
317    /// `min_ts == ts` has ever been registered. Useful for dedup checks.
318    pub fn min_ts_possibly_registered(&self, ts: u64) -> bool {
319        self.bloom.read().contains(&ts.to_le_bytes())
320    }
321
322    /// Number of registered chunks.
323    pub fn len(&self) -> usize {
324        self.state.read().count
325    }
326
327    /// Is the index empty?
328    pub fn is_empty(&self) -> bool {
329        self.len() == 0
330    }
331
332    /// Highest `max_ts` seen so far. Cheap upper bound for unbounded range
333    /// queries ("everything after T"). Lock-free atomic load.
334    pub fn global_max_timestamp(&self) -> u64 {
335        self.global_max.load(Ordering::Acquire)
336    }
337
338    /// Number of BRIN block ranges (useful for diagnostics / EXPLAIN output).
339    pub fn brin_range_count(&self) -> usize {
340        self.state
341            .read()
342            .brin_ranges
343            .iter()
344            .filter(|r| r.chunk_count > 0)
345            .count()
346    }
347
348    /// Empirical correlation between insertion order and `min_ts` order,
349    /// in `[0.0, 1.0]`. PostgreSQL's planner uses this to decide whether
350    /// BRIN is worth using — values near 1.0 mean append-only monotonic
351    /// inserts (BRIN very effective), values near 0.0 mean random inserts
352    /// (BRIN degrades to a full scan and should be skipped).
353    ///
354    /// Returns `1.0` for an empty index (matches the historical hardcoded
355    /// optimistic default).
356    pub fn index_correlation(&self) -> f64 {
357        let s = self.state.read();
358        if s.total_inserts == 0 {
359            1.0
360        } else {
361            s.monotonic_inserts as f64 / s.total_inserts as f64
362        }
363    }
364
365    /// Reset the index. Used by tests and deserialize paths.
366    pub fn clear(&self) {
367        let mut s = self.state.write();
368        s.entries.clear();
369        s.brin_ranges.clear();
370        s.count = 0;
371        s.monotonic_inserts = 0;
372        s.total_inserts = 0;
373        s.last_max_min_ts = 0;
374        drop(s);
375        *self.bloom.write() = BloomSegment::with_capacity(1024);
376        self.global_max.store(0, Ordering::Release);
377    }
378}
379
impl Default for TemporalIndex {
    /// Empty index sized for 1024 expected chunks (matches the bloom's
    /// minimum capacity in `new`).
    fn default() -> Self {
        Self::new(1024)
    }
}
385
impl HasBloom for TemporalIndex {
    /// Always `None`: the bloom lives behind a `parking_lot::RwLock`, so a
    /// raw `&BloomSegment` cannot be handed out without escaping the guard.
    fn bloom_segment(&self) -> Option<&BloomSegment> {
        // parking_lot RwLock still precludes handing out a raw reference.
        None
    }

    /// Lock-scoped negative check against the bloom of registered `min_ts`
    /// values (keys are `min_ts.to_le_bytes()` — see `register`).
    fn definitely_absent(&self, key: &[u8]) -> bool {
        self.bloom.read().definitely_absent(key)
    }
}
396
397impl IndexBase for TemporalIndex {
398    fn name(&self) -> &str {
399        "timeseries.temporal"
400    }
401
402    fn kind(&self) -> IndexKind {
403        IndexKind::Temporal
404    }
405
406    fn stats(&self) -> IndexStats {
407        let s = self.state.read();
408        let entries = s.count;
409        let distinct_keys = s.entries.len();
410        let brin_ranges = s.brin_ranges.iter().filter(|r| r.chunk_count > 0).count();
411        let correlation = if s.total_inserts == 0 {
412            1.0
413        } else {
414            s.monotonic_inserts as f64 / s.total_inserts as f64
415        };
416        IndexStats {
417            entries,
418            distinct_keys,
419            // Each BRIN range: 24 bytes (min_ts u64 + max_ts u64 + count usize).
420            // Each BTree entry: ~48 bytes (key u64 + Vec header + pointer).
421            approx_bytes: brin_ranges * 24 + distinct_keys * 48,
422            kind: IndexKind::Temporal,
423            has_bloom: true,
424            index_correlation: correlation,
425        }
426    }
427
428    fn definitely_absent(&self, key_bytes: &[u8]) -> bool {
429        <Self as HasBloom>::definitely_absent(self, key_bytes)
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use super::*;
436
    /// Shorthand for building a `ChunkHandle` literal in tests.
    fn handle(series: u64, chunk: u64, min_ts: u64, max_ts: u64) -> ChunkHandle {
        ChunkHandle {
            series_id: series,
            chunk_id: chunk,
            min_ts,
            max_ts,
        }
    }
445
    /// `overlaps`/`contains` treat both interval endpoints as inclusive.
    #[test]
    fn overlaps_helper() {
        let h = handle(1, 1, 100, 200);
        assert!(h.overlaps(50, 150));
        assert!(h.overlaps(150, 250));
        assert!(h.overlaps(100, 200));
        assert!(h.overlaps(120, 130));
        assert!(!h.overlaps(0, 99));
        assert!(!h.overlaps(201, 300));
        assert!(h.contains(100));
        assert!(h.contains(200));
        assert!(!h.contains(201));
    }
459
    /// End-to-end: register four chunks (two series) and probe three
    /// windows — partial overlap, full span, and a disjoint window.
    #[test]
    fn register_and_overlap_query() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 0, 100));
        idx.register(handle(1, 2, 100, 200));
        idx.register(handle(1, 3, 200, 300));
        idx.register(handle(2, 4, 50, 150));

        // Window [120, 180] overlaps chunk 2 (100..200) AND chunk 4
        // (50..150). Chunk 4's max_ts=150 >= 120, so it's a real hit.
        let hits = idx.chunks_overlapping(120, 180);
        let hit_ids: Vec<u64> = hits.iter().map(|h| h.chunk_id).collect();
        assert_eq!(hits.len(), 2);
        assert!(hit_ids.contains(&2));
        assert!(hit_ids.contains(&4));

        // Window spanning multiple chunks
        let hits = idx.chunks_overlapping(80, 220);
        let ids: Vec<u64> = hits.iter().map(|h| h.chunk_id).collect();
        assert!(ids.contains(&1));
        assert!(ids.contains(&2));
        assert!(ids.contains(&3));
        assert!(ids.contains(&4));

        // Window outside everything
        assert!(idx.chunks_overlapping(500, 600).is_empty());
    }
487
    /// `chunks_at_timestamp` is the degenerate `[ts, ts]` window: hits
    /// every chunk whose inclusive interval contains the point.
    #[test]
    fn point_lookup() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 1000, 2000));
        idx.register(handle(2, 2, 1500, 3000));

        // ts=1800 lies inside both chunks' intervals.
        let at_1800 = idx.chunks_at_timestamp(1800);
        assert_eq!(at_1800.len(), 2);

        // ts=2500 lies only inside chunk 2 (1500..3000).
        let at_2500 = idx.chunks_at_timestamp(2500);
        assert_eq!(at_2500.len(), 1);
        assert_eq!(at_2500[0].chunk_id, 2);

        assert!(idx.chunks_at_timestamp(9999).is_empty());
    }
503
    /// Unregister removes exactly the matching chunk_id and leaves the
    /// remaining handle queryable.
    #[test]
    fn unregister_removes_handles() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 10, 0, 100));
        idx.register(handle(1, 11, 100, 200));
        assert_eq!(idx.len(), 2);

        let removed = idx.unregister(10);
        assert_eq!(removed, 1);
        assert_eq!(idx.len(), 1);
        // ts=50 was only covered by the removed chunk 10.
        assert!(idx.chunks_at_timestamp(50).is_empty());
        assert_eq!(idx.chunks_at_timestamp(150).len(), 1);
    }
517
    /// Registered min_ts values are always reported as possibly present;
    /// a bloom false positive can only cost a no-match BTree probe.
    #[test]
    fn bloom_guards_min_ts_lookup() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 5000, 6000));
        assert!(idx.min_ts_possibly_registered(5000));
        // Bloom may false-positive but the BTree lookup returns nothing.
        assert!(idx.chunks_at_timestamp(999_999).is_empty());
    }
526
    /// `global_max` is monotone: a later register with a lower max_ts
    /// (chunk 3) must not pull it back down.
    #[test]
    fn global_max_tracks_highest() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 0, 100));
        idx.register(handle(1, 2, 200, 500));
        idx.register(handle(1, 3, 100, 300));
        assert_eq!(idx.global_max_timestamp(), 500);
    }
535
    /// `stats()` reports handle count and distinct min_ts keys (chunks 1
    /// and 2 share min_ts=0, so 3 entries but 2 keys).
    #[test]
    fn stats_reflect_registrations() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 0, 10));
        idx.register(handle(1, 2, 0, 20));
        idx.register(handle(1, 3, 100, 200));
        let s = idx.stats();
        assert_eq!(s.entries, 3);
        // Two distinct min_ts keys: 0 and 100
        assert_eq!(s.distinct_keys, 2);
        assert_eq!(s.kind, IndexKind::Temporal);
        assert!(s.has_bloom);
    }
549
    /// `clear()` resets entries, the atomic global max, and query results.
    #[test]
    fn clear_resets() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 0, 100));
        idx.clear();
        assert!(idx.is_empty());
        assert_eq!(idx.global_max_timestamp(), 0);
        assert!(idx.chunks_at_timestamp(50).is_empty());
    }
559
    /// A reversed window (start > end) is rejected up front — empty
    /// result, no panic from a reversed BTree range.
    #[test]
    fn reversed_range_returns_empty() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 100, 200));
        assert!(idx.chunks_overlapping(500, 100).is_empty());
    }
566
    /// Overlapping BRIN windows (from out-of-order inserts) must not yield
    /// duplicate handles — dedup is by (series_id, chunk_id).
    #[test]
    fn dedup_overlapping_brin_windows() {
        // Out-of-order inserts that produce two BRIN windows whose [min,max]
        // overlap. Without dedup the same chunk would be returned twice.
        let idx = TemporalIndex::new(16);
        // Force boundary at 128 chunks: register 130 chunks where the 129th
        // has out-of-order min_ts that overlaps the previous range.
        for i in 0..128u64 {
            idx.register(handle(1, i, 1000 + i, 1000 + i + 5));
        }
        // Range 1 is sealed at [1000, 1132]. Range 2 opens.
        idx.register(handle(1, 200, 1050, 1300)); // out-of-order, in range 2
        idx.register(handle(1, 201, 1100, 1400));

        // Query overlaps both ranges; chunk 200 (min_ts=1050) lives in range 2,
        // but range 1 also covers min_ts=1050. Probes for both windows would
        // return it without dedup.
        let hits = idx.chunks_overlapping(1100, 1200);
        let unique: HashSet<_> = hits.iter().map(|h| h.chunk_id).collect();
        assert_eq!(hits.len(), unique.len(), "duplicates leaked through");
    }
588
    /// Correlation: 1.0 when empty, stays 1.0 under monotonic appends,
    /// and dips below 1.0 after a backfill insert.
    #[test]
    fn correlation_starts_optimistic_then_tracks_inserts() {
        let idx = TemporalIndex::new(16);
        assert_eq!(idx.index_correlation(), 1.0); // empty default

        // Pure monotonic = 1.0
        for i in 0..10u64 {
            idx.register(handle(1, i, i * 100, i * 100 + 50));
        }
        assert!((idx.index_correlation() - 1.0).abs() < 1e-9);

        // Backfill an out-of-order chunk → drop below 1.0.
        idx.register(handle(1, 99, 50, 100));
        let c = idx.index_correlation();
        assert!(c < 1.0 && c > 0.0, "correlation = {c}");
    }
605
    /// The sealed-range sentinel (min=MAX, max=0) must be widened to the
    /// first chunk's real bounds before it counts or answers queries.
    #[test]
    fn brin_block_seal_boundary() {
        // Verify the chunk at exactly BRIN_CHUNKS_PER_RANGE seals and the
        // next register opens a fresh range correctly initialized.
        let idx = TemporalIndex::new(BRIN_CHUNKS_PER_RANGE * 2);
        for i in 0..BRIN_CHUNKS_PER_RANGE as u64 {
            idx.register(handle(1, i, i * 10, i * 10 + 5));
        }
        assert_eq!(idx.brin_range_count(), 1);

        // Next register opens range 2.
        idx.register(handle(1, 999, 99_999, 100_000));
        assert_eq!(idx.brin_range_count(), 2);

        // Range 2 must be initialized with the new chunk's bounds (not u64::MAX/0).
        let hits = idx.chunks_overlapping(99_999, 100_000);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].chunk_id, 999);
    }
625
    /// Smoke-test thread safety: 4 threads × 100 registers, then verify
    /// count and a point lookup spanning all threads' chunks.
    #[test]
    fn concurrent_register() {
        use std::sync::Arc;
        use std::thread;

        let idx = Arc::new(TemporalIndex::new(1024));
        let mut handles = vec![];
        for t in 0..4u64 {
            let idx_c = Arc::clone(&idx);
            handles.push(thread::spawn(move || {
                for i in 0..100u64 {
                    idx_c.register(handle(t, t * 1000 + i, i * 10, i * 10 + 9));
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(idx.len(), 400);
        // At ts=45 every thread's chunk with i=4 contains it (40..49)
        let hits = idx.chunks_at_timestamp(45);
        assert_eq!(hits.len(), 4);
    }
649}