reddb_server/storage/timeseries/temporal_index.rs
1//! Global temporal index over timeseries chunks.
2//!
3//! Today each [`super::chunk::TimeSeriesChunk`] tracks its own
4//! `min_timestamp` / `max_timestamp`, but there is no structure above the
5//! chunk set that answers "which chunks overlap `[start, end]`?" efficiently.
6//! Callers fall back to a linear scan over every chunk.
7//!
8//! This module introduces [`TemporalIndex`] — a `BTreeMap` keyed by each
9//! chunk's `min_ts`, paired with a bloom filter of registered chunk start
10//! timestamps. It powers:
11//!
12//! - `chunks_overlapping(start, end)` → O(log n + k) interval probe
13//! - `chunks_at_timestamp(ts)` → point lookup
14//! - cross-structure **temporal join** (tables ↔ timeseries) once the
15//! planner can ask "give me chunks around this row's event_ts"
16//!
17//! The index stores opaque [`ChunkHandle`]s — callers decide what `chunk_id`
18//! and `series_id` mean. That keeps this module independent of the chunk
19//! lifecycle (growing/sealed/flushed) and the on-disk layout.
20//!
21//! Implements [`crate::storage::index::IndexBase`] and exposes a bloom via
22//! [`crate::storage::index::HasBloom`] so the query planner and segment
23//! layer can prune uniformly.
24
25use std::collections::{BTreeMap, HashSet};
26use std::ops::Bound;
27use std::sync::atomic::{AtomicU64, Ordering};
28
29use crate::storage::index::{BloomSegment, HasBloom, IndexBase, IndexKind, IndexStats};
30
31/// Number of chunks grouped into a single BRIN block range.
32/// Coarser granularity = fewer range entries, faster pre-filter at the cost
33/// of reduced precision (may read slightly more chunks than necessary).
34/// Equivalent to PostgreSQL's `pages_per_range` parameter.
35const BRIN_CHUNKS_PER_RANGE: usize = 128;
36
37/// A BRIN block range: summarises min/max timestamp across up to
38/// `BRIN_CHUNKS_PER_RANGE` consecutive chunks. Query planning skips an
39/// entire range in O(1) when the range's `[min_ts, max_ts]` does not
40/// intersect the query window — identical to PostgreSQL's BRIN MINMAX scan.
41#[derive(Debug, Clone, Copy)]
42struct BrinRange {
43 /// Minimum timestamp across all chunks in this range.
44 min_ts: u64,
45 /// Maximum timestamp across all chunks in this range.
46 max_ts: u64,
47 /// How many chunk handles fall in this range (≤ BRIN_CHUNKS_PER_RANGE).
48 chunk_count: usize,
49}
50
51impl BrinRange {
52 #[inline]
53 fn overlaps(&self, start: u64, end: u64) -> bool {
54 self.max_ts >= start && self.min_ts <= end
55 }
56}
57
58/// Opaque handle describing a chunk the index tracks.
59///
60/// Callers are free to interpret `series_id`/`chunk_id`. Only `min_ts` and
61/// `max_ts` are consulted for query planning.
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
63pub struct ChunkHandle {
64 /// Opaque identifier for the series (metric + tag combination hash, for
65 /// example). Passed through unchanged.
66 pub series_id: u64,
67 /// Opaque identifier for the chunk within its series.
68 pub chunk_id: u64,
69 /// Earliest timestamp present in the chunk, inclusive.
70 pub min_ts: u64,
71 /// Latest timestamp present in the chunk, inclusive.
72 pub max_ts: u64,
73}
74
75impl ChunkHandle {
76 /// Returns true iff the handle's `[min_ts, max_ts]` interval intersects
77 /// `[start, end]` (all inclusive).
78 #[inline]
79 pub fn overlaps(&self, start: u64, end: u64) -> bool {
80 self.max_ts >= start && self.min_ts <= end
81 }
82
83 /// Returns true iff `ts` falls within the handle's interval.
84 #[inline]
85 pub fn contains(&self, ts: u64) -> bool {
86 ts >= self.min_ts && ts <= self.max_ts
87 }
88}
89
90/// Internal state grouped under a single lock so `register()` takes one
91/// write-lock instead of three. BTree, BRIN ranges, count, and correlation
92/// stats are always updated together — splitting them only added contention.
93struct IndexState {
94 /// BTree keyed by `min_ts`. Multiple chunks may share the same `min_ts`,
95 /// so each key maps to a `Vec`.
96 entries: BTreeMap<u64, Vec<ChunkHandle>>,
97 /// BRIN block-range summaries. Last entry is the "open" range being
98 /// filled; earlier entries are sealed with `BRIN_CHUNKS_PER_RANGE` chunks.
99 brin_ranges: Vec<BrinRange>,
100 /// Number of registered handles. Decremented on `unregister`.
101 count: usize,
102 /// Number of monotonic insertions: `handle.min_ts >= prev_max_min_ts`.
103 /// Used to compute `index_correlation` — PostgreSQL's BRIN planner cost
104 /// model relies on this to decide whether BRIN is worth using.
105 monotonic_inserts: u64,
106 /// Total `register()` calls — denominator for correlation.
107 total_inserts: u64,
108 /// Highest `min_ts` seen so far (for monotonic check on next insert).
109 last_max_min_ts: u64,
110}
111
112/// Temporal BTree index keyed by `min_ts`, with a BRIN block-range layer.
113///
114/// Two-level structure mirrors PostgreSQL's BRIN architecture:
115///
116/// **Level 1 — BRIN block ranges**: each entry summarises the min/max
117/// timestamps of up to `BRIN_CHUNKS_PER_RANGE` consecutive registered
118/// chunks. A query first scans this tiny array (O(R/N) where R = chunks,
119/// N = BRIN_CHUNKS_PER_RANGE) and skips entire blocks that cannot intersect
120/// the query window.
121///
122/// **Level 2 — BTree**: keyed by `min_ts`, provides the precise
123/// O(log n + k) probe for surviving blocks.
124///
125/// **Locking strategy:** the BTree, BRIN ranges, count, and correlation
126/// stats live under a single `IndexState` lock so `register()` takes one
127/// write-lock per insert. Bloom has its own lock (separable write pattern),
128/// `global_max` is `AtomicU64` so unbounded range queries never block.
129pub struct TemporalIndex {
130 state: parking_lot::RwLock<IndexState>,
131 /// Bloom filter over registered `min_ts` values. Cheap negative check
132 /// before touching the BTree.
133 bloom: parking_lot::RwLock<BloomSegment>,
134 /// Highest `max_ts` seen so far. Atomic — lock-free for unbounded range
135 /// queries ("everything after T").
136 global_max: AtomicU64,
137}
138
139impl TemporalIndex {
140 /// Create an empty index sized for `expected_chunks` entries.
141 pub fn new(expected_chunks: usize) -> Self {
142 Self {
143 state: parking_lot::RwLock::new(IndexState {
144 entries: BTreeMap::new(),
145 brin_ranges: Vec::new(),
146 count: 0,
147 monotonic_inserts: 0,
148 total_inserts: 0,
149 last_max_min_ts: 0,
150 }),
151 bloom: parking_lot::RwLock::new(BloomSegment::with_capacity(expected_chunks.max(1024))),
152 global_max: AtomicU64::new(0),
153 }
154 }
155
156 /// Register a chunk handle. Safe to call from multiple threads.
157 ///
158 /// Single state write-lock + bloom lock + atomic CAS for `global_max`.
159 /// Updates BTree, BRIN block-range summary, count, and correlation
160 /// tracking atomically.
161 pub fn register(&self, handle: ChunkHandle) {
162 // Single state write-lock — covers BTree, BRIN ranges, count, correlation.
163 {
164 let mut s = self.state.write();
165 s.entries.entry(handle.min_ts).or_default().push(handle);
166 s.count += 1;
167
168 // Correlation tracking: monotonic if min_ts didn't go backwards.
169 s.total_inserts += 1;
170 if handle.min_ts >= s.last_max_min_ts {
171 s.monotonic_inserts += 1;
172 }
173 if handle.min_ts > s.last_max_min_ts {
174 s.last_max_min_ts = handle.min_ts;
175 }
176
177 // BRIN block-range update: widen open range, seal when full.
178 if let Some(last) = s.brin_ranges.last_mut() {
179 if handle.min_ts < last.min_ts {
180 last.min_ts = handle.min_ts;
181 }
182 if handle.max_ts > last.max_ts {
183 last.max_ts = handle.max_ts;
184 }
185 last.chunk_count += 1;
186 if last.chunk_count >= BRIN_CHUNKS_PER_RANGE {
187 s.brin_ranges.push(BrinRange {
188 min_ts: u64::MAX,
189 max_ts: 0,
190 chunk_count: 0,
191 });
192 }
193 } else {
194 s.brin_ranges.push(BrinRange {
195 min_ts: handle.min_ts,
196 max_ts: handle.max_ts,
197 chunk_count: 1,
198 });
199 }
200 }
201
202 // Bloom: separate lock (different read/write pattern).
203 self.bloom.write().insert(&handle.min_ts.to_le_bytes());
204
205 // Global max via atomic CAS — lock-free for unbounded range queries.
206 let mut current = self.global_max.load(Ordering::Relaxed);
207 while handle.max_ts > current {
208 match self.global_max.compare_exchange_weak(
209 current,
210 handle.max_ts,
211 Ordering::Release,
212 Ordering::Relaxed,
213 ) {
214 Ok(_) => break,
215 Err(actual) => current = actual,
216 }
217 }
218 }
219
220 /// Forget every handle with the given `chunk_id`. Does not touch the
221 /// bloom (bloom filters don't support removal — stale positives cost
222 /// at most an extra BTree probe that finds no match).
223 ///
224 /// BRIN ranges are NOT reconstructed on unregister (no desummarization,
225 /// same as PostgreSQL). Ranges may become slightly over-wide after
226 /// removals — acceptable, false positives only add a cheap BTree probe.
227 pub fn unregister(&self, chunk_id: u64) -> usize {
228 let mut removed = 0usize;
229 let mut s = self.state.write();
230 s.entries.retain(|_, handles| {
231 let before = handles.len();
232 handles.retain(|h| h.chunk_id != chunk_id);
233 removed += before - handles.len();
234 !handles.is_empty()
235 });
236 if removed > 0 {
237 s.count = s.count.saturating_sub(removed);
238 }
239 removed
240 }
241
242 /// Return every handle whose interval overlaps `[start, end]` (inclusive).
243 ///
244 /// Two-phase scan — mirrors PostgreSQL's BRIN scan path:
245 ///
246 /// **Phase 1 (BRIN pre-filter):** iterate the coarse block-range array.
247 /// Skip any block whose `[min_ts, max_ts]` does not intersect `[start, end]`.
248 /// For append-only workloads this prunes the majority of blocks in O(R/N).
249 ///
250 /// **Phase 2 (BTree probe):** for each surviving block, scan BTree keys
251 /// in `[range.min_ts, min(range.max_ts, end)]` and verify `max_ts >= start`.
252 ///
253 /// **Dedup:** out-of-order inserts can produce overlapping BRIN ranges
254 /// where a BTree entry falls under two surviving windows. We dedup by
255 /// `(series_id, chunk_id)` so callers never see duplicates.
256 pub fn chunks_overlapping(&self, start: u64, end: u64) -> Vec<ChunkHandle> {
257 if start > end {
258 return Vec::new();
259 }
260
261 let s = self.state.read();
262 let mut out = Vec::new();
263
264 if s.brin_ranges.is_empty() {
265 // No BRIN ranges yet — plain BTree scan (startup / low-volume path).
266 for (_, handles) in s.entries.range((Bound::Unbounded, Bound::Included(end))) {
267 for h in handles {
268 if h.max_ts >= start {
269 out.push(*h);
270 }
271 }
272 }
273 return out;
274 }
275
276 // Phase 1: collect surviving block windows.
277 let mut surviving_windows: Vec<(u64, u64)> = Vec::new();
278 for r in s.brin_ranges.iter() {
279 if r.chunk_count == 0 {
280 continue;
281 }
282 if r.overlaps(start, end) {
283 surviving_windows.push((r.min_ts, r.max_ts));
284 }
285 }
286
287 if surviving_windows.is_empty() {
288 return Vec::new();
289 }
290
291 // Phase 2: BTree probe restricted to surviving windows + dedup.
292 // Out-of-order inserts can cause overlapping BRIN ranges → same
293 // BTree entry hit by multiple probes. Dedup by (series_id, chunk_id).
294 let mut seen: HashSet<(u64, u64)> = HashSet::new();
295 for (win_min, win_max) in surviving_windows {
296 let probe_end = win_max.min(end);
297 for (_, handles) in s
298 .entries
299 .range((Bound::Included(win_min), Bound::Included(probe_end)))
300 {
301 for h in handles {
302 if h.max_ts >= start && seen.insert((h.series_id, h.chunk_id)) {
303 out.push(*h);
304 }
305 }
306 }
307 }
308 out
309 }
310
311 /// Return every handle whose interval contains `ts`.
312 pub fn chunks_at_timestamp(&self, ts: u64) -> Vec<ChunkHandle> {
313 self.chunks_overlapping(ts, ts)
314 }
315
316 /// Bloom-backed fast path: returns `false` iff no chunk with
317 /// `min_ts == ts` has ever been registered. Useful for dedup checks.
318 pub fn min_ts_possibly_registered(&self, ts: u64) -> bool {
319 self.bloom.read().contains(&ts.to_le_bytes())
320 }
321
322 /// Number of registered chunks.
323 pub fn len(&self) -> usize {
324 self.state.read().count
325 }
326
327 /// Is the index empty?
328 pub fn is_empty(&self) -> bool {
329 self.len() == 0
330 }
331
332 /// Highest `max_ts` seen so far. Cheap upper bound for unbounded range
333 /// queries ("everything after T"). Lock-free atomic load.
334 pub fn global_max_timestamp(&self) -> u64 {
335 self.global_max.load(Ordering::Acquire)
336 }
337
338 /// Number of BRIN block ranges (useful for diagnostics / EXPLAIN output).
339 pub fn brin_range_count(&self) -> usize {
340 self.state
341 .read()
342 .brin_ranges
343 .iter()
344 .filter(|r| r.chunk_count > 0)
345 .count()
346 }
347
348 /// Empirical correlation between insertion order and `min_ts` order,
349 /// in `[0.0, 1.0]`. PostgreSQL's planner uses this to decide whether
350 /// BRIN is worth using — values near 1.0 mean append-only monotonic
351 /// inserts (BRIN very effective), values near 0.0 mean random inserts
352 /// (BRIN degrades to a full scan and should be skipped).
353 ///
354 /// Returns `1.0` for an empty index (matches the historical hardcoded
355 /// optimistic default).
356 pub fn index_correlation(&self) -> f64 {
357 let s = self.state.read();
358 if s.total_inserts == 0 {
359 1.0
360 } else {
361 s.monotonic_inserts as f64 / s.total_inserts as f64
362 }
363 }
364
365 /// Reset the index. Used by tests and deserialize paths.
366 pub fn clear(&self) {
367 let mut s = self.state.write();
368 s.entries.clear();
369 s.brin_ranges.clear();
370 s.count = 0;
371 s.monotonic_inserts = 0;
372 s.total_inserts = 0;
373 s.last_max_min_ts = 0;
374 drop(s);
375 *self.bloom.write() = BloomSegment::with_capacity(1024);
376 self.global_max.store(0, Ordering::Release);
377 }
378}
379
380impl Default for TemporalIndex {
381 fn default() -> Self {
382 Self::new(1024)
383 }
384}
385
386impl HasBloom for TemporalIndex {
387 fn bloom_segment(&self) -> Option<&BloomSegment> {
388 // parking_lot RwLock still precludes handing out a raw reference.
389 None
390 }
391
392 fn definitely_absent(&self, key: &[u8]) -> bool {
393 self.bloom.read().definitely_absent(key)
394 }
395}
396
397impl IndexBase for TemporalIndex {
398 fn name(&self) -> &str {
399 "timeseries.temporal"
400 }
401
402 fn kind(&self) -> IndexKind {
403 IndexKind::Temporal
404 }
405
406 fn stats(&self) -> IndexStats {
407 let s = self.state.read();
408 let entries = s.count;
409 let distinct_keys = s.entries.len();
410 let brin_ranges = s.brin_ranges.iter().filter(|r| r.chunk_count > 0).count();
411 let correlation = if s.total_inserts == 0 {
412 1.0
413 } else {
414 s.monotonic_inserts as f64 / s.total_inserts as f64
415 };
416 IndexStats {
417 entries,
418 distinct_keys,
419 // Each BRIN range: 24 bytes (min_ts u64 + max_ts u64 + count usize).
420 // Each BTree entry: ~48 bytes (key u64 + Vec header + pointer).
421 approx_bytes: brin_ranges * 24 + distinct_keys * 48,
422 kind: IndexKind::Temporal,
423 has_bloom: true,
424 index_correlation: correlation,
425 }
426 }
427
428 fn definitely_absent(&self, key_bytes: &[u8]) -> bool {
429 <Self as HasBloom>::definitely_absent(self, key_bytes)
430 }
431}
432
433#[cfg(test)]
434mod tests {
435 use super::*;
436
437 fn handle(series: u64, chunk: u64, min_ts: u64, max_ts: u64) -> ChunkHandle {
438 ChunkHandle {
439 series_id: series,
440 chunk_id: chunk,
441 min_ts,
442 max_ts,
443 }
444 }
445
446 #[test]
447 fn overlaps_helper() {
448 let h = handle(1, 1, 100, 200);
449 assert!(h.overlaps(50, 150));
450 assert!(h.overlaps(150, 250));
451 assert!(h.overlaps(100, 200));
452 assert!(h.overlaps(120, 130));
453 assert!(!h.overlaps(0, 99));
454 assert!(!h.overlaps(201, 300));
455 assert!(h.contains(100));
456 assert!(h.contains(200));
457 assert!(!h.contains(201));
458 }
459
460 #[test]
461 fn register_and_overlap_query() {
462 let idx = TemporalIndex::new(16);
463 idx.register(handle(1, 1, 0, 100));
464 idx.register(handle(1, 2, 100, 200));
465 idx.register(handle(1, 3, 200, 300));
466 idx.register(handle(2, 4, 50, 150));
467
468 // Window [120, 180] overlaps chunk 2 (100..200) AND chunk 4
469 // (50..150). Chunk 4's max_ts=150 >= 120, so it's a real hit.
470 let hits = idx.chunks_overlapping(120, 180);
471 let hit_ids: Vec<u64> = hits.iter().map(|h| h.chunk_id).collect();
472 assert_eq!(hits.len(), 2);
473 assert!(hit_ids.contains(&2));
474 assert!(hit_ids.contains(&4));
475
476 // Window spanning multiple chunks
477 let hits = idx.chunks_overlapping(80, 220);
478 let ids: Vec<u64> = hits.iter().map(|h| h.chunk_id).collect();
479 assert!(ids.contains(&1));
480 assert!(ids.contains(&2));
481 assert!(ids.contains(&3));
482 assert!(ids.contains(&4));
483
484 // Window outside everything
485 assert!(idx.chunks_overlapping(500, 600).is_empty());
486 }
487
488 #[test]
489 fn point_lookup() {
490 let idx = TemporalIndex::new(16);
491 idx.register(handle(1, 1, 1000, 2000));
492 idx.register(handle(2, 2, 1500, 3000));
493
494 let at_1800 = idx.chunks_at_timestamp(1800);
495 assert_eq!(at_1800.len(), 2);
496
497 let at_2500 = idx.chunks_at_timestamp(2500);
498 assert_eq!(at_2500.len(), 1);
499 assert_eq!(at_2500[0].chunk_id, 2);
500
501 assert!(idx.chunks_at_timestamp(9999).is_empty());
502 }
503
504 #[test]
505 fn unregister_removes_handles() {
506 let idx = TemporalIndex::new(16);
507 idx.register(handle(1, 10, 0, 100));
508 idx.register(handle(1, 11, 100, 200));
509 assert_eq!(idx.len(), 2);
510
511 let removed = idx.unregister(10);
512 assert_eq!(removed, 1);
513 assert_eq!(idx.len(), 1);
514 assert!(idx.chunks_at_timestamp(50).is_empty());
515 assert_eq!(idx.chunks_at_timestamp(150).len(), 1);
516 }
517
518 #[test]
519 fn bloom_guards_min_ts_lookup() {
520 let idx = TemporalIndex::new(16);
521 idx.register(handle(1, 1, 5000, 6000));
522 assert!(idx.min_ts_possibly_registered(5000));
523 // Bloom may false-positive but the BTree lookup returns nothing.
524 assert!(idx.chunks_at_timestamp(999_999).is_empty());
525 }
526
527 #[test]
528 fn global_max_tracks_highest() {
529 let idx = TemporalIndex::new(16);
530 idx.register(handle(1, 1, 0, 100));
531 idx.register(handle(1, 2, 200, 500));
532 idx.register(handle(1, 3, 100, 300));
533 assert_eq!(idx.global_max_timestamp(), 500);
534 }
535
536 #[test]
537 fn stats_reflect_registrations() {
538 let idx = TemporalIndex::new(16);
539 idx.register(handle(1, 1, 0, 10));
540 idx.register(handle(1, 2, 0, 20));
541 idx.register(handle(1, 3, 100, 200));
542 let s = idx.stats();
543 assert_eq!(s.entries, 3);
544 // Two distinct min_ts keys: 0 and 100
545 assert_eq!(s.distinct_keys, 2);
546 assert_eq!(s.kind, IndexKind::Temporal);
547 assert!(s.has_bloom);
548 }
549
550 #[test]
551 fn clear_resets() {
552 let idx = TemporalIndex::new(16);
553 idx.register(handle(1, 1, 0, 100));
554 idx.clear();
555 assert!(idx.is_empty());
556 assert_eq!(idx.global_max_timestamp(), 0);
557 assert!(idx.chunks_at_timestamp(50).is_empty());
558 }
559
560 #[test]
561 fn reversed_range_returns_empty() {
562 let idx = TemporalIndex::new(16);
563 idx.register(handle(1, 1, 100, 200));
564 assert!(idx.chunks_overlapping(500, 100).is_empty());
565 }
566
567 #[test]
568 fn dedup_overlapping_brin_windows() {
569 // Out-of-order inserts that produce two BRIN windows whose [min,max]
570 // overlap. Without dedup the same chunk would be returned twice.
571 let idx = TemporalIndex::new(16);
572 // Force boundary at 128 chunks: register 130 chunks where the 129th
573 // has out-of-order min_ts that overlaps the previous range.
574 for i in 0..128u64 {
575 idx.register(handle(1, i, 1000 + i, 1000 + i + 5));
576 }
577 // Range 1 is sealed at [1000, 1132]. Range 2 opens.
578 idx.register(handle(1, 200, 1050, 1300)); // out-of-order, in range 2
579 idx.register(handle(1, 201, 1100, 1400));
580
581 // Query overlaps both ranges; chunk 200 (min_ts=1050) lives in range 2,
582 // but range 1 also covers min_ts=1050. Probes for both windows would
583 // return it without dedup.
584 let hits = idx.chunks_overlapping(1100, 1200);
585 let unique: HashSet<_> = hits.iter().map(|h| h.chunk_id).collect();
586 assert_eq!(hits.len(), unique.len(), "duplicates leaked through");
587 }
588
589 #[test]
590 fn correlation_starts_optimistic_then_tracks_inserts() {
591 let idx = TemporalIndex::new(16);
592 assert_eq!(idx.index_correlation(), 1.0); // empty default
593
594 // Pure monotonic = 1.0
595 for i in 0..10u64 {
596 idx.register(handle(1, i, i * 100, i * 100 + 50));
597 }
598 assert!((idx.index_correlation() - 1.0).abs() < 1e-9);
599
600 // Backfill an out-of-order chunk → drop below 1.0.
601 idx.register(handle(1, 99, 50, 100));
602 let c = idx.index_correlation();
603 assert!(c < 1.0 && c > 0.0, "correlation = {c}");
604 }
605
606 #[test]
607 fn brin_block_seal_boundary() {
608 // Verify the chunk at exactly BRIN_CHUNKS_PER_RANGE seals and the
609 // next register opens a fresh range correctly initialized.
610 let idx = TemporalIndex::new(BRIN_CHUNKS_PER_RANGE * 2);
611 for i in 0..BRIN_CHUNKS_PER_RANGE as u64 {
612 idx.register(handle(1, i, i * 10, i * 10 + 5));
613 }
614 assert_eq!(idx.brin_range_count(), 1);
615
616 // Next register opens range 2.
617 idx.register(handle(1, 999, 99_999, 100_000));
618 assert_eq!(idx.brin_range_count(), 2);
619
620 // Range 2 must be initialized with the new chunk's bounds (not u64::MAX/0).
621 let hits = idx.chunks_overlapping(99_999, 100_000);
622 assert_eq!(hits.len(), 1);
623 assert_eq!(hits[0].chunk_id, 999);
624 }
625
626 #[test]
627 fn concurrent_register() {
628 use std::sync::Arc;
629 use std::thread;
630
631 let idx = Arc::new(TemporalIndex::new(1024));
632 let mut handles = vec![];
633 for t in 0..4u64 {
634 let idx_c = Arc::clone(&idx);
635 handles.push(thread::spawn(move || {
636 for i in 0..100u64 {
637 idx_c.register(handle(t, t * 1000 + i, i * 10, i * 10 + 9));
638 }
639 }));
640 }
641 for h in handles {
642 h.join().unwrap();
643 }
644 assert_eq!(idx.len(), 400);
645 // At ts=45 every thread's chunk with i=4 contains it (40..49)
646 let hits = idx.chunks_at_timestamp(45);
647 assert_eq!(hits.len(), 4);
648 }
649}