tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::analysis::{analyze_series_read_sources, SeriesReadAnalysis};
use super::*;

/// Counters describing in-memory shard snapshots taken on the merge read path.
///
/// The values are fed into the `merge_path_shard_snapshot*` observability counters by
/// `ChunkStorage::record_merge_path_shard_snapshot_stats`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(super) struct MergePathShardSnapshotStats {
    /// Number of snapshots these stats cover (`single` sets this to 1).
    snapshots: u64,
    /// Nanoseconds spent acquiring the shard read locks.
    wait_nanos: u64,
    /// Nanoseconds spent copying data out while the shard read locks were held.
    hold_nanos: u64,
}

impl MergePathShardSnapshotStats {
    pub(super) fn single(wait_nanos: u64, hold_nanos: u64) -> Self {
        Self {
            snapshots: 1,
            wait_nanos,
            hold_nanos,
        }
    }

    #[cfg(test)]
    pub(super) fn snapshot_count(self) -> u64 {
        self.snapshots
    }
}

/// Persisted (on-disk) sources captured for reading one series.
///
/// Filled by `QuerySnapshotContext::snapshot_persisted_series_sources`; both maps are keyed
/// by the `segment_slot` of the chunk references in `chunks`.
#[derive(Default)]
pub(super) struct PersistedSeriesSourceSnapshot {
    /// References to the persisted chunks selected for the read.
    pub(super) chunks: Vec<PersistedChunkRef>,
    /// Shared handle to the mapped segment for each referenced segment slot.
    pub(super) segment_maps: HashMap<usize, Arc<PlatformMmap>>,
    /// Storage tier of each referenced segment slot.
    pub(super) segment_tiers: HashMap<usize, PersistedSegmentTier>,
}

impl PersistedSeriesSourceSnapshot {
    /// Returns the tier captured for the segment slot that holds `chunk_ref`.
    ///
    /// Falls back to `PersistedSegmentTier::Hot` when the slot has no captured tier.
    pub(super) fn chunk_tier(&self, chunk_ref: &PersistedChunkRef) -> PersistedSegmentTier {
        match self.segment_tiers.get(&chunk_ref.segment_slot) {
            Some(tier) => *tier,
            None => PersistedSegmentTier::Hot,
        }
    }
}

/// In-memory sources captured for reading one series.
///
/// Filled by `QuerySnapshotContext::snapshot_in_memory_series_sources` while the active and
/// sealed shard read locks are held.
#[derive(Default)]
pub(super) struct InMemorySeriesSourceSnapshot {
    /// Shared handles to the sealed chunks selected for the read.
    sealed_chunks: Vec<Arc<Chunk>>,
    /// Snapshot of the series' active state restricted to the requested range.
    active_points: ActiveSeriesSnapshot,
}

impl InMemorySeriesSourceSnapshot {
    /// Reports the point count of the captured active snapshot (test-only accessor).
    #[cfg(test)]
    pub(super) fn active_point_count(&self) -> usize {
        let active = &self.active_points;
        active.point_count()
    }
}

/// Everything captured for reading one series: persisted sources, in-memory sources, and the
/// analysis computed over them by `analyze_series_read_sources`.
pub(super) struct SeriesReadSnapshot {
    /// Persisted chunk references plus their mapped segments and tiers.
    pub(super) persisted: PersistedSeriesSourceSnapshot,
    /// Shared handles to the in-memory sealed chunks selected for the read.
    pub(super) sealed_chunks: Vec<Arc<Chunk>>,
    /// Snapshot of the series' active state restricted to the requested range.
    pub(super) active_points: ActiveSeriesSnapshot,
    /// Result of analyzing the three sources above for the requested range.
    pub(super) analysis: SeriesReadAnalysis,
}

impl QuerySnapshotContext<'_> {
    fn snapshot_persisted_series_sources(
        self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        plan: TieredQueryPlan,
    ) -> Result<PersistedSeriesSourceSnapshot> {
        let mut snapshot = PersistedSeriesSourceSnapshot::default();
        let mut segment_slots = BTreeSet::<usize>::new();
        let persisted_index = self.persisted_index.read();

        if let Some(chunks) = persisted_index.chunk_refs.get(&series_id) {
            let end_idx = chunks.partition_point(|chunk| chunk.min_ts < end);
            snapshot.chunks.reserve(end_idx);
            for chunk_ref in &chunks[..end_idx] {
                if chunk_ref.max_ts < start {
                    continue;
                }
                let chunk_tier = ChunkStorage::persisted_chunk_tier(&persisted_index, chunk_ref);
                if !ChunkStorage::plan_includes_persisted_tier(plan, chunk_tier) {
                    continue;
                }

                segment_slots.insert(chunk_ref.segment_slot);
                snapshot.chunks.push(*chunk_ref);
            }
        }

        snapshot.segment_maps.reserve(segment_slots.len());
        snapshot.segment_tiers.reserve(segment_slots.len());
        for slot in &segment_slots {
            let Some(segment_map) = persisted_index.segment_maps.get(slot) else {
                return Err(TsinkError::DataCorruption(format!(
                    "missing mapped segment slot {}",
                    slot
                )));
            };
            snapshot.segment_maps.insert(*slot, Arc::clone(segment_map));
            snapshot.segment_tiers.insert(
                *slot,
                persisted_index
                    .segment_tiers
                    .get(slot)
                    .copied()
                    .unwrap_or(PersistedSegmentTier::Hot),
            );
        }

        Ok(snapshot)
    }

    pub(super) fn snapshot_in_memory_series_sources(
        self,
        series_id: SeriesId,
        start: i64,
        end: i64,
    ) -> (InMemorySeriesSourceSnapshot, MergePathShardSnapshotStats) {
        let lock_wait_started = Instant::now();
        let active = self.chunks.active_shard(series_id).read();
        let sealed = self.chunks.sealed_shard(series_id).read();
        let lock_wait_nanos = elapsed_nanos_u64(lock_wait_started);
        let lock_hold_started = Instant::now();

        let mut snapshot = InMemorySeriesSourceSnapshot::default();
        if let Some(chunks) = sealed.get(&series_id) {
            let end_bound = SealedChunkKey::upper_bound_for_min_ts(end);
            snapshot.sealed_chunks.extend(
                chunks
                    .range(..end_bound)
                    .filter(|(_, chunk)| chunk.header.max_ts >= start)
                    .map(|(_, chunk)| Arc::clone(chunk)),
            );
        }
        if let Some(state) = active.get(&series_id) {
            snapshot.active_points = state.snapshot_in_range(start, end, self.partition_window);
        }

        let lock_hold_nanos = elapsed_nanos_u64(lock_hold_started);
        drop(sealed);
        drop(active);

        (
            snapshot,
            MergePathShardSnapshotStats::single(lock_wait_nanos, lock_hold_nanos),
        )
    }

    pub(super) fn snapshot_series_read_sources(
        self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        plan: TieredQueryPlan,
    ) -> Result<(SeriesReadSnapshot, MergePathShardSnapshotStats)> {
        let visibility_guard = self.visibility_read_fence();
        let persisted = self.snapshot_persisted_series_sources(series_id, start, end, plan)?;
        let (in_memory, shard_snapshot_stats) =
            self.snapshot_in_memory_series_sources(series_id, start, end);
        drop(visibility_guard);

        let InMemorySeriesSourceSnapshot {
            sealed_chunks,
            active_points,
        } = in_memory;
        let analysis = analyze_series_read_sources(
            &persisted.chunks,
            &sealed_chunks,
            &active_points,
            start,
            end,
        );

        Ok((
            SeriesReadSnapshot {
                persisted,
                sealed_chunks,
                active_points,
                analysis,
            },
            shard_snapshot_stats,
        ))
    }
}

impl ChunkStorage {
    fn persisted_chunk_tier(
        persisted_index: &PersistedIndexState,
        chunk_ref: &PersistedChunkRef,
    ) -> PersistedSegmentTier {
        persisted_index
            .segment_tiers
            .get(&chunk_ref.segment_slot)
            .copied()
            .unwrap_or(PersistedSegmentTier::Hot)
    }

    pub(in super::super) fn plan_includes_persisted_tier(
        plan: TieredQueryPlan,
        tier: PersistedSegmentTier,
    ) -> bool {
        match tier {
            PersistedSegmentTier::Hot => true,
            PersistedSegmentTier::Warm => plan.includes_warm(),
            PersistedSegmentTier::Cold => plan.includes_cold(),
        }
    }

    pub(super) fn record_merge_path_shard_snapshot_stats(
        &self,
        stats: MergePathShardSnapshotStats,
    ) {
        self.observability
            .query
            .merge_path_shard_snapshots_total
            .fetch_add(stats.snapshots, Ordering::Relaxed);
        self.observability
            .query
            .merge_path_shard_snapshot_wait_nanos_total
            .fetch_add(stats.wait_nanos, Ordering::Relaxed);
        self.observability
            .query
            .merge_path_shard_snapshot_hold_nanos_total
            .fetch_add(stats.hold_nanos, Ordering::Relaxed);
    }

    #[cfg(test)]
    pub(super) fn snapshot_in_memory_series_sources(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
    ) -> (InMemorySeriesSourceSnapshot, MergePathShardSnapshotStats) {
        self.query_snapshot_context()
            .snapshot_in_memory_series_sources(series_id, start, end)
    }

    #[cfg(test)]
    pub(super) fn snapshot_series_read_sources(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        plan: TieredQueryPlan,
    ) -> Result<(SeriesReadSnapshot, MergePathShardSnapshotStats)> {
        self.query_snapshot_context()
            .snapshot_series_read_sources(series_id, start, end, plan)
    }
}