//! tsink 0.10.2
//!
//! A lightweight embedded time-series database with a straightforward API.
//! Documentation
use super::*;

impl ChunkStorage {
    /// Folds `candidate` into `current`, keeping the larger timestamp.
    ///
    /// A `None` candidate leaves `current` untouched; a `None` current is
    /// simply replaced by the candidate.
    pub(super) fn accumulate_max_timestamp(current: &mut Option<i64>, candidate: Option<i64>) {
        let Some(candidate) = candidate else {
            return;
        };
        let merged = match *current {
            Some(existing) => existing.max(candidate),
            None => candidate,
        };
        *current = Some(merged);
    }

    pub(super) fn normalize_series_ids(mut series_ids: Vec<SeriesId>) -> Vec<SeriesId> {
        series_ids.sort_unstable();
        series_ids.dedup();
        series_ids
    }

    /// Applies `mutate` to the persisted index under its write lock, then
    /// reconciles the memory-accounting counters with whatever the mutation
    /// changed.
    ///
    /// Two byte totals are tracked independently: the index's included
    /// in-memory usage and its persisted mmap footprint. Each is sampled
    /// before and after `mutate`, and the signed difference is applied to the
    /// matching component counter as well as the shared and local
    /// `used_bytes` aggregates. When `publication.accounting_enabled` is
    /// false, sampling is skipped and no counters move.
    pub(super) fn with_lifecycle_persisted_index_publication<R>(
        publication: LifecyclePublicationContext<'_>,
        mutate: impl FnOnce(&mut PersistedIndexState) -> R,
    ) -> R {
        let mut persisted_index = publication.persisted_index.write();
        // Pre-mutation snapshots; 0 is a placeholder when accounting is off
        // (the `account` closure bails out before using them in that case).
        let before_included = if publication.accounting_enabled {
            Self::persisted_index_included_memory_usage_bytes(&persisted_index)
        } else {
            0
        };
        let before_mmap = if publication.accounting_enabled {
            Self::persisted_index_persisted_mmap_bytes(&persisted_index)
        } else {
            0
        };
        let result = mutate(&mut persisted_index);
        // Pushes the before/after delta into one component counter plus both
        // aggregate byte counters, choosing add vs. sub by direction.
        let account = |component: &std::sync::atomic::AtomicU64, before: usize, after: usize| {
            if !publication.accounting_enabled || before == after {
                return;
            }
            // abs_diff yields the magnitude; the branch below restores the sign.
            let delta = saturating_u64_from_usize(before.abs_diff(after));
            if after >= before {
                component.fetch_add(delta, Ordering::AcqRel);
                publication
                    .shared_used_bytes
                    .fetch_add(delta, Ordering::AcqRel);
                publication.used_bytes.fetch_add(delta, Ordering::AcqRel);
            } else {
                component.fetch_sub(delta, Ordering::AcqRel);
                publication
                    .shared_used_bytes
                    .fetch_sub(delta, Ordering::AcqRel);
                publication.used_bytes.fetch_sub(delta, Ordering::AcqRel);
            }
        };
        // Settle the in-memory component, then the mmap component.
        account(
            publication.persisted_index_used_bytes,
            before_included,
            Self::persisted_index_included_memory_usage_bytes(&persisted_index),
        );
        account(
            publication.persisted_mmap_used_bytes,
            before_mmap,
            Self::persisted_index_persisted_mmap_bytes(&persisted_index),
        );
        result
    }

    /// Folds every loaded segment into the persisted index under the
    /// publication lock, collecting the set of touched series and the newest
    /// timestamp observed across all segments.
    ///
    /// After insertion, the loaded series metadata is spliced into the merged
    /// postings and the runtime metadata is reconciled with the affected ids.
    pub(super) fn apply_loaded_segments_state_update(
        &self,
        loaded_segments: Vec<IndexedSegment>,
        loaded_series: &[PersistedSeries],
    ) -> Result<PersistedSegmentsStateUpdate> {
        let publication = self.lifecycle_publication_context();
        let metadata_delta = publication.runtime_metadata_delta;
        let update = Self::with_lifecycle_persisted_index_publication(
            publication,
            move |persisted_index| -> Result<PersistedSegmentsStateUpdate> {
                let mut update = PersistedSegmentsStateUpdate::default();
                for segment in loaded_segments {
                    let (lane, tier) = self.persisted_segment_location_for_root(&segment.root)?;
                    let (segment_series, segment_max_ts, _segment_bounded_max_ts) =
                        self.insert_indexed_segment_into_state(lane, tier, persisted_index, segment)?;
                    update.affected_series.extend(segment_series);
                    Self::accumulate_max_timestamp(
                        &mut update.loaded_max_timestamp,
                        segment_max_ts,
                    );
                }

                // Canonicalize the touched-series list before handing it to
                // the metadata reconciliation below.
                update.affected_series = Self::normalize_series_ids(update.affected_series);
                Self::insert_loaded_series_into_merged_postings(persisted_index, loaded_series);
                metadata_delta.reconcile_series_ids_locked(
                    persisted_index,
                    update.affected_series.iter().copied(),
                );
                Ok(update)
            },
        )?;
        Ok(update)
    }

    /// Drops every segment rooted at one of `roots` from the persisted index,
    /// reporting whether anything was removed and which series were touched.
    ///
    /// Series that lost their last persisted chunk are also removed from the
    /// merged postings before the runtime metadata is reconciled.
    pub(super) fn apply_segment_root_removal_state_update(
        &self,
        roots: &[PathBuf],
    ) -> Result<PersistedSegmentRemovalStateUpdate> {
        let publication = self.lifecycle_publication_context();
        let metadata_delta = publication.runtime_metadata_delta;
        let update = Self::with_lifecycle_persisted_index_publication(
            publication,
            |persisted_index| -> Result<PersistedSegmentRemovalStateUpdate> {
                let mut touched = Vec::new();
                let mut emptied = BTreeSet::new();
                let mut removed_any = false;

                for root in roots {
                    let (removed_series, removed_from_postings) =
                        Self::remove_segment_root_from_state(persisted_index, root);
                    if !removed_series.is_empty() {
                        removed_any = true;
                    }
                    touched.extend(removed_series);
                    emptied.extend(removed_from_postings);
                }

                let affected_series = Self::normalize_series_ids(touched);
                let emptied = emptied.into_iter().collect::<Vec<SeriesId>>();
                self.remove_series_ids_from_merged_postings(persisted_index, &emptied)?;
                metadata_delta
                    .reconcile_series_ids_locked(persisted_index, affected_series.iter().copied());
                Ok(PersistedSegmentRemovalStateUpdate {
                    removed_any,
                    affected_series,
                })
            },
        )?;
        Ok(update)
    }

    /// Builds a brand-new persisted index from the loaded segment set,
    /// without touching the currently published state.
    ///
    /// The returned bundle carries the fresh index, the sorted ids of every
    /// series it references, the newest loaded timestamp, and the next
    /// segment id to hand out (clamped to at least 1).
    pub(super) fn prepare_persisted_index_install(
        &self,
        loaded: LoadedSegmentIndexes,
    ) -> Result<PreparedPersistedIndexInstall> {
        let LoadedSegmentIndexes {
            indexed_segments,
            next_segment_id,
            ..
        } = loaded;
        let mut fresh_index = PersistedIndexState::default();
        let mut newest_loaded_ts = None;
        for segment in indexed_segments {
            let (lane, tier) = self.persisted_segment_location_for_root(&segment.root)?;
            let (_, segment_max_ts, _segment_bounded_max_ts) =
                self.insert_indexed_segment_into_state(lane, tier, &mut fresh_index, segment)?;
            Self::accumulate_max_timestamp(&mut newest_loaded_ts, segment_max_ts);
        }

        // Every series holding at least one chunk ref is part of the install.
        let persisted_series_ids =
            Self::normalize_series_ids(fresh_index.chunk_refs.keys().copied().collect());
        Ok(PreparedPersistedIndexInstall {
            persisted_index: fresh_index,
            persisted_series_ids,
            loaded_max_timestamp: newest_loaded_ts,
            // Never hand out a segment id below 1.
            next_segment_id: next_segment_id.max(1),
        })
    }

    /// Trims the registry down to exactly the series ids that remain
    /// persisted.
    pub(super) fn reconcile_registry_with_persisted_series_ids(&self, series_ids: &[SeriesId]) {
        let keep: BTreeSet<_> = series_ids.iter().copied().collect();
        self.lifecycle_publication_context()
            .mutate_registry(|registry| registry.retain_series_ids(&keep));
    }

    /// Installs a previously prepared index wholesale: swaps it in under the
    /// publication lock, rebuilds the runtime metadata from it, and then
    /// publishes the visible persisted state derived from it.
    pub(super) fn install_persisted_index_rebuild(
        &self,
        prepared: PreparedPersistedIndexInstall,
    ) -> Result<()> {
        let PreparedPersistedIndexInstall {
            persisted_index,
            persisted_series_ids,
            loaded_max_timestamp,
            next_segment_id,
        } = prepared;
        let publication = self.lifecycle_publication_context();
        let metadata_delta = publication.runtime_metadata_delta;
        Self::with_lifecycle_persisted_index_publication(publication, move |state| {
            // Wholesale replacement, then a metadata rebuild from the new state.
            *state = persisted_index;
            metadata_delta.rebuild_locked(state);
        });

        self.install_visible_persisted_state(PersistedVisibleStateInstall {
            refresh_series_ids: persisted_series_ids,
            mark_refreshed_series_materialized: true,
            loaded_max_timestamp,
            next_segment_id: Some(next_segment_id),
        })
    }

    /// Registers one indexed segment in `persisted_index` under a newly
    /// allocated segment slot.
    ///
    /// Every chunk entry is assigned a fresh, monotonically increasing
    /// sequence number and mirrored into two structures: the global
    /// per-series `chunk_refs` map and the segment-local
    /// `chunk_refs_by_series` map (both kept sorted via
    /// `sort_persisted_chunk_refs`). Per-series time-range summaries and,
    /// when the manifest carries both min and max timestamps, a time-bucket
    /// posting index are built alongside.
    ///
    /// Returns `(affected_series, max_ts, bounded_max_ts)`, where
    /// `bounded_max_ts` only accumulates timestamps at or below the current
    /// future-skew cutoff. A segment whose root is already registered is a
    /// no-op and yields `(vec![], None, None)`.
    fn insert_indexed_segment_into_state(
        &self,
        segment_lane: SegmentLaneFamily,
        segment_tier: PersistedSegmentTier,
        persisted_index: &mut PersistedIndexState,
        indexed_segment: IndexedSegment,
    ) -> Result<(Vec<SeriesId>, Option<i64>, Option<i64>)> {
        // Invariant check: a segment with series + series postings loaded must
        // also have metric postings (vacuously true for empty segments).
        debug_assert!(
            indexed_segment.manifest.series_count == 0
                || indexed_segment.postings.series_postings.is_empty()
                || indexed_segment.series.is_empty()
                || !indexed_segment.postings.metric_postings.is_empty(),
            "segments with loaded postings should carry metric postings"
        );
        let root = indexed_segment.root;
        // Idempotence guard: re-inserting an already-known root is a no-op.
        if persisted_index.segments_by_root.contains_key(&root) {
            return Ok((Vec::new(), None, None));
        }
        let segment_manifest = indexed_segment.manifest;
        let segment_level = segment_manifest.level;
        let chunk_map = Arc::new(indexed_segment.chunks_mmap);
        let chunk_entries = indexed_segment.chunk_index.entries;

        let segment_slot = persisted_index.next_segment_slot;
        let mut refs_by_series = HashMap::<SeriesId, Vec<PersistedChunkRef>>::new();
        let mut series_time_summaries =
            HashMap::<SeriesId, state::PersistedSeriesTimeRangeSummary>::new();
        // Time-bucket postings only exist when the manifest knows its bounds.
        let mut time_bucket_postings = segment_manifest
            .min_ts
            .zip(segment_manifest.max_ts)
            .map(|(min_ts, max_ts)| state::PersistedSegmentTimeBucketIndex::new(min_ts, max_ts));
        let mut affected_series = BTreeSet::new();
        let mut loaded_max_timestamp: Option<i64> = None;
        let mut loaded_bounded_max_timestamp: Option<i64> = None;
        let future_skew_cutoff = self.current_future_skew_cutoff();
        // Continue numbering from the index's high-water mark, never below 1.
        let mut next_chunk_sequence = persisted_index.next_chunk_sequence.max(1);

        for entry in chunk_entries {
            let sequence = next_chunk_sequence;
            next_chunk_sequence = sequence.saturating_add(1);
            Self::accumulate_max_timestamp(&mut loaded_max_timestamp, Some(entry.max_ts));
            // Skew-bounded candidate: prefer the chunk's max when it is not in
            // the future; otherwise fall back to its min; otherwise skip the
            // chunk entirely (it lies wholly beyond the cutoff).
            let bounded_candidate = if entry.max_ts <= future_skew_cutoff {
                Some(entry.max_ts)
            } else if entry.min_ts <= future_skew_cutoff {
                Some(entry.min_ts)
            } else {
                None
            };
            Self::accumulate_max_timestamp(&mut loaded_bounded_max_timestamp, bounded_candidate);

            let chunk_ref = PersistedChunkRef {
                level: segment_level,
                min_ts: entry.min_ts,
                max_ts: entry.max_ts,
                point_count: entry.point_count,
                sequence,
                chunk_offset: entry.chunk_offset,
                chunk_len: entry.chunk_len,
                lane: entry.lane,
                ts_codec: entry.ts_codec,
                value_codec: entry.value_codec,
                segment_slot,
            };

            // Segment-local view of this series' refs.
            refs_by_series
                .entry(entry.series_id)
                .or_default()
                .push(chunk_ref);
            if let Some(time_bucket_postings) = time_bucket_postings.as_mut() {
                time_bucket_postings.insert_series_range(
                    entry.series_id,
                    entry.min_ts,
                    entry.max_ts,
                );
            }
            // Widen (or seed) the per-series time-range summary.
            series_time_summaries
                .entry(entry.series_id)
                .and_modify(|summary| {
                    summary.min_ts = summary.min_ts.min(entry.min_ts);
                    summary.max_ts = summary.max_ts.max(entry.max_ts);
                })
                .or_insert(state::PersistedSeriesTimeRangeSummary {
                    min_ts: entry.min_ts,
                    max_ts: entry.max_ts,
                });
            // Global view of this series' refs (spans all segments).
            persisted_index
                .chunk_refs
                .entry(entry.series_id)
                .or_default()
                .push(chunk_ref);
            // Reserve a lazily-initialized timestamp index slot per chunk.
            persisted_index
                .chunk_timestamp_indexes
                .insert(sequence, std::sync::OnceLock::new());
            affected_series.insert(entry.series_id);
        }

        // Publish the advanced slot/sequence counters and the segment's mmap
        // and tier under the slot allocated above.
        persisted_index.next_segment_slot = persisted_index.next_segment_slot.saturating_add(1);
        persisted_index.next_chunk_sequence = next_chunk_sequence;
        persisted_index
            .segment_maps
            .insert(segment_slot, Arc::clone(&chunk_map));
        persisted_index
            .segment_tiers
            .insert(segment_slot, segment_tier);

        // Restore sort order in both ref maps after the appends above.
        for refs in refs_by_series.values_mut() {
            Self::sort_persisted_chunk_refs(refs.as_mut_slice());
        }
        for series_id in &affected_series {
            if let Some(refs) = persisted_index.chunk_refs.get_mut(series_id) {
                Self::sort_persisted_chunk_refs(refs.as_mut_slice());
            }
        }

        persisted_index.segments_by_root.insert(
            root,
            state::PersistedSegmentState {
                segment_slot,
                lane: segment_lane,
                tier: segment_tier,
                manifest: segment_manifest,
                time_bucket_postings,
                series_time_summaries,
                chunk_refs_by_series: refs_by_series,
            },
        );

        Ok((
            affected_series.into_iter().collect(),
            loaded_max_timestamp,
            loaded_bounded_max_timestamp,
        ))
    }

    /// Unregisters the segment rooted at `root` from `persisted_index`.
    ///
    /// Returns `(touched, emptied)`: every series that lost chunk refs, and
    /// the subset whose global ref list became empty and was dropped
    /// entirely. An unknown root returns two empty vectors.
    fn remove_segment_root_from_state(
        persisted_index: &mut PersistedIndexState,
        root: &Path,
    ) -> (Vec<SeriesId>, Vec<SeriesId>) {
        let Some(mut removed_segment) = persisted_index.segments_by_root.remove(root) else {
            return (Vec::new(), Vec::new());
        };

        let slot = removed_segment.segment_slot;
        persisted_index.segment_maps.remove(&slot);
        persisted_index.segment_tiers.remove(&slot);

        let mut touched = BTreeSet::new();
        let mut emptied = Vec::new();
        for (series_id, dropped_refs) in std::mem::take(&mut removed_segment.chunk_refs_by_series) {
            touched.insert(series_id);
            // Discard the lazily-built timestamp indexes keyed by the removed
            // chunks' sequence numbers.
            for dropped in &dropped_refs {
                persisted_index
                    .chunk_timestamp_indexes
                    .remove(&dropped.sequence);
            }
            let series_now_empty = persisted_index
                .chunk_refs
                .get_mut(&series_id)
                .map(|refs| {
                    refs.retain(|existing| !dropped_refs.iter().any(|dropped| dropped == existing));
                    refs.is_empty()
                })
                .unwrap_or(false);
            if series_now_empty {
                persisted_index.chunk_refs.remove(&series_id);
                emptied.push(series_id);
            }
        }

        (touched.into_iter().collect(), emptied)
    }
}