tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;
use crate::engine::compactor::CompactionRunStats;

/// Top-level container that groups observability counters and state by subsystem.
///
/// Each field is one of the counter/state structs declared alongside this type.
/// `Default` is derived, so a fresh value uses every sub-struct's own `Default`.
#[derive(Default)]
pub(super) struct StorageObservabilityCounters {
    /// WAL replay, append and reset counters.
    pub(super) wal: WalObservabilityCounters,
    /// Future-skew point count and maximum timestamp.
    pub(super) retention: RetentionObservabilityCounters,
    /// Flush pipeline, admission backpressure, persist, eviction and tier counters.
    pub(super) flush: FlushObservabilityCounters,
    /// Compaction run, outcome, volume and duration counters.
    pub(super) compaction: CompactionObservabilityCounters,
    /// Per-query-API call, error, duration and query-plan counters.
    pub(super) query: QueryObservabilityCounters,
    /// Rollup worker and materialization counters.
    pub(super) rollup: RollupObservabilityCounters,
    /// Remote catalog refresh counters, timestamps and last error.
    pub(super) remote: RemoteStorageObservabilityState,
    /// Background/maintenance error counters, fail-fast flag and last error messages.
    pub(super) health: HealthObservabilityState,
}

impl StorageObservabilityCounters {
    /// Counts one admission backpressure delay and adds its length to the running total.
    ///
    /// The duration is taken in whole nanoseconds and clamped to `u64::MAX` before
    /// being added, so an oversized `Duration` is never truncated by the narrowing cast.
    pub(super) fn record_admission_backpressure_delay(&self, duration: Duration) {
        let flush = &self.flush;
        let nanos = duration.as_nanos();
        let clamped_nanos = if nanos > u128::from(u64::MAX) {
            u64::MAX
        } else {
            nanos as u64
        };

        flush
            .admission_backpressure_delays_total
            .fetch_add(1, Ordering::Relaxed);
        flush
            .admission_backpressure_delay_nanos_total
            .fetch_add(clamped_nanos, Ordering::Relaxed);
    }

    /// Counts one admission pressure-relief request.
    pub(super) fn record_admission_pressure_relief_request(&self) {
        let counter = &self.flush.admission_pressure_relief_requests_total;
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one observed admission pressure relief.
    pub(super) fn record_admission_pressure_relief_observed(&self) {
        let counter = &self.flush.admission_pressure_relief_observed_total;
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a compaction run that finished without an error.
    ///
    /// Bumps the run count, adds `duration_nanos` to the duration total, adds the
    /// source/output segment, chunk and point counts from `stats` to their totals,
    /// and finally counts the run as a success when `stats.compacted` is set or as
    /// a no-op otherwise.
    pub(super) fn record_compaction_result(&self, stats: CompactionRunStats, duration_nanos: u64) {
        let compaction = &self.compaction;

        compaction.runs_total.fetch_add(1, Ordering::Relaxed);
        compaction
            .duration_nanos_total
            .fetch_add(duration_nanos, Ordering::Relaxed);

        // Each volume counter is paired with the matching field of `stats`; the
        // table keeps the update order explicit.
        let volumes = [
            (&compaction.source_segments_total, stats.source_segments),
            (&compaction.output_segments_total, stats.output_segments),
            (&compaction.source_chunks_total, stats.source_chunks),
            (&compaction.output_chunks_total, stats.output_chunks),
            (&compaction.source_points_total, stats.source_points),
            (&compaction.output_points_total, stats.output_points),
        ];
        for (counter, amount) in volumes {
            counter.fetch_add(saturating_u64_from_usize(amount), Ordering::Relaxed);
        }

        let outcome_counter = if stats.compacted {
            &compaction.success_total
        } else {
            &compaction.noop_total
        };
        outcome_counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a compaction run that ended in an error.
    ///
    /// The run still counts toward the run total and the duration total.
    pub(super) fn record_compaction_error(&self, duration_nanos: u64) {
        let compaction = &self.compaction;
        compaction.runs_total.fetch_add(1, Ordering::Relaxed);
        compaction.errors_total.fetch_add(1, Ordering::Relaxed);
        compaction
            .duration_nanos_total
            .fetch_add(duration_nanos, Ordering::Relaxed);
    }

    /// Records an error raised by the background worker named `worker`.
    ///
    /// Increments the background error count, replaces the stored last-error
    /// message, and sets the fail-fast flag when `fail_fast_enabled` is true.
    pub(super) fn record_background_worker_error(
        &self,
        worker: &'static str,
        error: &TsinkError,
        fail_fast_enabled: bool,
    ) {
        let health = &self.health;
        health
            .background_errors_total
            .fetch_add(1, Ordering::Relaxed);

        // The message is built before the write guard is taken.
        let message = format!("{worker} worker error: {error}");
        *health.last_background_error.write() = Some(message);

        if fail_fast_enabled {
            health.fail_fast_triggered.store(true, Ordering::SeqCst);
        }
    }

    /// Records an error from the maintenance step named `operation`.
    pub(super) fn record_maintenance_error(&self, operation: &'static str, error: &TsinkError) {
        let health = &self.health;
        health
            .maintenance_errors_total
            .fetch_add(1, Ordering::Relaxed);

        let message = format!("{operation}: {error}");
        *health.last_maintenance_error.write() = Some(message);
    }

    /// Records a successful remote catalog refresh at `refreshed_at_unix_ms`.
    ///
    /// Marks the remote as accessible, stamps both the last-success and
    /// last-attempt timestamps, zeroes the consecutive-failure count and the
    /// next-retry timestamp, and clears the stored refresh error.
    pub(super) fn record_remote_catalog_refresh_success(&self, refreshed_at_unix_ms: u64) {
        let remote = &self.remote;

        remote
            .catalog_refreshes_total
            .fetch_add(1, Ordering::Relaxed);
        remote.accessible.store(true, Ordering::Relaxed);

        // A success is also the most recent attempt, so both stamps get the same value.
        for stamp in [
            &remote.last_successful_refresh_unix_ms,
            &remote.last_refresh_attempt_unix_ms,
        ] {
            stamp.store(refreshed_at_unix_ms, Ordering::Relaxed);
        }

        // Failure bookkeeping is reset to zero.
        for cleared in [
            &remote.consecutive_refresh_failures,
            &remote.next_refresh_retry_unix_ms,
        ] {
            cleared.store(0, Ordering::Relaxed);
        }

        *remote.last_refresh_error.write() = None;
    }

    /// Records a failed remote catalog refresh.
    ///
    /// Marks the remote as inaccessible, stores the attempt timestamp, the next
    /// retry timestamp and the caller-supplied consecutive failure count, and
    /// replaces the stored refresh error with `"{operation}: {error}"`.
    pub(super) fn record_remote_catalog_refresh_error(
        &self,
        operation: &'static str,
        error: &TsinkError,
        attempted_at_unix_ms: u64,
        next_retry_unix_ms: u64,
        consecutive_failures: u32,
    ) {
        let remote = &self.remote;

        remote
            .catalog_refresh_errors_total
            .fetch_add(1, Ordering::Relaxed);
        remote.accessible.store(false, Ordering::Relaxed);
        remote
            .last_refresh_attempt_unix_ms
            .store(attempted_at_unix_ms, Ordering::Relaxed);
        remote
            .next_refresh_retry_unix_ms
            .store(next_retry_unix_ms, Ordering::Relaxed);

        let failures = u64::from(consecutive_failures);
        remote
            .consecutive_refresh_failures
            .store(failures, Ordering::Relaxed);

        let message = format!("{operation}: {error}");
        *remote.last_refresh_error.write() = Some(message);
    }
}

/// Atomic counters whose names cover WAL replay, append and reset activity.
///
/// All counters start at zero through the derived `Default`.
/// NOTE(review): nothing in the visible code updates these fields; presumably the
/// WAL replay/append/reset paths do — confirm against callers.
#[derive(Default)]
pub(super) struct WalObservabilityCounters {
    // Replay counters.
    pub(super) replay_runs_total: AtomicU64,
    pub(super) replay_frames_total: AtomicU64,
    pub(super) replay_series_definitions_total: AtomicU64,
    pub(super) replay_sample_batches_total: AtomicU64,
    pub(super) replay_points_total: AtomicU64,
    pub(super) replay_errors_total: AtomicU64,
    pub(super) replay_duration_nanos_total: AtomicU64,
    // Append counters.
    pub(super) append_series_definitions_total: AtomicU64,
    pub(super) append_sample_batches_total: AtomicU64,
    pub(super) append_points_total: AtomicU64,
    pub(super) append_bytes_total: AtomicU64,
    pub(super) append_errors_total: AtomicU64,
    // Reset counters.
    pub(super) resets_total: AtomicU64,
    pub(super) reset_errors_total: AtomicU64,
}

/// Counters for points whose timestamps are skewed into the future.
///
/// `Default` is implemented by hand rather than derived because
/// `future_skew_max_timestamp` starts at `i64::MIN` instead of zero.
pub(super) struct RetentionObservabilityCounters {
    /// Count of future-skewed points.
    /// NOTE(review): the update site is not visible here — confirm exact semantics.
    pub(super) future_skew_points_total: AtomicU64,
    /// Largest future-skewed timestamp; initialized to `i64::MIN`.
    /// NOTE(review): presumably `i64::MIN` means "nothing observed yet" — confirm
    /// against the code that reads this value.
    pub(super) future_skew_max_timestamp: AtomicI64,
}

impl Default for RetentionObservabilityCounters {
    /// Builds the initial state: a zero point count and a maximum timestamp of
    /// `i64::MIN`, the smallest value an `i64` can hold.
    fn default() -> Self {
        let future_skew_points_total = AtomicU64::new(0);
        let future_skew_max_timestamp = AtomicI64::new(i64::MIN);
        Self {
            future_skew_points_total,
            future_skew_max_timestamp,
        }
    }
}

/// Atomic counters for the flush pipeline, admission backpressure, persistence,
/// eviction and segment tiering, as named by the fields.
///
/// All values start at zero through the derived `Default`. The four `admission_*`
/// counters are updated by the `record_admission_*` methods on
/// `StorageObservabilityCounters`; updates to the other fields are not visible here.
#[derive(Default)]
pub(super) struct FlushObservabilityCounters {
    // Flush pipeline runs and outcomes.
    pub(super) pipeline_runs_total: AtomicU64,
    pub(super) pipeline_success_total: AtomicU64,
    pub(super) pipeline_timeout_total: AtomicU64,
    pub(super) pipeline_errors_total: AtomicU64,
    pub(super) pipeline_duration_nanos_total: AtomicU64,
    // Admission backpressure: number of delays, their summed length in
    // nanoseconds, and pressure-relief requests/observations.
    pub(super) admission_backpressure_delays_total: AtomicU64,
    pub(super) admission_backpressure_delay_nanos_total: AtomicU64,
    pub(super) admission_pressure_relief_requests_total: AtomicU64,
    pub(super) admission_pressure_relief_observed_total: AtomicU64,
    // Active flush counters.
    pub(super) active_flush_runs_total: AtomicU64,
    pub(super) active_flush_errors_total: AtomicU64,
    pub(super) active_flushed_series_total: AtomicU64,
    pub(super) active_flushed_chunks_total: AtomicU64,
    pub(super) active_flushed_points_total: AtomicU64,
    // Persist counters.
    pub(super) persist_runs_total: AtomicU64,
    pub(super) persist_success_total: AtomicU64,
    pub(super) persist_noop_total: AtomicU64,
    pub(super) persist_errors_total: AtomicU64,
    pub(super) persisted_series_total: AtomicU64,
    pub(super) persisted_chunks_total: AtomicU64,
    pub(super) persisted_points_total: AtomicU64,
    pub(super) persisted_segments_total: AtomicU64,
    pub(super) persist_duration_nanos_total: AtomicU64,
    // Eviction, tier movement and expiry counters.
    pub(super) evicted_sealed_chunks_total: AtomicU64,
    pub(super) tier_moves_total: AtomicU64,
    pub(super) tier_move_errors_total: AtomicU64,
    pub(super) expired_segments_total: AtomicU64,
    // NOTE(review): these three lack the `_total` suffix, so they are presumably
    // current-value gauges rather than monotonic counters — confirm.
    pub(super) hot_segments_visible: AtomicU64,
    pub(super) warm_segments_visible: AtomicU64,
    pub(super) cold_segments_visible: AtomicU64,
}

/// Atomic counters describing compaction runs.
///
/// Updated by `StorageObservabilityCounters::record_compaction_result` and
/// `record_compaction_error`. All values start at zero through the derived `Default`.
#[derive(Default)]
pub(super) struct CompactionObservabilityCounters {
    /// Incremented once per recorded run, whether it succeeded, was a no-op or failed.
    pub(super) runs_total: AtomicU64,
    /// Incremented for a recorded result whose `compacted` flag is set.
    pub(super) success_total: AtomicU64,
    /// Incremented for a recorded result whose `compacted` flag is not set.
    pub(super) noop_total: AtomicU64,
    /// Incremented once per recorded compaction error.
    pub(super) errors_total: AtomicU64,
    // Source/output volumes, accumulated from the stats of each recorded result.
    pub(super) source_segments_total: AtomicU64,
    pub(super) output_segments_total: AtomicU64,
    pub(super) source_chunks_total: AtomicU64,
    pub(super) output_chunks_total: AtomicU64,
    pub(super) source_points_total: AtomicU64,
    pub(super) output_points_total: AtomicU64,
    /// Sum of the `duration_nanos` values passed for both results and errors.
    pub(super) duration_nanos_total: AtomicU64,
}

/// Atomic counters for the query path, grouped by the prefixes of the field names.
///
/// All values start at zero through the derived `Default`.
/// NOTE(review): nothing in the visible code updates these fields; presumably the
/// query entry points and planner do — confirm against callers.
#[derive(Default)]
pub(super) struct QueryObservabilityCounters {
    // `select` counters.
    pub(super) select_calls_total: AtomicU64,
    pub(super) select_errors_total: AtomicU64,
    pub(super) select_duration_nanos_total: AtomicU64,
    pub(super) select_points_returned_total: AtomicU64,
    // `select_with_options` counters.
    pub(super) select_with_options_calls_total: AtomicU64,
    pub(super) select_with_options_errors_total: AtomicU64,
    pub(super) select_with_options_duration_nanos_total: AtomicU64,
    pub(super) select_with_options_points_returned_total: AtomicU64,
    // `select_all` counters.
    pub(super) select_all_calls_total: AtomicU64,
    pub(super) select_all_errors_total: AtomicU64,
    pub(super) select_all_duration_nanos_total: AtomicU64,
    pub(super) select_all_series_returned_total: AtomicU64,
    pub(super) select_all_points_returned_total: AtomicU64,
    // `select_series` counters.
    pub(super) select_series_calls_total: AtomicU64,
    pub(super) select_series_errors_total: AtomicU64,
    pub(super) select_series_duration_nanos_total: AtomicU64,
    pub(super) select_series_returned_total: AtomicU64,
    // Merge path versus append-sort path counters.
    pub(super) merge_path_queries_total: AtomicU64,
    pub(super) merge_path_shard_snapshots_total: AtomicU64,
    pub(super) merge_path_shard_snapshot_wait_nanos_total: AtomicU64,
    pub(super) merge_path_shard_snapshot_hold_nanos_total: AtomicU64,
    pub(super) append_sort_path_queries_total: AtomicU64,
    // Query plans and persisted-chunk reads per storage tier.
    pub(super) hot_only_query_plans_total: AtomicU64,
    pub(super) warm_tier_query_plans_total: AtomicU64,
    pub(super) cold_tier_query_plans_total: AtomicU64,
    pub(super) hot_tier_persisted_chunks_read_total: AtomicU64,
    pub(super) warm_tier_persisted_chunks_read_total: AtomicU64,
    pub(super) cold_tier_persisted_chunks_read_total: AtomicU64,
    pub(super) warm_tier_fetch_duration_nanos_total: AtomicU64,
    pub(super) cold_tier_fetch_duration_nanos_total: AtomicU64,
    // Rollup-backed query plan counters.
    pub(super) rollup_query_plans_total: AtomicU64,
    pub(super) partial_rollup_query_plans_total: AtomicU64,
    pub(super) rollup_points_read_total: AtomicU64,
}

/// Atomic counters for the rollup worker and the data it materializes.
///
/// All values start at zero through the derived `Default`.
/// NOTE(review): nothing in the visible code updates these fields — confirm the
/// update sites in the rollup worker.
#[derive(Default)]
pub(super) struct RollupObservabilityCounters {
    pub(super) worker_runs_total: AtomicU64,
    pub(super) worker_success_total: AtomicU64,
    pub(super) worker_errors_total: AtomicU64,
    pub(super) policy_runs_total: AtomicU64,
    pub(super) buckets_materialized_total: AtomicU64,
    pub(super) points_materialized_total: AtomicU64,
    // NOTE(review): no `_total` suffix, so presumably overwritten with the most
    // recent run's duration rather than accumulated — confirm.
    pub(super) last_run_duration_nanos: AtomicU64,
}

/// Observability state for remote catalog refreshes.
///
/// Written by `StorageObservabilityCounters::record_remote_catalog_refresh_success`
/// and `record_remote_catalog_refresh_error`. `Default` is implemented by hand so
/// that `accessible` starts as `true`.
pub(super) struct RemoteStorageObservabilityState {
    /// Incremented on every recorded successful refresh.
    pub(super) catalog_refreshes_total: AtomicU64,
    /// Incremented on every recorded failed refresh.
    pub(super) catalog_refresh_errors_total: AtomicU64,
    /// Set to `true` by a recorded success and to `false` by a recorded error.
    pub(super) accessible: AtomicBool,
    /// Timestamp of the latest recorded attempt, successful or not.
    pub(super) last_refresh_attempt_unix_ms: AtomicU64,
    /// Timestamp of the latest recorded success; only a success writes it.
    pub(super) last_successful_refresh_unix_ms: AtomicU64,
    /// Caller-supplied failure streak on error; reset to 0 on success.
    pub(super) consecutive_refresh_failures: AtomicU64,
    /// Caller-supplied next retry time on error; reset to 0 on success.
    pub(super) next_refresh_retry_unix_ms: AtomicU64,
    /// `"{operation}: {error}"` for the latest recorded error; `None` after a success.
    pub(super) last_refresh_error: RwLock<Option<String>>,
}

impl RemoteStorageObservabilityState {
    /// Returns the stored `accessible` flag when `enabled` is true, and `true`
    /// otherwise.
    pub(super) fn accessible_or_default(&self, enabled: bool) -> bool {
        // Short-circuits: the atomic is only loaded when `enabled` is true.
        !enabled || self.accessible.load(Ordering::Relaxed)
    }
}

impl Default for RemoteStorageObservabilityState {
    /// Builds the initial state: every numeric field is zero, no refresh error is
    /// stored, and `accessible` is `true`.
    ///
    /// This cannot be derived, because `AtomicBool::default()` is `false`.
    fn default() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            accessible: AtomicBool::new(true),
            last_refresh_error: RwLock::new(None),
            catalog_refreshes_total: zero(),
            catalog_refresh_errors_total: zero(),
            last_refresh_attempt_unix_ms: zero(),
            last_successful_refresh_unix_ms: zero(),
            consecutive_refresh_failures: zero(),
            next_refresh_retry_unix_ms: zero(),
        }
    }
}

/// Health-related observability state: error counts, the fail-fast flag and the
/// most recent error messages.
///
/// Written by `StorageObservabilityCounters::record_background_worker_error` and
/// `record_maintenance_error`. The derived `Default` starts with zero counts, the
/// flag cleared and no stored messages.
#[derive(Default)]
pub(super) struct HealthObservabilityState {
    /// Incremented once per recorded background worker error.
    pub(super) background_errors_total: AtomicU64,
    /// Incremented once per recorded maintenance error.
    pub(super) maintenance_errors_total: AtomicU64,
    /// Set to `true` when a background worker error is recorded with fail-fast
    /// enabled; no visible code clears it.
    pub(super) fail_fast_triggered: AtomicBool,
    /// `"{worker} worker error: {error}"` for the latest recorded background error.
    pub(super) last_background_error: RwLock<Option<String>>,
    /// `"{operation}: {error}"` for the latest recorded maintenance error.
    pub(super) last_maintenance_error: RwLock<Option<String>>,
}