tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;

impl ChunkStorage {
    pub(super) fn insert_series_into_merged_postings(
        merged_postings: &mut SegmentPostingsIndex,
        series_id: SeriesId,
        metric: &str,
        labels: &[Label],
    ) {
        merged_postings.series_postings.insert(series_id);
        merged_postings
            .metric_postings
            .entry(metric.to_string())
            .or_default()
            .insert(series_id);
        for label in labels {
            merged_postings
                .label_name_postings
                .entry(label.name.clone())
                .or_default()
                .insert(series_id);
            merged_postings
                .label_postings
                .entry((label.name.clone(), label.value.clone()))
                .or_default()
                .insert(series_id);
        }
    }

    pub(super) fn remove_series_from_merged_postings(
        merged_postings: &mut SegmentPostingsIndex,
        series_id: SeriesId,
        metric: &str,
        labels: &[Label],
    ) {
        merged_postings.series_postings.remove(series_id);

        let remove_metric =
            merged_postings
                .metric_postings
                .get_mut(metric)
                .is_some_and(|series_ids| {
                    series_ids.remove(series_id);
                    series_ids.is_empty()
                });
        if remove_metric {
            merged_postings.metric_postings.remove(metric);
        }

        for label in labels {
            let remove_label_name = merged_postings
                .label_name_postings
                .get_mut(label.name.as_str())
                .is_some_and(|series_ids| {
                    series_ids.remove(series_id);
                    series_ids.is_empty()
                });
            if remove_label_name {
                merged_postings
                    .label_name_postings
                    .remove(label.name.as_str());
            }

            let label_key = (label.name.clone(), label.value.clone());
            let remove_label = merged_postings
                .label_postings
                .get_mut(&label_key)
                .is_some_and(|series_ids| {
                    series_ids.remove(series_id);
                    series_ids.is_empty()
                });
            if remove_label {
                merged_postings.label_postings.remove(&label_key);
            }
        }
    }

    pub(super) fn rebuild_merged_postings_from_registry(
        &self,
        persisted_index: &mut PersistedIndexState,
    ) -> Result<()> {
        let publication = self.lifecycle_publication_context();
        let registry = publication.registry.read();
        let mut merged_postings = SegmentPostingsIndex::default();
        for series_id in persisted_index.chunk_refs.keys().copied() {
            let Some(series_key) = registry.decode_series_key(series_id) else {
                return Err(TsinkError::DataCorruption(format!(
                    "persisted series id {} is missing from the runtime registry",
                    series_id
                )));
            };
            Self::insert_series_into_merged_postings(
                &mut merged_postings,
                series_id,
                &series_key.metric,
                &series_key.labels,
            );
        }
        persisted_index.merged_postings = merged_postings;
        Ok(())
    }

    pub(super) fn insert_loaded_series_into_merged_postings(
        persisted_index: &mut PersistedIndexState,
        series: &[PersistedSeries],
    ) {
        if series.is_empty() {
            return;
        }

        persisted_index
            .merged_postings
            .clear_missing_label_postings_cache();
        for series in series {
            Self::insert_series_into_merged_postings(
                &mut persisted_index.merged_postings,
                series.series_id,
                &series.metric,
                &series.labels,
            );
        }
    }

    pub(super) fn remove_series_ids_from_merged_postings(
        &self,
        persisted_index: &mut PersistedIndexState,
        series_ids: &[SeriesId],
    ) -> Result<()> {
        if series_ids.is_empty() {
            return Ok(());
        }

        let publication = self.lifecycle_publication_context();
        let registry = publication.registry.read();
        persisted_index
            .merged_postings
            .clear_missing_label_postings_cache();
        for &series_id in series_ids {
            let Some(series_key) = registry.decode_series_key(series_id) else {
                return Err(TsinkError::DataCorruption(format!(
                    "persisted series id {} is missing from the runtime registry",
                    series_id
                )));
            };
            Self::remove_series_from_merged_postings(
                &mut persisted_index.merged_postings,
                series_id,
                &series_key.metric,
                &series_key.labels,
            );
        }

        Ok(())
    }
}