tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::sync::atomic::Ordering;

use parking_lot::{RwLock, RwLockReadGuard};
use roaring::RoaringTreemap;

use crate::engine::query::TieredQueryPlan;
use crate::engine::series::SeriesId;
use crate::query_matcher::CompiledSeriesMatcher;
use crate::storage::{
    MetadataShardScope, QueryRowsPage, QueryRowsScanOptions, ShardWindowDigest,
    ShardWindowRowsPage, ShardWindowScanOptions,
};
use crate::{DataPoint, Label, MetricSeries, Result, SeriesSelection};

use super::core_impl::VisibilityCacheReadContext;
use super::query_read::{
    apply_offset_limit_in_place, dedupe_last_value_per_timestamp, PersistedTierFetchStats,
    RawSeriesPagination, RawSeriesScanPage,
};
use super::rollups::RollupQueryCandidate;
#[cfg(test)]
use super::tiering;
use super::tiering::RetentionTierPolicy;
#[cfg(test)]
use super::IngestCommitHook;
use super::{
    elapsed_nanos_u64, rollups, saturating_u64_from_usize, state, Chunk, ChunkContext,
    ChunkStorage, PersistedChunkRef, PersistedIndexState, SealedChunkKey, SeriesRegistry,
    SeriesVisibilitySummary,
};

#[path = "query_exec/candidate_planner.rs"]
mod candidate_planner;
#[path = "query_exec/metadata_api.rs"]
mod metadata_api;
#[path = "query_exec/metadata_context.rs"]
mod metadata_context;
#[path = "query_exec/metadata_postings.rs"]
mod metadata_postings;
#[path = "query_exec/metadata_series_selection.rs"]
mod metadata_series_selection;
#[path = "query_exec/read_context.rs"]
mod read_context;
#[path = "query_exec/scan_api.rs"]
mod scan_api;
#[path = "query_exec/select_api.rs"]
mod select_api;
#[path = "query_exec/time_range_filter.rs"]
mod time_range_filter;
#[path = "query_exec/time_range_planner.rs"]
mod time_range_planner;

use metadata_context::{MetadataSelectionContext, RuntimeMetadataCandidatePlan};
use read_context::TimeRangeFilterContext;

impl ChunkStorage {
    pub(super) fn query_tier_plan(&self, start: i64, end: i64) -> TieredQueryPlan {
        self.query_planning_context().query_tier_plan(start, end)
    }

    fn record_query_tier_plan(&self, plan: TieredQueryPlan) {
        if plan.is_hot_only() {
            self.observability
                .query
                .hot_only_query_plans_total
                .fetch_add(1, Ordering::Relaxed);
        }
        if plan.includes_warm() {
            self.observability
                .query
                .warm_tier_query_plans_total
                .fetch_add(1, Ordering::Relaxed);
        }
        if plan.includes_cold() {
            self.observability
                .query
                .cold_tier_query_plans_total
                .fetch_add(1, Ordering::Relaxed);
        }
    }

    fn record_persisted_tier_fetch_stats(&self, stats: PersistedTierFetchStats) {
        self.observability
            .query
            .hot_tier_persisted_chunks_read_total
            .fetch_add(stats.hot_persisted_chunks_read, Ordering::Relaxed);
        self.observability
            .query
            .warm_tier_persisted_chunks_read_total
            .fetch_add(stats.warm_persisted_chunks_read, Ordering::Relaxed);
        self.observability
            .query
            .cold_tier_persisted_chunks_read_total
            .fetch_add(stats.cold_persisted_chunks_read, Ordering::Relaxed);
        self.observability
            .query
            .warm_tier_fetch_duration_nanos_total
            .fetch_add(stats.warm_fetch_duration_nanos, Ordering::Relaxed);
        self.observability
            .query
            .cold_tier_fetch_duration_nanos_total
            .fetch_add(stats.cold_fetch_duration_nanos, Ordering::Relaxed);
    }

    fn request_background_persisted_refresh_if_needed(&self) {
        if self.persisted.persisted_index_dirty.load(Ordering::SeqCst)
            || self.should_refresh_remote_catalog()
        {
            self.notify_persisted_refresh_thread();
        }
    }
}

const SHARD_WINDOW_FNV_OFFSET_BASIS: u64 = crate::storage::SHARD_WINDOW_FNV_OFFSET_BASIS;

fn validate_shard_window_request(
    shard: u32,
    shard_count: u32,
    window_start: i64,
    window_end: i64,
) -> Result<()> {
    crate::storage::validate_shard_window_request(shard, shard_count, window_start, window_end)
}

fn validate_shard_window_scan_options(options: ShardWindowScanOptions) -> Result<()> {
    crate::storage::validate_shard_window_scan_options(options)
}

fn validate_query_rows_scan_options(options: QueryRowsScanOptions) -> Result<()> {
    crate::storage::validate_query_rows_scan_options(options)
}

fn shard_window_series_identity_key(metric: &str, labels: &[Label]) -> String {
    crate::storage::shard_window_series_identity_key(metric, labels)
}

fn shard_window_hash_data_point(point: &DataPoint) -> u64 {
    crate::storage::shard_window_hash_data_point(point)
}

fn shard_window_fnv1a_update(hash: &mut u64, bytes: &[u8]) {
    crate::storage::shard_window_fnv1a_update(hash, bytes)
}

fn sort_data_points_for_shard_window(points: &mut [DataPoint]) {
    crate::storage::sort_data_points_for_shard_window(points)
}