tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;
use parking_lot::{Mutex, MutexGuard};
use std::sync::atomic::AtomicBool;

/// Borrowed bundle of the storage state used by the close path.
///
/// Every field is a shared reference, an `Option` of one, or a `Duration`, so
/// the context is `Copy` and its methods take `self` by value.
#[derive(Clone, Copy)]
struct LifecycleShutdownContext<'a> {
    /// Mutex locked by `background_maintenance_gate`.
    background_maintenance_lock: &'a Mutex<()>,
    /// Mutex locked by `compaction_gate`; held for the whole of
    /// `compact_until_settled`.
    compaction_lock: &'a Mutex<()>,
    /// Semaphore whose permits `acquire_close_write_permits` requests via
    /// `acquire_all`.
    write_limiter: &'a crate::concurrency::Semaphore,
    /// Timeout handed to `acquire_all` when requesting the write permits.
    write_timeout: std::time::Duration,
    /// Optional compactor that is run first in each compaction pass.
    numeric_compactor: Option<&'a Compactor>,
    /// Optional compactor that is run second in each compaction pass.
    blob_compactor: Option<&'a Compactor>,
    /// Accumulated diff that every non-empty compaction pass is merged into.
    pending_persisted_segment_diff: &'a Mutex<PendingPersistedSegmentDiff>,
    /// Flag set to `true` after each compaction pass that reported changes and
    /// read back through `persisted_index_dirty()`.
    persisted_index_dirty: &'a AtomicBool,
    /// Counters passed to the compactors and used to record a deferred
    /// dirty-refresh error.
    observability: &'a StorageObservabilityCounters,
}

impl<'a> LifecycleShutdownContext<'a> {
    /// Locks the background-maintenance mutex and hands back its guard.
    fn background_maintenance_gate(self) -> MutexGuard<'a, ()> {
        let gate = self.background_maintenance_lock;
        gate.lock()
    }

    /// Locks the compaction mutex and hands back its guard.
    fn compaction_gate(self) -> MutexGuard<'a, ()> {
        let gate = self.compaction_lock;
        gate.lock()
    }

    /// Requests all write permits from the write limiter, passing the
    /// configured write timeout to `acquire_all`.
    ///
    /// # Errors
    ///
    /// Forwards whatever error `acquire_all` reports.
    fn acquire_close_write_permits(self) -> Result<Vec<crate::concurrency::SemaphoreGuard<'a>>> {
        let Self {
            write_limiter,
            write_timeout,
            ..
        } = self;
        write_limiter.acquire_all(write_timeout)
    }

    /// Runs compaction passes until one reports no changes or the pass budget
    /// is used up, and returns how many passes produced changes.
    ///
    /// The compaction mutex is held for the whole call. At least one pass is
    /// always attempted, even when `max_passes` is zero. Each pass that
    /// reports changes merges them into the pending persisted-segment diff and
    /// sets the persisted-index dirty flag.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error raised by a compaction pass.
    fn compact_until_settled(self, max_passes: usize) -> Result<usize> {
        let _held_compaction_gate = self.compaction_gate();
        let pass_budget = max_passes.max(1);
        let mut productive_passes = 0usize;

        // Every completed iteration is a productive pass, so the counter also
        // serves as the loop bound.
        while productive_passes < pass_budget {
            let diff = ChunkStorage::compact_compactors_with_changes(
                self.numeric_compactor,
                self.blob_compactor,
                Some(self.observability),
            )?;
            if diff.is_empty() {
                break;
            }

            self.pending_persisted_segment_diff.lock().merge(diff);
            self.persisted_index_dirty.store(true, Ordering::SeqCst);
            productive_passes += 1;
        }

        Ok(productive_passes)
    }

    /// Reads the current value of the persisted-index dirty flag.
    fn persisted_index_dirty(self) -> bool {
        let flag = self.persisted_index_dirty;
        flag.load(Ordering::SeqCst)
    }

    /// Records `err` as a maintenance error under the
    /// "close dirty persisted refresh" label.
    fn record_deferred_dirty_refresh(self, err: &TsinkError) {
        let counters = self.observability;
        counters.record_maintenance_error("close dirty persisted refresh", err);
    }
}

impl ChunkStorage {
    /// Builds the borrowed shutdown context from this storage's coordination,
    /// runtime, persisted and observability state.
    fn lifecycle_shutdown_context(&self) -> LifecycleShutdownContext<'_> {
        let coordination = &self.coordination;
        let runtime = &self.runtime;
        let persisted = &self.persisted;

        LifecycleShutdownContext {
            background_maintenance_lock: &coordination.background_maintenance_lock,
            compaction_lock: coordination.compaction_lock.as_ref(),
            write_limiter: &runtime.write_limiter,
            write_timeout: runtime.write_timeout,
            numeric_compactor: persisted.numeric_compactor.as_ref(),
            blob_compactor: persisted.blob_compactor.as_ref(),
            pending_persisted_segment_diff: &persisted.pending_persisted_segment_diff,
            persisted_index_dirty: persisted.persisted_index_dirty.as_ref(),
            observability: self.observability.as_ref(),
        }
    }

    /// Runs one compaction pass over the numeric compactor and then the blob
    /// compactor (whichever are present) and returns their merged changes.
    ///
    /// # Errors
    ///
    /// Returns the first compactor error; a failing numeric compactor means
    /// the blob compactor is not run in that pass.
    pub(in super::super) fn compact_compactors_with_changes(
        numeric_compactor: Option<&Compactor>,
        blob_compactor: Option<&Compactor>,
        observability: Option<&StorageObservabilityCounters>,
    ) -> Result<PendingPersistedSegmentDiff> {
        let mut merged = PendingPersistedSegmentDiff::default();
        // Absent compactors are skipped; order is numeric first, blob second.
        for compactor in numeric_compactor.into_iter().chain(blob_compactor) {
            let diff = Self::run_compactor_once(compactor, observability)?;
            merged.merge(diff);
        }
        Ok(merged)
    }

    /// Runs a single `compact_once_with_changes` call on `compactor`, reports
    /// the outcome and elapsed time to `observability` when it is provided,
    /// and returns the resulting diff.
    ///
    /// The diff is left at its default value unless the outcome's stats say
    /// that something was compacted.
    ///
    /// # Errors
    ///
    /// Returns the compactor's error unchanged after recording it.
    fn run_compactor_once(
        compactor: &Compactor,
        observability: Option<&StorageObservabilityCounters>,
    ) -> Result<PendingPersistedSegmentDiff> {
        let started = Instant::now();

        let outcome = match compactor.compact_once_with_changes() {
            Ok(outcome) => outcome,
            Err(err) => {
                if let Some(counters) = observability {
                    counters.record_compaction_error(elapsed_nanos_u64(started));
                }
                return Err(err);
            }
        };

        let stats = outcome.stats;
        if let Some(counters) = observability {
            counters.record_compaction_result(stats, elapsed_nanos_u64(started));
        }

        let mut diff = PendingPersistedSegmentDiff::default();
        if stats.compacted {
            diff.record_changes(outcome.output_roots, outcome.source_roots);
        }
        Ok(diff)
    }

    /// Decides whether a dirty-refresh failure during close may be deferred
    /// instead of failing the close.
    ///
    /// True when `segment_validation_error_message` yields a message for the
    /// error, or when it is a `TsinkError::Other` whose text contains
    /// "during runtime refresh".
    fn close_should_defer_dirty_refresh_error(err: &TsinkError) -> bool {
        if crate::engine::segment::segment_validation_error_message(err).is_some() {
            return true;
        }
        matches!(err, TsinkError::Other(message) if message.contains("during runtime refresh"))
    }

    /// Performs the ordered close steps while holding the background
    /// maintenance gate and the close write permits.
    ///
    /// Steps, in order: flush active data, persist the segment, sweep expired
    /// persisted segments, refresh memory usage (only when the memory budget
    /// is not `usize::MAX`), compact until settled, refresh dirty persisted
    /// segments if needed, persist the tombstones index, and checkpoint the
    /// series registry index.
    ///
    /// # Errors
    ///
    /// Any step's error aborts the pipeline, except a dirty-refresh error that
    /// `close_should_defer_dirty_refresh_error` accepts: that one is recorded
    /// and logged, and the checkpoint then uses the allow-invalid-catalog
    /// variant.
    fn execute_close_pipeline(&self) -> Result<()> {
        let ctx = self.lifecycle_shutdown_context();
        // Declared in this order so the write permits are released before the
        // maintenance gate when the function returns.
        let _held_maintenance_gate = ctx.background_maintenance_gate();
        let _held_write_permits = ctx.acquire_close_write_permits()?;

        self.flush_all_active()?;
        self.persist_segment()?;
        self.sweep_expired_persisted_segments()?;
        if self.memory_budget_value() != usize::MAX {
            self.refresh_memory_usage();
        }
        ctx.compact_until_settled(CLOSE_COMPACTION_MAX_PASSES)?;

        let refresh_needed =
            ctx.persisted_index_dirty() || self.has_known_persisted_segment_changes();
        let refresh_deferred = if refresh_needed {
            match self.refresh_dirty_persisted_segments_claimed() {
                Ok(_) => false,
                Err(err) if Self::close_should_defer_dirty_refresh_error(&err) => {
                    ctx.record_deferred_dirty_refresh(&err);
                    tracing::warn!(
                        error = %err,
                        "Close deferred dirty persisted refresh and kept the last visible catalog"
                    );
                    true
                }
                Err(err) => return Err(err),
            }
        } else {
            false
        };

        self.persist_tombstones_index()?;
        if refresh_deferred {
            self.checkpoint_series_registry_index_allow_invalid_catalog()?;
        } else {
            self.checkpoint_series_registry_index()?;
        }
        Ok(())
    }

    /// Closes the storage: starts the close transition, runs the close
    /// pipeline, and passes the pipeline's result to `finish_close_transition`.
    ///
    /// # Errors
    ///
    /// Returns early if `start_close_transition` fails; otherwise returns
    /// whatever `finish_close_transition` produces.
    pub(in super::super) fn close_impl(&self) -> Result<()> {
        self.start_close_transition()?;
        self.finish_close_transition(self.execute_close_pipeline())
    }
}