use std::collections::{BTreeMap, HashSet};
use std::ops::Bound;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::index::{BloomSegment, HasBloom, IndexBase, IndexKind, IndexStats};
/// How many registrations one BRIN range summarises before it is sealed and a
/// fresh range is started.
const BRIN_CHUNKS_PER_RANGE: usize = 128;

/// Block-range summary over a run of consecutively registered chunks.
///
/// Records the smallest `min_ts`, the largest `max_ts` and the number of chunks
/// folded in. A range with `chunk_count == 0` is an open placeholder that has
/// not received any chunk yet.
#[derive(Debug, Clone, Copy)]
struct BrinRange {
    min_ts: u64,
    max_ts: u64,
    chunk_count: usize,
}

impl BrinRange {
    /// Returns `true` when the summarised span `[min_ts, max_ts]` intersects the
    /// inclusive window `[start, end]`.
    #[inline]
    fn overlaps(&self, start: u64, end: u64) -> bool {
        // Disjoint exactly when the span ends before the window starts or
        // begins after the window ends.
        !(self.max_ts < start || self.min_ts > end)
    }
}
/// Identifies one stored chunk (by series and chunk id) together with the
/// inclusive timestamp span `[min_ts, max_ts]` it covers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChunkHandle {
    pub series_id: u64,
    pub chunk_id: u64,
    pub min_ts: u64,
    pub max_ts: u64,
}

impl ChunkHandle {
    /// Returns `true` if `[min_ts, max_ts]` shares at least one timestamp with
    /// the inclusive window `[start, end]`.
    #[inline]
    pub fn overlaps(&self, start: u64, end: u64) -> bool {
        let starts_after_window = self.min_ts > end;
        let ends_before_window = self.max_ts < start;
        !(starts_after_window || ends_before_window)
    }

    /// Returns `true` if `ts` lies within `[min_ts, max_ts]`, both ends inclusive.
    #[inline]
    pub fn contains(&self, ts: u64) -> bool {
        (self.min_ts..=self.max_ts).contains(&ts)
    }
}
/// Mutable index contents, guarded as a single unit by `TemporalIndex::state`.
struct IndexState {
    /// Registered handles bucketed by their `min_ts`, in insertion order per key.
    entries: BTreeMap<u64, Vec<ChunkHandle>>,
    /// Number of handles currently stored across every bucket of `entries`.
    count: usize,
    /// BRIN summaries in registration order; the last element is the range
    /// that new registrations are folded into.
    brin_ranges: Vec<BrinRange>,
    /// Registrations since construction or the last clear.
    total_inserts: u64,
    /// Registrations whose `min_ts` was not below any earlier `min_ts`.
    monotonic_inserts: u64,
    /// Largest `min_ts` registered so far (0 when nothing has been registered).
    last_max_min_ts: u64,
}
pub struct TemporalIndex {
state: parking_lot::RwLock<IndexState>,
bloom: parking_lot::RwLock<BloomSegment>,
global_max: AtomicU64,
}
impl TemporalIndex {
pub fn new(expected_chunks: usize) -> Self {
Self {
state: parking_lot::RwLock::new(IndexState {
entries: BTreeMap::new(),
brin_ranges: Vec::new(),
count: 0,
monotonic_inserts: 0,
total_inserts: 0,
last_max_min_ts: 0,
}),
bloom: parking_lot::RwLock::new(BloomSegment::with_capacity(expected_chunks.max(1024))),
global_max: AtomicU64::new(0),
}
}
pub fn register(&self, handle: ChunkHandle) {
{
let mut s = self.state.write();
s.entries.entry(handle.min_ts).or_default().push(handle);
s.count += 1;
s.total_inserts += 1;
if handle.min_ts >= s.last_max_min_ts {
s.monotonic_inserts += 1;
}
if handle.min_ts > s.last_max_min_ts {
s.last_max_min_ts = handle.min_ts;
}
if let Some(last) = s.brin_ranges.last_mut() {
if handle.min_ts < last.min_ts {
last.min_ts = handle.min_ts;
}
if handle.max_ts > last.max_ts {
last.max_ts = handle.max_ts;
}
last.chunk_count += 1;
if last.chunk_count >= BRIN_CHUNKS_PER_RANGE {
s.brin_ranges.push(BrinRange {
min_ts: u64::MAX,
max_ts: 0,
chunk_count: 0,
});
}
} else {
s.brin_ranges.push(BrinRange {
min_ts: handle.min_ts,
max_ts: handle.max_ts,
chunk_count: 1,
});
}
}
self.bloom.write().insert(&handle.min_ts.to_le_bytes());
let mut current = self.global_max.load(Ordering::Relaxed);
while handle.max_ts > current {
match self.global_max.compare_exchange_weak(
current,
handle.max_ts,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
pub fn unregister(&self, chunk_id: u64) -> usize {
let mut removed = 0usize;
let mut s = self.state.write();
s.entries.retain(|_, handles| {
let before = handles.len();
handles.retain(|h| h.chunk_id != chunk_id);
removed += before - handles.len();
!handles.is_empty()
});
if removed > 0 {
s.count = s.count.saturating_sub(removed);
}
removed
}
pub fn chunks_overlapping(&self, start: u64, end: u64) -> Vec<ChunkHandle> {
if start > end {
return Vec::new();
}
let s = self.state.read();
let mut out = Vec::new();
if s.brin_ranges.is_empty() {
for (_, handles) in s.entries.range((Bound::Unbounded, Bound::Included(end))) {
for h in handles {
if h.max_ts >= start {
out.push(*h);
}
}
}
return out;
}
let mut surviving_windows: Vec<(u64, u64)> = Vec::new();
for r in s.brin_ranges.iter() {
if r.chunk_count == 0 {
continue;
}
if r.overlaps(start, end) {
surviving_windows.push((r.min_ts, r.max_ts));
}
}
if surviving_windows.is_empty() {
return Vec::new();
}
let mut seen: HashSet<(u64, u64)> = HashSet::new();
for (win_min, win_max) in surviving_windows {
let probe_end = win_max.min(end);
for (_, handles) in s
.entries
.range((Bound::Included(win_min), Bound::Included(probe_end)))
{
for h in handles {
if h.max_ts >= start && seen.insert((h.series_id, h.chunk_id)) {
out.push(*h);
}
}
}
}
out
}
pub fn chunks_at_timestamp(&self, ts: u64) -> Vec<ChunkHandle> {
self.chunks_overlapping(ts, ts)
}
pub fn min_ts_possibly_registered(&self, ts: u64) -> bool {
self.bloom.read().contains(&ts.to_le_bytes())
}
pub fn len(&self) -> usize {
self.state.read().count
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn global_max_timestamp(&self) -> u64 {
self.global_max.load(Ordering::Acquire)
}
pub fn brin_range_count(&self) -> usize {
self.state
.read()
.brin_ranges
.iter()
.filter(|r| r.chunk_count > 0)
.count()
}
pub fn index_correlation(&self) -> f64 {
let s = self.state.read();
if s.total_inserts == 0 {
1.0
} else {
s.monotonic_inserts as f64 / s.total_inserts as f64
}
}
pub fn clear(&self) {
let mut s = self.state.write();
s.entries.clear();
s.brin_ranges.clear();
s.count = 0;
s.monotonic_inserts = 0;
s.total_inserts = 0;
s.last_max_min_ts = 0;
drop(s);
*self.bloom.write() = BloomSegment::with_capacity(1024);
self.global_max.store(0, Ordering::Release);
}
}
impl Default for TemporalIndex {
fn default() -> Self {
Self::new(1024)
}
}
impl HasBloom for TemporalIndex {
    /// Always `None`.
    ///
    /// The filter lives behind an `RwLock`, so a plain `&BloomSegment` cannot be
    /// returned from `&self` without keeping a lock guard alive. Callers should
    /// use `definitely_absent` instead.
    fn bloom_segment(&self) -> Option<&BloomSegment> {
        None
    }

    /// Delegates to `definitely_absent` on the filter while holding a read lock.
    fn definitely_absent(&self, key: &[u8]) -> bool {
        let filter = self.bloom.read();
        filter.definitely_absent(key)
    }
}
impl IndexBase for TemporalIndex {
fn name(&self) -> &str {
"timeseries.temporal"
}
fn kind(&self) -> IndexKind {
IndexKind::Temporal
}
fn stats(&self) -> IndexStats {
let s = self.state.read();
let entries = s.count;
let distinct_keys = s.entries.len();
let brin_ranges = s.brin_ranges.iter().filter(|r| r.chunk_count > 0).count();
let correlation = if s.total_inserts == 0 {
1.0
} else {
s.monotonic_inserts as f64 / s.total_inserts as f64
};
IndexStats {
entries,
distinct_keys,
approx_bytes: brin_ranges * 24 + distinct_keys * 48,
kind: IndexKind::Temporal,
has_bloom: true,
index_correlation: correlation,
}
}
fn definitely_absent(&self, key_bytes: &[u8]) -> bool {
<Self as HasBloom>::definitely_absent(self, key_bytes)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for a `ChunkHandle`.
    fn handle(series: u64, chunk: u64, min_ts: u64, max_ts: u64) -> ChunkHandle {
        ChunkHandle {
            series_id: series,
            chunk_id: chunk,
            min_ts,
            max_ts,
        }
    }

    /// Chunk ids of a query result, in result order.
    fn ids_of(hits: &[ChunkHandle]) -> Vec<u64> {
        hits.iter().map(|h| h.chunk_id).collect()
    }

    #[test]
    fn overlaps_helper() {
        let h = handle(1, 1, 100, 200);
        for (lo, hi) in [(50, 150), (150, 250), (100, 200), (120, 130)] {
            assert!(h.overlaps(lo, hi));
        }
        for (lo, hi) in [(0, 99), (201, 300)] {
            assert!(!h.overlaps(lo, hi));
        }
        assert!(h.contains(100));
        assert!(h.contains(200));
        assert!(!h.contains(201));
    }

    #[test]
    fn register_and_overlap_query() {
        let idx = TemporalIndex::new(16);
        for h in [
            handle(1, 1, 0, 100),
            handle(1, 2, 100, 200),
            handle(1, 3, 200, 300),
            handle(2, 4, 50, 150),
        ] {
            idx.register(h);
        }

        let narrow = idx.chunks_overlapping(120, 180);
        let narrow_ids = ids_of(&narrow);
        assert_eq!(narrow.len(), 2);
        assert!(narrow_ids.contains(&2));
        assert!(narrow_ids.contains(&4));

        let wide_ids = ids_of(&idx.chunks_overlapping(80, 220));
        for id in [1, 2, 3, 4] {
            assert!(wide_ids.contains(&id));
        }

        assert!(idx.chunks_overlapping(500, 600).is_empty());
    }

    #[test]
    fn point_lookup() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 1000, 2000));
        idx.register(handle(2, 2, 1500, 3000));

        assert_eq!(idx.chunks_at_timestamp(1800).len(), 2);

        let late = idx.chunks_at_timestamp(2500);
        assert_eq!(late.len(), 1);
        assert_eq!(late[0].chunk_id, 2);

        assert!(idx.chunks_at_timestamp(9999).is_empty());
    }

    #[test]
    fn unregister_removes_handles() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 10, 0, 100));
        idx.register(handle(1, 11, 100, 200));
        assert_eq!(idx.len(), 2);

        assert_eq!(idx.unregister(10), 1);
        assert_eq!(idx.len(), 1);
        assert!(idx.chunks_at_timestamp(50).is_empty());
        assert_eq!(idx.chunks_at_timestamp(150).len(), 1);
    }

    #[test]
    fn bloom_guards_min_ts_lookup() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 5000, 6000));
        assert!(idx.min_ts_possibly_registered(5000));
        assert!(idx.chunks_at_timestamp(999_999).is_empty());
    }

    #[test]
    fn global_max_tracks_highest() {
        let idx = TemporalIndex::new(16);
        for h in [handle(1, 1, 0, 100), handle(1, 2, 200, 500), handle(1, 3, 100, 300)] {
            idx.register(h);
        }
        assert_eq!(idx.global_max_timestamp(), 500);
    }

    #[test]
    fn stats_reflect_registrations() {
        let idx = TemporalIndex::new(16);
        for h in [handle(1, 1, 0, 10), handle(1, 2, 0, 20), handle(1, 3, 100, 200)] {
            idx.register(h);
        }
        let snapshot = idx.stats();
        assert_eq!(snapshot.entries, 3);
        assert_eq!(snapshot.distinct_keys, 2);
        assert_eq!(snapshot.kind, IndexKind::Temporal);
        assert!(snapshot.has_bloom);
    }

    #[test]
    fn clear_resets() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 0, 100));
        idx.clear();
        assert!(idx.is_empty());
        assert_eq!(idx.global_max_timestamp(), 0);
        assert!(idx.chunks_at_timestamp(50).is_empty());
    }

    #[test]
    fn reversed_range_returns_empty() {
        let idx = TemporalIndex::new(16);
        idx.register(handle(1, 1, 100, 200));
        assert!(idx.chunks_overlapping(500, 100).is_empty());
    }

    #[test]
    fn dedup_overlapping_brin_windows() {
        let idx = TemporalIndex::new(16);
        // Fill exactly one BRIN range so the two wide chunks land in a second one.
        for i in 0..128u64 {
            idx.register(handle(1, i, 1000 + i, 1000 + i + 5));
        }
        idx.register(handle(1, 200, 1050, 1300));
        idx.register(handle(1, 201, 1100, 1400));

        let hits = idx.chunks_overlapping(1100, 1200);
        let unique: HashSet<u64> = hits.iter().map(|h| h.chunk_id).collect();
        assert_eq!(hits.len(), unique.len(), "duplicates leaked through");
    }

    #[test]
    fn correlation_starts_optimistic_then_tracks_inserts() {
        let idx = TemporalIndex::new(16);
        assert_eq!(idx.index_correlation(), 1.0);

        for i in 0..10u64 {
            idx.register(handle(1, i, i * 100, i * 100 + 50));
        }
        assert!((idx.index_correlation() - 1.0).abs() < 1e-9);

        // One out-of-order insert pulls the ratio strictly below 1.
        idx.register(handle(1, 99, 50, 100));
        let c = idx.index_correlation();
        assert!(c < 1.0 && c > 0.0, "correlation = {c}");
    }

    #[test]
    fn brin_block_seal_boundary() {
        let idx = TemporalIndex::new(BRIN_CHUNKS_PER_RANGE * 2);
        for i in 0..BRIN_CHUNKS_PER_RANGE as u64 {
            idx.register(handle(1, i, i * 10, i * 10 + 5));
        }
        assert_eq!(idx.brin_range_count(), 1);

        idx.register(handle(1, 999, 99_999, 100_000));
        assert_eq!(idx.brin_range_count(), 2);

        let tail = idx.chunks_overlapping(99_999, 100_000);
        assert_eq!(tail.len(), 1);
        assert_eq!(tail[0].chunk_id, 999);
    }

    #[test]
    fn concurrent_register() {
        use std::sync::Arc;
        use std::thread;

        let idx = Arc::new(TemporalIndex::new(1024));
        let workers: Vec<_> = (0..4u64)
            .map(|t| {
                let shared = Arc::clone(&idx);
                thread::spawn(move || {
                    for i in 0..100u64 {
                        shared.register(handle(t, t * 1000 + i, i * 10, i * 10 + 9));
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(idx.len(), 400);
        assert_eq!(idx.chunks_at_timestamp(45).len(), 4);
    }
}