tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::collections::BTreeSet;
use std::sync::atomic::Ordering;
use std::time::Instant;

use crate::storage::SeriesSelection;

use super::{elapsed_nanos_u64, saturating_u64_from_usize, ChunkStorage, MetricSeries, Result};

const METADATA_LIST_PAGE_SIZE: usize = 4_096;

impl ChunkStorage {
    pub(in crate::engine::storage_engine) fn list_metrics_api(&self) -> Result<Vec<MetricSeries>> {
        let context = self.metadata_listing_context();
        self.ensure_open()?;
        self.request_background_persisted_refresh_if_needed();
        let generation_before = context.live_series_pruning_generation();
        let mut listed = Vec::new();
        let mut dead_series_ids = Vec::new();
        let mut cursor = None;

        loop {
            let page = context.materialized_series_page_after(cursor, METADATA_LIST_PAGE_SIZE);
            if page.is_empty() {
                break;
            }

            cursor = page.last().copied();
            context.append_live_metric_series_page(&page, &mut listed, &mut dead_series_ids)?;

            if page.len() < METADATA_LIST_PAGE_SIZE {
                break;
            }
        }

        context
            .prune_dead_materialized_series_ids_if_stable(dead_series_ids, Some(generation_before));

        Ok(listed)
    }

    pub(in crate::engine::storage_engine) fn list_metrics_with_wal_api(
        &self,
    ) -> Result<Vec<MetricSeries>> {
        let context = self.metadata_listing_context();
        self.ensure_open()?;
        let mut series = self
            .list_metrics_api()?
            .into_iter()
            .collect::<BTreeSet<_>>();
        for definition in context.wal_metric_series()? {
            series.insert(definition);
        }
        Ok(series.into_iter().collect())
    }

    pub(in crate::engine::storage_engine) fn list_metrics_in_shards_api(
        &self,
        scope: &crate::storage::MetadataShardScope,
    ) -> Result<Vec<MetricSeries>> {
        let context = self.metadata_shard_scope_context();
        self.ensure_open()?;
        self.request_background_persisted_refresh_if_needed();
        let scope = scope.normalized()?;
        context.live_metric_series_for_scope(&scope, "list_metrics_in_shards")
    }
    pub(in crate::engine::storage_engine) fn select_series_api(
        &self,
        selection: &SeriesSelection,
    ) -> Result<Vec<MetricSeries>> {
        self.select_series_with_optional_scope_api(selection, None)
    }

    pub(in crate::engine::storage_engine) fn select_series_in_shards_api(
        &self,
        selection: &SeriesSelection,
        scope: &crate::storage::MetadataShardScope,
    ) -> Result<Vec<MetricSeries>> {
        let scope = scope.normalized()?;
        self.select_series_with_optional_scope_api(selection, Some(scope))
    }

    fn select_series_with_optional_scope_api(
        &self,
        selection: &SeriesSelection,
        scope: Option<crate::storage::MetadataShardScope>,
    ) -> Result<Vec<MetricSeries>> {
        self.observability
            .query
            .select_series_calls_total
            .fetch_add(1, Ordering::Relaxed);
        let started = Instant::now();

        let result = (|| -> Result<Vec<MetricSeries>> {
            self.ensure_open()?;
            self.request_background_persisted_refresh_if_needed();
            if let Some((start, end)) = selection.normalized_time_range()? {
                self.record_query_tier_plan(self.query_tier_plan(start, end));
                #[cfg(test)]
                self.invoke_metadata_query_time_range_summary_hook();
            }
            match scope.as_ref() {
                Some(scope) => self.select_series_in_shards_impl(selection, scope),
                None => self.select_series_impl(selection),
            }
        })();

        self.observability
            .query
            .select_series_duration_nanos_total
            .fetch_add(elapsed_nanos_u64(started), Ordering::Relaxed);

        match result {
            Ok(series) => {
                self.observability
                    .query
                    .select_series_returned_total
                    .fetch_add(saturating_u64_from_usize(series.len()), Ordering::Relaxed);
                Ok(series)
            }
            Err(err) => {
                self.observability
                    .query
                    .select_series_errors_total
                    .fetch_add(1, Ordering::Relaxed);
                Err(err)
            }
        }
    }
}