tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;
use crate::engine::tombstone;

#[path = "core_impl/contexts.rs"]
mod contexts;
#[path = "core_impl/metadata_publication.rs"]
mod metadata_publication;
#[path = "core_impl/utilities.rs"]
mod utilities;
#[path = "core_impl/write_capabilities.rs"]
mod write_capabilities;
#[path = "core_impl/write_memory.rs"]
mod write_memory;
#[path = "core_impl/write_visibility.rs"]
mod write_visibility;

pub(super) use self::contexts::{
    CatalogContext, ChunkContext, PersistedRefreshContext, WriteApplyContext, WriteCommitContext,
    WritePrepareContext, WriteResolveContext, WriteSeriesValidationContext,
};
pub(super) use self::metadata_publication::*;
pub(super) use self::utilities::{
    current_unix_millis_u64, current_unix_timestamp_units, duration_to_timestamp_units,
    elapsed_nanos_u64, lane_for_value, partition_id_for_timestamp, persisted_chunk_payload,
    saturating_u64_from_usize, value_heap_bytes, MemoryDeltaBytes,
};
pub(super) use self::write_capabilities::*;
pub(super) use self::write_memory::*;
pub(super) use self::write_visibility::*;

impl ChunkStorage {
    pub(super) fn existing_series_ids_for_metric_series<I>(&self, series: I) -> Vec<SeriesId>
    where
        I: IntoIterator<Item = MetricSeries>,
    {
        let registry = self.catalog.registry.read();
        let mut series_ids = series
            .into_iter()
            .filter_map(|series| {
                registry
                    .resolve_existing(&series.name, &series.labels)
                    .map(|resolution| resolution.series_id)
            })
            .collect::<Vec<_>>();
        series_ids.sort_unstable();
        series_ids.dedup();
        series_ids
    }

    pub fn flush(&self) -> Result<()> {
        self.ensure_open()?;
        self.flush_pipeline_once()
    }

    pub(super) fn metric_series_for_ids<I>(&self, series_ids: I) -> Vec<MetricSeries>
    where
        I: IntoIterator<Item = SeriesId>,
    {
        let registry = self.catalog.registry.read();
        series_ids
            .into_iter()
            .filter_map(|series_id| {
                registry
                    .decode_series_key(series_id)
                    .and_then(|series_key| {
                        (!rollups::is_internal_rollup_metric(&series_key.metric)).then_some(
                            MetricSeries {
                                name: series_key.metric,
                                labels: series_key.labels,
                            },
                        )
                    })
            })
            .collect()
    }

    pub(super) fn latest_visible_timestamp_for_series_locked(
        &self,
        series_id: SeriesId,
        persisted_index: &PersistedIndexState,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
    ) -> Result<Option<i64>> {
        self.latest_visible_timestamp_for_series_locked_with_ceiling(
            series_id,
            persisted_index,
            tombstone_ranges,
            i64::MAX,
        )
    }

    pub(super) fn latest_visible_bounded_timestamp_for_series_locked(
        &self,
        series_id: SeriesId,
        persisted_index: &PersistedIndexState,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        self.latest_visible_timestamp_for_series_locked_with_ceiling(
            series_id,
            persisted_index,
            tombstone_ranges,
            ceiling_inclusive,
        )
    }

    fn latest_visible_timestamp_for_series_locked_with_ceiling(
        &self,
        series_id: SeriesId,
        persisted_index: &PersistedIndexState,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        let mut latest = i64::MIN;

        {
            let active = self.active_shard(series_id).read();
            let sealed = self.sealed_shard(series_id).read();
            if let Some(state) = active.get(&series_id) {
                if let Some(active_latest) = Self::latest_visible_timestamp_in_points(
                    state.points_in_partition_order(),
                    tombstone_ranges,
                    latest,
                    ceiling_inclusive,
                ) {
                    latest = latest.max(active_latest);
                }
            }
            if let Some(chunks) = sealed.get(&series_id) {
                for chunk in chunks.values() {
                    if let Some(chunk_latest) = Self::latest_visible_timestamp_in_chunk(
                        chunk,
                        tombstone_ranges,
                        latest,
                        ceiling_inclusive,
                    )? {
                        latest = latest.max(chunk_latest);
                    }
                }
            }
        }

        if let Some(chunks) = persisted_index.chunk_refs.get(&series_id) {
            for chunk_ref in chunks {
                if let Some(chunk_latest) = self.latest_visible_timestamp_in_persisted_chunk(
                    persisted_index,
                    chunk_ref,
                    tombstone_ranges,
                    latest,
                    ceiling_inclusive,
                )? {
                    latest = latest.max(chunk_latest);
                }
            }
        }

        Ok((latest != i64::MIN).then_some(latest))
    }

    fn latest_visible_timestamp_in_points<'a, I>(
        points: I,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        floor_exclusive: i64,
        ceiling_inclusive: i64,
    ) -> Option<i64>
    where
        I: IntoIterator<Item = &'a ChunkPoint>,
    {
        points
            .into_iter()
            .filter_map(|point| {
                (point.ts > floor_exclusive
                    && point.ts <= ceiling_inclusive
                    && Self::timestamp_survives_tombstones(point.ts, tombstone_ranges))
                .then_some(point.ts)
            })
            .max()
    }

    fn latest_visible_timestamp_in_decoded_chunk_points(
        chunk: &Chunk,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        floor_exclusive: i64,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        let decoded = chunk.decode_points()?;
        Ok(Self::latest_visible_timestamp_in_points(
            decoded.iter(),
            tombstone_ranges,
            floor_exclusive,
            ceiling_inclusive,
        ))
    }

    pub(super) fn latest_visible_timestamp_in_chunk(
        chunk: &Chunk,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        floor_exclusive: i64,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        if chunk.header.max_ts <= floor_exclusive || chunk.header.min_ts > ceiling_inclusive {
            return Ok(None);
        }

        let Some(tombstone_ranges) = tombstone_ranges else {
            if chunk.header.max_ts <= ceiling_inclusive {
                return Ok(Some(chunk.header.max_ts));
            }
            return Self::latest_visible_timestamp_in_decoded_chunk_points(
                chunk,
                None,
                floor_exclusive,
                ceiling_inclusive,
            );
        };
        if tombstone::interval_fully_tombstoned(
            chunk.header.min_ts,
            chunk.header.max_ts,
            tombstone_ranges,
        ) {
            return Ok(None);
        }
        if chunk.header.max_ts <= ceiling_inclusive
            && !tombstone::timestamp_is_tombstoned(chunk.header.max_ts, tombstone_ranges)
        {
            return Ok(Some(chunk.header.max_ts));
        }

        Self::latest_visible_timestamp_in_decoded_chunk_points(
            chunk,
            Some(tombstone_ranges),
            floor_exclusive,
            ceiling_inclusive,
        )
    }

    fn persisted_chunk_timestamp_search_index<'a>(
        &self,
        persisted_index: &'a PersistedIndexState,
        chunk_ref: &PersistedChunkRef,
    ) -> Result<&'a crate::engine::encoder::TimestampSearchIndex> {
        let timestamp_search_index = persisted_index
            .chunk_timestamp_indexes
            .get(&chunk_ref.sequence)
            .ok_or_else(|| {
                TsinkError::DataCorruption(format!(
                    "missing timestamp search index for persisted chunk sequence {}",
                    chunk_ref.sequence
                ))
            })?;
        if let Some(timestamp_search_index) = timestamp_search_index.get() {
            return Ok(timestamp_search_index);
        }

        let payload = persisted_chunk_payload(&persisted_index.segment_maps, chunk_ref)?;
        let built = Encoder::build_timestamp_search_index_from_payload(
            chunk_ref.ts_codec,
            chunk_ref.point_count as usize,
            payload.as_ref(),
        )?;
        let built_bytes = built.memory_usage_bytes();
        if timestamp_search_index.set(built).is_ok() {
            self.add_included_memory_component_bytes(
                &self.memory.persisted_index_used_bytes,
                built_bytes,
            );
        }
        Ok(timestamp_search_index
            .get()
            .expect("timestamp search index must be initialized after set"))
    }

    fn latest_visible_timestamp_in_persisted_chunk_payload(
        &self,
        chunk_ref: &PersistedChunkRef,
        payload: &[u8],
        timestamp_search_index: &crate::engine::encoder::TimestampSearchIndex,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        floor_exclusive: i64,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        let ts_payload = Encoder::timestamp_payload_from_chunk_payload(payload)?;
        let Some(mut block_idx) = timestamp_search_index
            .candidate_block_for_ceiling(chunk_ref.point_count as usize, ceiling_inclusive)
        else {
            return Ok(None);
        };

        loop {
            let timestamps = timestamp_search_index.decode_block(
                chunk_ref.point_count as usize,
                ts_payload,
                block_idx,
            )?;
            let block_floor_reached = timestamps
                .first()
                .copied()
                .is_some_and(|timestamp| timestamp <= floor_exclusive);

            for timestamp in timestamps.into_iter().rev() {
                if timestamp > ceiling_inclusive {
                    continue;
                }
                if timestamp <= floor_exclusive {
                    return Ok(None);
                }
                if Self::timestamp_survives_tombstones(timestamp, tombstone_ranges) {
                    return Ok(Some(timestamp));
                }
            }

            if block_floor_reached || block_idx == 0 {
                return Ok(None);
            }
            block_idx = block_idx.saturating_sub(1);
        }
    }

    pub(super) fn latest_visible_timestamp_in_persisted_chunk(
        &self,
        persisted_index: &PersistedIndexState,
        chunk_ref: &PersistedChunkRef,
        tombstone_ranges: Option<&[tombstone::TombstoneRange]>,
        floor_exclusive: i64,
        ceiling_inclusive: i64,
    ) -> Result<Option<i64>> {
        if chunk_ref.max_ts <= floor_exclusive || chunk_ref.min_ts > ceiling_inclusive {
            return Ok(None);
        }

        let Some(tombstone_ranges) = tombstone_ranges else {
            if chunk_ref.max_ts <= ceiling_inclusive {
                return Ok(Some(chunk_ref.max_ts));
            }
            let timestamp_search_index =
                self.persisted_chunk_timestamp_search_index(persisted_index, chunk_ref)?;
            let payload = persisted_chunk_payload(&persisted_index.segment_maps, chunk_ref)?;
            return self.latest_visible_timestamp_in_persisted_chunk_payload(
                chunk_ref,
                payload.as_ref(),
                timestamp_search_index,
                None,
                floor_exclusive,
                ceiling_inclusive,
            );
        };
        if tombstone::interval_fully_tombstoned(
            chunk_ref.min_ts,
            chunk_ref.max_ts,
            tombstone_ranges,
        ) {
            return Ok(None);
        }
        if chunk_ref.max_ts <= ceiling_inclusive
            && !tombstone::timestamp_is_tombstoned(chunk_ref.max_ts, tombstone_ranges)
        {
            return Ok(Some(chunk_ref.max_ts));
        }

        let payload = persisted_chunk_payload(&persisted_index.segment_maps, chunk_ref)?;
        let timestamp_search_index =
            self.persisted_chunk_timestamp_search_index(persisted_index, chunk_ref)?;
        self.latest_visible_timestamp_in_persisted_chunk_payload(
            chunk_ref,
            payload.as_ref(),
            timestamp_search_index,
            Some(tombstone_ranges),
            floor_exclusive,
            ceiling_inclusive,
        )
    }
}