tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};

use super::metrics::{QueryObservabilityCounters, RollupObservabilityCounters};
use super::*;
use crate::engine::fs_utils::write_file_atomically_and_sync_parent;
use crate::engine::query::TieredQueryPlan;
use crate::engine::tombstone::{TombstoneMap, TombstoneRange};
use crate::query_aggregation::{bucket_start_for_origin, downsample_points_with_origin};
use crate::storage::{RollupObservabilitySnapshot, RollupPolicy, RollupPolicyStatus};
use crate::validation::{validate_labels, validate_metric};
use crate::Aggregation;

// The rollup implementation is split across files under `rollups/`; each `#[path]` attribute
// wires one of them in explicitly as a private submodule.
#[path = "rollups/materialization.rs"]
mod materialization;
#[path = "rollups/policy.rs"]
mod policy;
// Defines `load_rollup_state` / `persist_rollup_state`, which tests import directly.
#[path = "rollups/runtime.rs"]
mod runtime;

#[cfg(test)]
use self::runtime::{load_rollup_state, persist_rollup_state};

/// Name of the directory that holds rollup metadata files (presumably created under the
/// storage data path — confirm in `runtime`).
pub(super) const ROLLUP_DIR_NAME: &str = ".rollups";
/// File name of the persisted rollup policy list (presumably a serialized
/// `PersistedRollupPoliciesFile`).
const ROLLUP_POLICIES_FILE_NAME: &str = "policies.json";
/// File name of the persisted rollup progress (presumably a serialized
/// `PersistedRollupStateFile`).
const ROLLUP_STATE_FILE_NAME: &str = "state.json";
/// Marker presumably stored in / checked against `PersistedRollupPoliciesFile::magic`.
const ROLLUP_POLICIES_MAGIC: &str = "tsink-rollup-policies";
/// Marker presumably stored in / checked against `PersistedRollupStateFile::magic`.
const ROLLUP_STATE_MAGIC: &str = "tsink-rollup-state";
/// Schema version presumably written to the `version` field of both persisted files.
const ROLLUP_SCHEMA_VERSION: u16 = 1;
/// Prefix of every internal rollup metric name; produced by `rollup_metric_name` and detected
/// by `is_internal_rollup_metric`.
pub(super) const INTERNAL_ROLLUP_METRIC_PREFIX: &str = "__tsink_rollup__:";

/// Test-only callback that receives a rollup policy (presumably invoked when a policy run
/// starts — confirm at the call site).
#[cfg(test)]
type RollupPolicyStartHook = dyn Fn(&RollupPolicy) + Send + Sync + 'static;

/// Test-only callback returning a `Result`; presumably consulted during rollup state
/// persistence so tests can inject failures.
#[cfg(test)]
type RollupStatePersistHook = dyn Fn() -> Result<()> + Send + Sync + 'static;

/// Optional test hooks. Each slot is an `Option` behind an `RwLock`, so a hook can be installed
/// or cleared through a shared reference.
#[cfg(test)]
#[derive(Default)]
struct RollupTestHooks {
    /// Hook receiving a `&RollupPolicy`.
    policy_start_hook: RwLock<Option<Arc<RollupPolicyStartHook>>>,
    /// Hook returning a `Result<()>`.
    state_persist_hook: RwLock<Option<Arc<RollupStatePersistHook>>>,
}

/// Hooks are opaque closures, so `Debug` prints only the struct name and no fields.
#[cfg(test)]
impl std::fmt::Debug for RollupTestHooks {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.debug_struct("RollupTestHooks").finish()
    }
}

/// Shared in-memory state of the rollup subsystem.
///
/// Each piece of mutable state sits behind its own `RwLock`. The three `*_path` fields are
/// `Option`s; presumably `None` means rollup metadata is not persisted to disk — TODO confirm
/// in `runtime`.
#[derive(Debug)]
pub(super) struct RollupRuntimeState {
    /// Directory holding rollup metadata files (presumably named by `ROLLUP_DIR_NAME`).
    dir_path: Option<PathBuf>,
    /// Path of the persisted policy list (presumably `ROLLUP_POLICIES_FILE_NAME`).
    policies_path: Option<PathBuf>,
    /// Path of the persisted rollup progress (presumably `ROLLUP_STATE_FILE_NAME`).
    state_path: Option<PathBuf>,
    /// Currently registered rollup policies.
    policies: RwLock<Vec<RollupPolicy>>,
    /// Materialized-through timestamps; presumably keyed by policy id, then by source series
    /// key (mirrors the fields of `PersistedRollupCheckpoint`).
    checkpoints: RwLock<HashMap<String, BTreeMap<String, i64>>>,
    /// Materializations that have been started but not yet finalized; presumably keyed the same
    /// way as `checkpoints`.
    pending_materializations:
        RwLock<HashMap<String, BTreeMap<String, PendingRollupMaterialization>>>,
    /// Deletes whose effect on rollup data presumably still has to be repaired.
    pending_delete_invalidations: RwLock<Vec<PendingRollupDeleteInvalidation>>,
    /// Current generation per policy (presumably keyed by policy id); presumably the value
    /// passed to `rollup_metric_name`.
    generations: RwLock<HashMap<String, u64>>,
    /// Per-policy run statistics (presumably keyed by policy id).
    policy_stats: RwLock<BTreeMap<String, PolicyRunState>>,
    /// Test-only injection points.
    #[cfg(test)]
    test_hooks: RollupTestHooks,
}

/// Run statistics for one policy, stored in `RollupRuntimeState::policy_stats`.
#[derive(Debug, Clone, Default)]
struct PolicyRunState {
    /// Number of source series the policy matched (presumably in the latest run).
    matched_series: u64,
    /// Number of source series that had rollup output written (presumably in the latest run).
    materialized_series: u64,
    /// Materialized-through timestamp reported for the policy, if any.
    materialized_through: Option<i64>,
    /// Start time of the latest run in milliseconds (presumably since the Unix epoch).
    last_run_started_at_ms: Option<u64>,
    /// Completion time of the latest run in milliseconds (presumably since the Unix epoch).
    last_run_completed_at_ms: Option<u64>,
    /// Duration of the latest run, in nanoseconds.
    last_run_duration_nanos: u64,
    /// Error message recorded for the latest run, if it failed.
    last_error: Option<String>,
}

/// On-disk form of the rollup policy list (presumably serialized as JSON into
/// `ROLLUP_POLICIES_FILE_NAME`). Field order defines the serialized layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedRollupPoliciesFile {
    /// File-type marker; presumably expected to equal `ROLLUP_POLICIES_MAGIC`.
    magic: String,
    /// Schema version; presumably compared against `ROLLUP_SCHEMA_VERSION`.
    version: u16,
    policies: Vec<RollupPolicy>,
}

/// On-disk form of rollup progress (presumably serialized as JSON into `ROLLUP_STATE_FILE_NAME`).
///
/// Fields marked `#[serde(default)]` deserialize as empty vectors when absent, so documents
/// without them still load; `checkpoints` has no default and is therefore required.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedRollupStateFile {
    /// File-type marker; presumably expected to equal `ROLLUP_STATE_MAGIC`.
    magic: String,
    /// Schema version; presumably compared against `ROLLUP_SCHEMA_VERSION`.
    version: u16,
    /// Flattened checkpoint entries (presumably from `RollupRuntimeState::checkpoints`).
    checkpoints: Vec<PersistedRollupCheckpoint>,
    #[serde(default)]
    pending_materializations: Vec<PersistedPendingRollupMaterialization>,
    #[serde(default)]
    pending_delete_invalidations: Vec<PersistedPendingRollupDeleteInvalidation>,
    #[serde(default)]
    generations: Vec<PersistedRollupGeneration>,
}

/// One checkpoint entry: the source series identified by `source_key` under policy `policy_id`
/// is materialized through `materialized_through`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedRollupCheckpoint {
    policy_id: String,
    source_key: String,
    materialized_through: i64,
}

/// The generation counter of one policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedRollupGeneration {
    policy_id: String,
    generation: u64,
}

/// A materialization presumably started but not yet committed: the checkpoint it started from,
/// the materialized-through value it targets, and the generation it writes into — confirm in
/// `materialization`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingRollupMaterialization {
    checkpoint: i64,
    materialized_through: i64,
    generation: u64,
}

/// A delete whose effect on rollups presumably still has to be repaired: the tombstone range,
/// the source series it touched, and the ids of policies it affects.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingRollupDeleteInvalidation {
    tombstone: TombstoneRange,
    series_ids: Vec<SeriesId>,
    affected_policy_ids: Vec<String>,
}

/// Serialized form of a `PendingRollupMaterialization`, flattened with the policy id and source
/// key it is filed under.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedPendingRollupMaterialization {
    policy_id: String,
    source_key: String,
    checkpoint: i64,
    materialized_through: i64,
    generation: u64,
}

/// Serialized form of a `PendingRollupDeleteInvalidation` (same fields).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedPendingRollupDeleteInvalidation {
    tombstone: TombstoneRange,
    series_ids: Vec<SeriesId>,
    affected_policy_ids: Vec<String>,
}

/// Rollup progress rebuilt into in-memory shapes (presumably produced by
/// `runtime::load_rollup_state` from the persisted state file). `Default` is the empty state.
#[derive(Debug, Default)]
struct LoadedRollupState {
    checkpoints: HashMap<String, BTreeMap<String, i64>>,
    pending_materializations: HashMap<String, BTreeMap<String, PendingRollupMaterialization>>,
    pending_delete_invalidations: Vec<PendingRollupDeleteInvalidation>,
    generations: HashMap<String, u64>,
}

/// Owned copy of the lock-protected fields of `RollupRuntimeState` (everything except the
/// paths and test hooks); presumably captured so state can be restored or compared — confirm
/// in `runtime`.
#[derive(Debug, Clone)]
struct RollupRuntimeSnapshot {
    policies: Vec<RollupPolicy>,
    checkpoints: HashMap<String, BTreeMap<String, i64>>,
    pending_materializations: HashMap<String, BTreeMap<String, PendingRollupMaterialization>>,
    pending_delete_invalidations: Vec<PendingRollupDeleteInvalidation>,
    generations: HashMap<String, u64>,
    policy_stats: BTreeMap<String, PolicyRunState>,
}

/// A source series considered by a policy: its id, labels, and identity key (presumably built
/// with `source_series_key`).
#[derive(Debug, Clone)]
struct RollupSourceSeries {
    series_id: SeriesId,
    labels: Vec<Label>,
    source_key: String,
}

/// A rollup that may be used to answer a query, exposed to the rest of the engine.
#[derive(Debug, Clone)]
pub(in crate::engine) struct RollupQueryCandidate {
    /// The policy that produced the rollup.
    pub(in crate::engine) policy: RollupPolicy,
    /// Internal metric name holding the rollup points (presumably from `rollup_metric_name`).
    pub(in crate::engine) metric: String,
    /// Timestamp through which the rollup is materialized.
    pub(in crate::engine) materialized_through: i64,
}

/// Source-data reads the rollup code needs, expressed as a trait so they can be passed around
/// as `&dyn RollupSourceReadOps` (implemented for `ChunkStorage` in this file).
trait RollupSourceReadOps {
    /// Returns the subset of `candidate_series_ids` that are still live; what `prune_dead`
    /// additionally does is defined by the `ChunkStorage` method — TODO confirm.
    fn live_series_ids(
        &self,
        candidate_series_ids: Vec<SeriesId>,
        prune_dead: bool,
    ) -> Result<Vec<SeriesId>>;

    /// Returns the tiered query plan for the `start`..`end` range (bound inclusivity is defined
    /// by `ChunkStorage::query_tier_plan`).
    fn query_tier_plan(&self, start: i64, end: i64) -> TieredQueryPlan;

    /// Returns the points of `series_id` in the `start`..`end` range, read according to `plan`.
    fn collect_points_for_series_with_plan(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        plan: TieredQueryPlan,
    ) -> Result<Vec<DataPoint>>;

    /// Reference timestamp for bounded-recency reads, if one applies.
    fn bounded_recency_reference_timestamp(&self) -> Option<i64>;
}

/// Borrowed view of the rollup runtime state (`ChunkStorage::rollups.runtime`).
#[derive(Clone, Copy)]
struct RollupStateStoreContext<'a> {
    state: &'a RollupRuntimeState,
}

/// Borrowed view of the series registry lock (`ChunkStorage::catalog.registry`).
#[derive(Clone, Copy)]
struct RollupRegistryReadContext<'a> {
    registry: &'a RwLock<SeriesRegistry>,
}

/// Rollup state plus the tombstone map (`ChunkStorage::visibility.tombstones`); presumably used
/// to repair rollups after deletes.
#[derive(Clone, Copy)]
struct RollupDeleteRepairContext<'a> {
    store: RollupStateStoreContext<'a>,
    tombstones: &'a RwLock<TombstoneMap>,
}

/// Rollup state plus the series registry, for invalidation work.
#[derive(Clone, Copy)]
struct RollupInvalidationContext<'a> {
    store: RollupStateStoreContext<'a>,
    registry: RollupRegistryReadContext<'a>,
}

/// Series registry plus the source-data read operations.
#[derive(Clone, Copy)]
struct RollupSourceReadContext<'a> {
    registry: RollupRegistryReadContext<'a>,
    ops: &'a dyn RollupSourceReadOps,
}

/// Write path for materialized rollup rows (implemented for `ChunkStorage` via its
/// `insert_rows_impl`).
trait RollupMaterializedWriteOps {
    /// Inserts `rows` and returns the storage write result.
    fn insert_rows(&self, rows: &[Row]) -> Result<WriteResult>;
}

/// Borrowed handle to the materialized-row write operations.
#[derive(Clone, Copy)]
struct RollupMaterializedWriteContext<'a> {
    ops: &'a dyn RollupMaterializedWriteOps,
}

/// Everything presumably needed to decide whether a query can be served from a rollup: rollup
/// state, the series registry, source reads, and both rollup and query observability counters.
#[derive(Clone, Copy)]
struct RollupQuerySelectionContext<'a> {
    store: RollupStateStoreContext<'a>,
    registry: RollupRegistryReadContext<'a>,
    source_reads: RollupSourceReadContext<'a>,
    rollup_observability: &'a RollupObservabilityCounters,
    query_observability: &'a QueryObservabilityCounters,
}

/// Borrowed rollup run lock (`ChunkStorage::rollups.run_lock`); presumably held while a rollup
/// run executes so that runs do not overlap.
#[derive(Clone, Copy)]
struct RollupRunCoordinationContext<'a> {
    run_lock: &'a Mutex<()>,
}

/// Outcome of running one policy. It shares several fields with `PolicyRunState` and
/// presumably feeds it and the rollup observability counters.
#[derive(Debug, Default)]
struct PolicyRunReport {
    matched_series: u64,
    materialized_series: u64,
    materialized_through: Option<i64>,
    /// Number of rollup buckets written.
    buckets_materialized: u64,
    /// Number of rollup points written.
    points_materialized: u64,
    /// Whether any checkpoint moved (presumably meaning state must be persisted).
    checkpoint_changed: bool,
}

/// Reports whether `metric` names an internal rollup series, i.e. whether it begins with
/// `INTERNAL_ROLLUP_METRIC_PREFIX`.
pub(super) fn is_internal_rollup_metric(metric: &str) -> bool {
    metric.strip_prefix(INTERNAL_ROLLUP_METRIC_PREFIX).is_some()
}

/// Builds the internal metric name that stores `policy`'s rollup points for `generation`.
///
/// Generation `0` yields `<prefix><policy id>:<metric>`. Every later generation inserts a
/// `g<generation>` segment: `<prefix><policy id>:g<generation>:<metric>`.
pub(super) fn rollup_metric_name(policy: &RollupPolicy, generation: u64) -> String {
    match generation {
        0 => format!(
            "{INTERNAL_ROLLUP_METRIC_PREFIX}{}:{}",
            policy.id, policy.metric
        ),
        current => format!(
            "{INTERNAL_ROLLUP_METRIC_PREFIX}{}:g{}:{}",
            policy.id, current, policy.metric
        ),
    }
}

/// Reports whether the source series (`metric`, `labels`) is covered by `policy`.
///
/// The metric must match exactly, and every label in `policy.match_labels` must appear in
/// `labels`. Extra labels on the series are allowed.
fn policy_matches_source(policy: &RollupPolicy, metric: &str, labels: &[Label]) -> bool {
    if policy.metric != metric {
        return false;
    }

    for required in policy.match_labels.iter() {
        if !labels.iter().any(|candidate| candidate == required) {
            return false;
        }
    }
    true
}

fn source_series_key(metric: &str, labels: &[Label]) -> String {
    crate::storage::shard_window_series_identity_key(metric, labels)
}

/// Returns the start of the `policy` bucket that contains `max_observed`.
///
/// `i64::MIN` is treated as "nothing observed" and yields `None`; the bucket computation is
/// skipped in that case.
fn aligned_materialized_end(policy: &RollupPolicy, max_observed: i64) -> Option<i64> {
    if max_observed == i64::MIN {
        None
    } else {
        Some(bucket_start_for_origin(
            max_observed,
            policy.bucket_origin,
            policy.interval,
        ))
    }
}

/// Reports whether `timestamp` falls exactly on a bucket boundary of `policy`, i.e. whether it
/// is its own bucket start.
fn is_bucket_aligned(timestamp: i64, policy: &RollupPolicy) -> bool {
    let bucket_start = bucket_start_for_origin(timestamp, policy.bucket_origin, policy.interval);
    timestamp == bucket_start
}

/// `ChunkStorage` serves rollup source reads directly: every method forwards to the inherent
/// `ChunkStorage` method of the same name. Inherent items take precedence over trait items in
/// `Self::` paths, so these calls do not recurse.
impl RollupSourceReadOps for ChunkStorage {
    fn live_series_ids(
        &self,
        candidate_series_ids: Vec<SeriesId>,
        prune_dead: bool,
    ) -> Result<Vec<SeriesId>> {
        Self::live_series_ids(self, candidate_series_ids, prune_dead)
    }

    fn query_tier_plan(&self, start: i64, end: i64) -> TieredQueryPlan {
        Self::query_tier_plan(self, start, end)
    }

    fn collect_points_for_series_with_plan(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        plan: TieredQueryPlan,
    ) -> Result<Vec<DataPoint>> {
        // The inherent method returns a pair; rollups only need the points, so the second
        // element is dropped.
        let (points, _) =
            Self::collect_points_for_series_with_plan(self, series_id, start, end, plan)?;
        Ok(points)
    }

    fn bounded_recency_reference_timestamp(&self) -> Option<i64> {
        Self::bounded_recency_reference_timestamp(self)
    }
}

/// Materialized rollup rows are written through `ChunkStorage::insert_rows_impl`.
impl RollupMaterializedWriteOps for ChunkStorage {
    fn insert_rows(&self, rows: &[Row]) -> Result<WriteResult> {
        self.insert_rows_impl(rows)
    }
}

impl ChunkStorage {
    fn rollup_state_store_context(&self) -> RollupStateStoreContext<'_> {
        RollupStateStoreContext {
            state: &self.rollups.runtime,
        }
    }

    fn rollup_registry_read_context(&self) -> RollupRegistryReadContext<'_> {
        RollupRegistryReadContext {
            registry: &self.catalog.registry,
        }
    }

    fn rollup_delete_repair_context(&self) -> RollupDeleteRepairContext<'_> {
        RollupDeleteRepairContext {
            store: self.rollup_state_store_context(),
            tombstones: &self.visibility.tombstones,
        }
    }

    fn rollup_invalidation_context(&self) -> RollupInvalidationContext<'_> {
        RollupInvalidationContext {
            store: self.rollup_state_store_context(),
            registry: self.rollup_registry_read_context(),
        }
    }

    fn rollup_source_read_context(&self) -> RollupSourceReadContext<'_> {
        RollupSourceReadContext {
            registry: self.rollup_registry_read_context(),
            ops: self,
        }
    }

    fn rollup_materialized_write_context(&self) -> RollupMaterializedWriteContext<'_> {
        RollupMaterializedWriteContext { ops: self }
    }

    fn rollup_query_selection_context(&self) -> RollupQuerySelectionContext<'_> {
        RollupQuerySelectionContext {
            store: self.rollup_state_store_context(),
            registry: self.rollup_registry_read_context(),
            source_reads: self.rollup_source_read_context(),
            rollup_observability: &self.observability.rollup,
            query_observability: &self.observability.query,
        }
    }

    fn rollup_run_coordination_context(&self) -> RollupRunCoordinationContext<'_> {
        RollupRunCoordinationContext {
            run_lock: &self.rollups.run_lock,
        }
    }
}

// Unit tests for the rollup subsystem live in `rollups/tests.rs` and compile only under `test`.
#[cfg(test)]
#[path = "rollups/tests.rs"]
mod tests;