// tsink 0.10.2
//
// A lightweight embedded time-series database with a straightforward API.
// Documentation
use std::collections::HashMap;

use roaring::RoaringTreemap;

use crate::engine::query::TieredQueryPlan;

use super::state::{PersistedChunkRef, PersistedSegmentState};
use super::{ChunkStorage, SeriesId};

/// Outcome of pruning persisted segments against a query time range.
///
/// Produced by `prune_persisted_exact_scan_candidates`; the default value is
/// the empty result (no series, no chunk refs).
#[derive(Default)]
pub(super) struct PersistedTimeRangePruneResult {
    /// Series ids for which at least one persisted chunk ref overlapped the
    /// queried time range.
    pub(super) overlapping_series_ids: RoaringTreemap,
    /// Overlapping chunk refs collected per series, appended in the order the
    /// segments were visited. A series only has an entry when at least one of
    /// its chunk refs overlapped.
    pub(super) exact_scan_chunk_refs: HashMap<SeriesId, Vec<PersistedChunkRef>>,
}

/// Returns the number of ids stored in `bitmap` as a `usize`.
///
/// The bitmap reports its length in a wider integer type; if that value does
/// not fit into `usize` the result saturates at `usize::MAX`.
fn bitmap_cardinality(bitmap: &RoaringTreemap) -> usize {
    match usize::try_from(bitmap.len()) {
        Ok(count) => count,
        Err(_) => usize::MAX,
    }
}

/// Reports whether `segment` can contribute data to a query over
/// `[start, end)` under the given tier `plan`.
///
/// A segment qualifies only when its manifest records both a minimum and a
/// maximum timestamp, that span intersects the half-open query range
/// (`start` inclusive, `end` exclusive), and the plan includes the segment's
/// tier. A segment missing either manifest bound never qualifies.
fn persisted_segment_overlaps_time_range(
    segment: &PersistedSegmentState,
    start: i64,
    end: i64,
    plan: TieredQueryPlan,
) -> bool {
    let manifest = &segment.manifest;
    let (segment_min, segment_max) = match (manifest.min_ts, manifest.max_ts) {
        (Some(min_ts), Some(max_ts)) => (min_ts, max_ts),
        _ => return false,
    };

    // Reject on the cheap timestamp comparison before consulting the plan.
    if segment_max < start || segment_min >= end {
        return false;
    }

    ChunkStorage::plan_includes_persisted_tier(plan, segment.tier)
}

fn persisted_segment_candidate_series_ids_for_time_range(
    segment: &PersistedSegmentState,
    start: i64,
    end: i64,
) -> RoaringTreemap {
    segment
        .time_bucket_postings
        .as_ref()
        .map(|time_bucket_postings| time_bucket_postings.series_ids_in_time_range(start, end))
        .unwrap_or_else(|| {
            let mut series_ids = RoaringTreemap::new();
            for series_id in segment.chunk_refs_by_series.keys().copied() {
                series_ids.insert(series_id);
            }
            series_ids
        })
}

/// Appends every chunk ref of `series_id` in `segment` that intersects
/// `[start, end)` to `exact_scan_chunk_refs`.
///
/// Returns `true` when at least one chunk ref was appended. When the series is
/// absent from the segment, or none of its chunk refs intersect the range, the
/// map is left untouched (no empty entry is created) and `false` is returned.
fn append_overlapping_persisted_chunk_refs(
    exact_scan_chunk_refs: &mut HashMap<SeriesId, Vec<PersistedChunkRef>>,
    segment: &PersistedSegmentState,
    series_id: SeriesId,
    start: i64,
    end: i64,
) -> bool {
    let Some(series_chunk_refs) = segment.chunk_refs_by_series.get(&series_id) else {
        return false;
    };

    // A chunk overlaps when it ends at or after `start` and begins before `end`.
    let mut overlapping = series_chunk_refs
        .iter()
        .filter(|candidate| candidate.max_ts >= start && candidate.min_ts < end)
        .copied()
        .peekable();

    // Peek first so a series without any match never gets a map entry.
    if overlapping.peek().is_none() {
        return false;
    }

    exact_scan_chunk_refs
        .entry(series_id)
        .or_default()
        .extend(overlapping);
    true
}

/// Narrows `series_ids` down to the series that have persisted chunk refs
/// intersecting `[start, end)` and gathers those chunk refs.
///
/// Segments are visited in iteration order. A segment is skipped when it does
/// not overlap the range or its tier is excluded by `plan`. For the remaining
/// segments, the segment's candidate series are intersected with `series_ids`
/// and each survivor's overlapping chunk refs are appended to the result.
///
/// An empty `series_ids` yields the default (empty) result without touching
/// `segments`.
pub(super) fn prune_persisted_exact_scan_candidates<'a>(
    segments: impl IntoIterator<Item = &'a PersistedSegmentState>,
    series_ids: &RoaringTreemap,
    start: i64,
    end: i64,
    plan: TieredQueryPlan,
) -> PersistedTimeRangePruneResult {
    let mut pruned = PersistedTimeRangePruneResult::default();
    if series_ids.is_empty() {
        return pruned;
    }

    let requested_series = bitmap_cardinality(series_ids);
    let segments_in_scope = segments
        .into_iter()
        .filter(|segment| persisted_segment_overlaps_time_range(segment, start, end, plan));

    for segment in segments_in_scope {
        // Restrict the segment's candidates to the series the caller asked for.
        let mut requested_in_segment =
            persisted_segment_candidate_series_ids_for_time_range(segment, start, end);
        requested_in_segment &= series_ids;
        if requested_in_segment.is_empty() {
            continue;
        }

        let mut matched_in_segment = RoaringTreemap::new();
        for series_id in requested_in_segment {
            let appended = append_overlapping_persisted_chunk_refs(
                &mut pruned.exact_scan_chunk_refs,
                segment,
                series_id,
                start,
                end,
            );
            if appended {
                matched_in_segment.insert(series_id);
            }
        }

        if !matched_in_segment.is_empty() {
            pruned.overlapping_series_ids |= matched_in_segment;

            // NOTE(review): iteration stops as soon as every requested series has
            // at least one chunk ref, so chunk refs held by later segments are
            // not collected for those series — confirm callers do not rely on
            // `exact_scan_chunk_refs` being complete.
            if pruned.exact_scan_chunk_refs.len() >= requested_series {
                break;
            }
        }
    }

    pruned
}

// Unit tests for persisted time-range pruning, built on small in-memory
// segment fixtures.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use roaring::RoaringTreemap;

    use super::super::state::PersistedSegmentTimeBucketIndex;
    use super::super::tiering::{PersistedSegmentTier, SegmentLaneFamily};
    use super::*;
    use crate::engine::chunk::{TimestampCodecId, ValueCodecId, ValueLane};
    use crate::engine::segment::{SegmentManifest, WalHighWatermark};

    /// Builds a bitmap containing exactly the given series ids.
    fn bitmap(series_ids: &[SeriesId]) -> RoaringTreemap {
        series_ids.iter().copied().collect()
    }

    /// Builds a chunk ref spanning `[min_ts, max_ts]` in `segment_slot`.
    ///
    /// All other fields are fixed placeholder values; the pruning code under
    /// test only reads the timestamps.
    fn chunk_ref(segment_slot: usize, min_ts: i64, max_ts: i64) -> PersistedChunkRef {
        PersistedChunkRef {
            level: 0,
            min_ts,
            max_ts,
            point_count: 1,
            sequence: 0,
            chunk_offset: 0,
            chunk_len: 0,
            lane: ValueLane::Numeric,
            ts_codec: TimestampCodecId::DeltaVarint,
            value_codec: ValueCodecId::GorillaXorF64,
            segment_slot,
        }
    }

    /// Builds a numeric-lane segment fixture.
    ///
    /// `min_ts`/`max_ts` populate the manifest bounds. When
    /// `time_bucket_members` is `Some`, a time-bucket index is created over the
    /// same bounds and each `(series_id, min_ts, max_ts)` tuple is inserted into
    /// it; when `None`, the segment has no time-bucket index.
    fn segment(
        slot: usize,
        tier: PersistedSegmentTier,
        min_ts: i64,
        max_ts: i64,
        chunk_refs_by_series: HashMap<SeriesId, Vec<PersistedChunkRef>>,
        time_bucket_members: Option<&[(SeriesId, i64, i64)]>,
    ) -> PersistedSegmentState {
        let time_bucket_postings = time_bucket_members.map(|members| {
            let mut index = PersistedSegmentTimeBucketIndex::new(min_ts, max_ts);
            for (series_id, series_min_ts, series_max_ts) in members {
                index.insert_series_range(*series_id, *series_min_ts, *series_max_ts);
            }
            index
        });

        PersistedSegmentState {
            segment_slot: slot,
            lane: SegmentLaneFamily::Numeric,
            tier,
            manifest: SegmentManifest {
                segment_id: slot as u64,
                level: 0,
                // Derived from the fixture's chunk map so the counts stay consistent.
                chunk_count: chunk_refs_by_series.values().map(Vec::len).sum(),
                point_count: 0,
                series_count: chunk_refs_by_series.len(),
                min_ts: Some(min_ts),
                max_ts: Some(max_ts),
                wal_highwater: WalHighWatermark::default(),
            },
            time_bucket_postings,
            series_time_summaries: HashMap::new(),
            chunk_refs_by_series,
        }
    }

    /// A series present in the segment (and its time-bucket index) but absent
    /// from the candidate bitmap must not appear in either part of the result.
    #[test]
    fn prune_result_stays_bounded_to_candidate_series_ids() {
        let series_id = 7;
        let spill_series_id = 9;
        let segment = segment(
            1,
            PersistedSegmentTier::Hot,
            0,
            100,
            HashMap::from([
                (series_id, vec![chunk_ref(1, 40, 50)]),
                (spill_series_id, vec![chunk_ref(1, 40, 50)]),
            ]),
            Some(&[(series_id, 40, 50), (spill_series_id, 40, 50)]),
        );
        // Only `series_id` is requested; `spill_series_id` overlaps but is not a candidate.
        let candidates = bitmap(&[series_id]);

        let result = prune_persisted_exact_scan_candidates(
            [&segment],
            &candidates,
            45,
            46,
            TieredQueryPlan::from_cutoffs(45, 46, None, None),
        );

        assert_eq!(
            result.overlapping_series_ids.iter().collect::<Vec<_>>(),
            vec![series_id],
        );
        assert_eq!(
            result
                .exact_scan_chunk_refs
                .keys()
                .copied()
                .collect::<Vec<_>>(),
            vec![series_id],
        );
    }

    /// Both segments overlap the query range, but the assertions expect only
    /// the hot segment's series to survive pruning.
    #[test]
    fn prune_skips_segments_outside_the_query_tier_plan() {
        let hot_series = 1;
        let cold_series = 2;
        let hot_segment = segment(
            1,
            PersistedSegmentTier::Hot,
            0,
            100,
            HashMap::from([(hot_series, vec![chunk_ref(1, 40, 50)])]),
            Some(&[(hot_series, 40, 50)]),
        );
        let cold_segment = segment(
            2,
            PersistedSegmentTier::Cold,
            0,
            100,
            HashMap::from([(cold_series, vec![chunk_ref(2, 40, 50)])]),
            Some(&[(cold_series, 40, 50)]),
        );
        let candidates = bitmap(&[hot_series, cold_series]);

        // NOTE(review): presumably a plan built without cutoffs excludes the cold
        // tier — confirm against `TieredQueryPlan::from_cutoffs`.
        let result = prune_persisted_exact_scan_candidates(
            [&hot_segment, &cold_segment],
            &candidates,
            45,
            46,
            TieredQueryPlan::from_cutoffs(45, 46, None, None),
        );

        assert_eq!(
            result.overlapping_series_ids.iter().collect::<Vec<_>>(),
            vec![hot_series],
        );
        assert_eq!(
            result
                .exact_scan_chunk_refs
                .keys()
                .copied()
                .collect::<Vec<_>>(),
            vec![hot_series],
        );
    }
}