tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::runtime::pending_delete_blocks_rollup_candidate;
use super::*;

pub(super) fn normalize_policy(mut policy: RollupPolicy) -> Result<RollupPolicy> {
    if policy.id.trim().is_empty() {
        return Err(TsinkError::InvalidConfiguration(
            "rollup policy id must not be empty".to_string(),
        ));
    }
    if policy.interval <= 0 {
        return Err(TsinkError::InvalidConfiguration(format!(
            "rollup policy {} interval must be positive",
            policy.id
        )));
    }
    if policy.aggregation == Aggregation::None {
        return Err(TsinkError::InvalidConfiguration(format!(
            "rollup policy {} requires a concrete aggregation",
            policy.id
        )));
    }
    if is_internal_rollup_metric(&policy.metric) {
        return Err(TsinkError::InvalidConfiguration(format!(
            "rollup policy {} cannot target internal rollup metric {}",
            policy.id, policy.metric
        )));
    }
    validate_metric(&policy.metric)?;
    policy.match_labels.sort();
    policy.match_labels.dedup();
    validate_labels(&policy.match_labels)?;
    Ok(policy)
}

pub(super) fn load_rollup_policies(path: Option<&Path>) -> Result<Vec<RollupPolicy>> {
    let Some(path) = path else {
        return Ok(Vec::new());
    };
    let bytes = match fs::read(path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => {
            return Err(TsinkError::IoWithPath {
                path: path.to_path_buf(),
                source: err,
            });
        }
    };

    let file: PersistedRollupPoliciesFile = serde_json::from_slice(&bytes)?;
    if file.magic != ROLLUP_POLICIES_MAGIC || file.version != ROLLUP_SCHEMA_VERSION {
        return Err(TsinkError::DataCorruption(format!(
            "unsupported rollup policies file {}",
            path.display()
        )));
    }

    file.policies
        .into_iter()
        .map(normalize_policy)
        .collect::<Result<Vec<_>>>()
}

pub(super) fn persist_rollup_policies(
    path: Option<&Path>,
    policies: &[RollupPolicy],
) -> Result<()> {
    let Some(path) = path else {
        if policies.is_empty() {
            return Ok(());
        }
        return Err(TsinkError::InvalidConfiguration(
            "rollup policies require persistent storage".to_string(),
        ));
    };

    let payload = PersistedRollupPoliciesFile {
        magic: ROLLUP_POLICIES_MAGIC.to_string(),
        version: ROLLUP_SCHEMA_VERSION,
        policies: policies.to_vec(),
    };
    let encoded = serde_json::to_vec_pretty(&payload)?;
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    write_file_atomically_and_sync_parent(path, &encoded)
}

impl RollupQuerySelectionContext<'_> {
    fn rollup_query_candidate(
        self,
        metric: &str,
        labels: &[Label],
        interval: i64,
        aggregation: Aggregation,
        start: i64,
        end: i64,
    ) -> Option<RollupQueryCandidate> {
        if is_internal_rollup_metric(metric) || aggregation == Aggregation::None || interval <= 0 {
            return None;
        }
        let series_id = self.registry.resolve_existing_series_id(metric, labels)?;

        let checkpoints = self.store.state.checkpoints.read();
        let generations = self.store.state.generations.read();
        let pending_delete_invalidations = self.store.state.pending_delete_invalidations.read();
        let source_key = source_series_key(metric, labels);
        self.store
            .state
            .policies
            .read()
            .iter()
            .filter(|policy| {
                policy.interval == interval
                    && policy.aggregation == aggregation
                    && is_bucket_aligned(start, policy)
                    && policy_matches_source(policy, metric, labels)
            })
            .filter_map(|policy| {
                let materialized_through = checkpoints
                    .get(&policy.id)
                    .and_then(|items| items.get(&source_key))
                    .copied()?;
                let covered_end = materialized_through.min(end);
                (covered_end > start
                    && !pending_delete_blocks_rollup_candidate(
                        &pending_delete_invalidations,
                        series_id,
                        &policy.id,
                        start,
                        covered_end,
                    ))
                .then(|| RollupQueryCandidate {
                    policy: policy.clone(),
                    metric: rollup_metric_name(
                        policy,
                        generations.get(&policy.id).copied().unwrap_or(0),
                    ),
                    materialized_through,
                })
            })
            .max_by(|left, right| {
                (
                    left.policy.match_labels.len(),
                    left.materialized_through,
                    &left.policy.id,
                )
                    .cmp(&(
                        right.policy.match_labels.len(),
                        right.materialized_through,
                        &right.policy.id,
                    ))
            })
    }

    fn record_rollup_query_use(self, points_read: usize, partial: bool) {
        self.query_observability
            .rollup_query_plans_total
            .fetch_add(1, Ordering::Relaxed);
        if partial {
            self.query_observability
                .partial_rollup_query_plans_total
                .fetch_add(1, Ordering::Relaxed);
        }
        self.query_observability
            .rollup_points_read_total
            .fetch_add(saturating_u64_from_usize(points_read), Ordering::Relaxed);
    }

    fn rollup_observability_snapshot(self) -> RollupObservabilitySnapshot {
        let policies = self.store.policies_snapshot();
        let checkpoints = self.store.checkpoints_snapshot();
        let policy_stats = self.store.policy_stats_snapshot();
        let max_observed = self
            .source_reads
            .bounded_recency_reference_timestamp()
            .unwrap_or(i64::MIN);

        let policies = policies
            .into_iter()
            .map(|policy| {
                let matched_sources = self
                    .source_reads
                    .matching_rollup_sources_best_effort(&policy);
                let mut min_through = None::<i64>;
                let mut materialized_series = 0u64;
                if let Some(entries) = checkpoints.get(&policy.id) {
                    for source in &matched_sources {
                        if let Some(materialized_through) = entries.get(&source.source_key).copied()
                        {
                            materialized_series = materialized_series.saturating_add(1);
                            min_through = Some(
                                min_through
                                    .map(|current| current.min(materialized_through))
                                    .unwrap_or(materialized_through),
                            );
                        }
                    }
                }
                let runtime = policy_stats.get(&policy.id).cloned().unwrap_or_default();

                RollupPolicyStatus {
                    policy,
                    matched_series: u64::try_from(matched_sources.len()).unwrap_or(u64::MAX),
                    materialized_series,
                    materialized_through: runtime.materialized_through.or(min_through),
                    lag: runtime
                        .materialized_through
                        .or(min_through)
                        .and_then(|through| {
                            (max_observed != i64::MIN)
                                .then_some(max_observed.saturating_sub(through))
                        }),
                    last_run_started_at_ms: runtime.last_run_started_at_ms,
                    last_run_completed_at_ms: runtime.last_run_completed_at_ms,
                    last_run_duration_nanos: runtime.last_run_duration_nanos,
                    last_error: runtime.last_error,
                }
            })
            .collect::<Vec<_>>();

        RollupObservabilitySnapshot {
            worker_runs_total: self
                .rollup_observability
                .worker_runs_total
                .load(Ordering::Relaxed),
            worker_success_total: self
                .rollup_observability
                .worker_success_total
                .load(Ordering::Relaxed),
            worker_errors_total: self
                .rollup_observability
                .worker_errors_total
                .load(Ordering::Relaxed),
            policy_runs_total: self
                .rollup_observability
                .policy_runs_total
                .load(Ordering::Relaxed),
            buckets_materialized_total: self
                .rollup_observability
                .buckets_materialized_total
                .load(Ordering::Relaxed),
            points_materialized_total: self
                .rollup_observability
                .points_materialized_total
                .load(Ordering::Relaxed),
            last_run_duration_nanos: self
                .rollup_observability
                .last_run_duration_nanos
                .load(Ordering::Relaxed),
            policies,
        }
    }
}

impl ChunkStorage {
    pub(in crate::engine) fn apply_rollup_policies_impl(
        &self,
        policies: Vec<RollupPolicy>,
    ) -> Result<RollupObservabilitySnapshot> {
        self.ensure_open()?;
        let state_store = self.rollup_state_store_context();
        if state_store.dir_path().is_none() {
            return Err(TsinkError::InvalidConfiguration(
                "rollup policies require persistent storage (data_path)".to_string(),
            ));
        }

        let mut normalized = Vec::with_capacity(policies.len());
        let mut ids = BTreeSet::new();
        for policy in policies {
            let policy = normalize_policy(policy)?;
            if !ids.insert(policy.id.clone()) {
                return Err(TsinkError::InvalidConfiguration(format!(
                    "duplicate rollup policy id {}",
                    policy.id
                )));
            }
            normalized.push(policy);
        }
        normalized.sort_by(|left, right| left.id.cmp(&right.id));

        let _run_guard = self.rollup_run_coordination_context().run_lock.lock();
        let snapshot = state_store.next_snapshot_for_policies(normalized);
        state_store.persist_snapshot(&snapshot)?;
        state_store.install_snapshot(snapshot);
        self.run_rollup_pipeline_once_locked()?;
        Ok(self.rollup_observability_snapshot())
    }

    pub(in crate::engine) fn rollup_query_candidate(
        &self,
        metric: &str,
        labels: &[Label],
        interval: i64,
        aggregation: Aggregation,
        start: i64,
        end: i64,
    ) -> Option<RollupQueryCandidate> {
        self.rollup_query_selection_context()
            .rollup_query_candidate(metric, labels, interval, aggregation, start, end)
    }

    pub(in crate::engine) fn record_rollup_query_use(&self, points_read: usize, partial: bool) {
        self.rollup_query_selection_context()
            .record_rollup_query_use(points_read, partial);
    }

    pub(in crate::engine) fn rollup_observability_snapshot(&self) -> RollupObservabilitySnapshot {
        self.rollup_query_selection_context()
            .rollup_observability_snapshot()
    }

    #[cfg(test)]
    pub(in crate::engine) fn set_rollup_policy_start_hook<F>(&self, hook: F)
    where
        F: Fn(&RollupPolicy) + Send + Sync + 'static,
    {
        self.rollup_state_store_context()
            .set_policy_start_hook(hook);
    }

    #[cfg(test)]
    pub(in crate::engine) fn clear_rollup_policy_start_hook(&self) {
        self.rollup_state_store_context().clear_policy_start_hook();
    }

    #[cfg(test)]
    pub(in crate::engine) fn set_rollup_state_persist_hook<F>(&self, hook: F)
    where
        F: Fn() -> Result<()> + Send + Sync + 'static,
    {
        self.rollup_state_store_context()
            .set_state_persist_hook(hook);
    }

    #[cfg(test)]
    pub(in crate::engine) fn clear_rollup_state_persist_hook(&self) {
        self.rollup_state_store_context().clear_state_persist_hook();
    }
}