tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use tracing::warn;

use super::recovery::StartupRecoveryState;
use super::*;
use crate::engine::segment::StartupQuarantinedSegment;

pub(super) struct StartupHydrationPhase;

impl StartupHydrationPhase {
    pub(super) fn create_storage(
        chunk_points: usize,
        storage_options: ChunkStorageOptions,
        paths: &config::StoragePathLayout,
        next_segment_id: u64,
        wal: Option<FramedWal>,
    ) -> Result<Arc<ChunkStorage>> {
        Ok(Arc::new(ChunkStorage::new_with_data_path_and_options(
            chunk_points,
            wal,
            paths.numeric_lane_path.clone(),
            paths.blob_lane_path.clone(),
            next_segment_id,
            storage_options,
        )?))
    }

    pub(super) fn hydrate(
        storage: &ChunkStorage,
        builder: &StorageBuilder,
        recovered: StartupRecoveryState,
        data_path_process_lock: Option<DataPathProcessLock>,
    ) -> Result<()> {
        let StartupRecoveryState {
            registry,
            loaded_segments,
            startup_quarantined,
        } = recovered;
        let replay_highwater = loaded_segments.wal_replay_highwater;

        storage.load_tombstones_index()?;
        if let Some(data_path_process_lock) = data_path_process_lock {
            storage.install_data_path_process_lock(data_path_process_lock);
        }
        if let Some(loaded_registry) = registry.persisted_registry {
            storage.replace_registry_from_persisted_state(
                loaded_registry.registry,
                loaded_registry.delta_series_count,
            )?;
        }
        storage.apply_loaded_segment_indexes(
            loaded_segments,
            registry.reconcile_registry_with_persisted,
        )?;
        storage.replay_from_wal(replay_highwater, builder.wal_replay_mode())?;
        storage.load_rollup_runtime_state()?;
        storage.refresh_segment_catalog_and_observability_from_persisted_state(&[])?;
        if storage.persisted.tiered_storage.is_some() {
            storage.mark_remote_catalog_refresh_success();
        }
        record_startup_segment_quarantines(storage, &startup_quarantined);

        Ok(())
    }
}

fn record_startup_segment_quarantines(
    storage: &ChunkStorage,
    startup_quarantined: &[StartupQuarantinedSegment],
) {
    for quarantined in startup_quarantined {
        warn!(
            segment = %quarantined.original_root.display(),
            quarantine = %quarantined.quarantined.path.display(),
            details = %quarantined.details,
            "Startup quarantined persisted segment"
        );
        storage.observability.record_maintenance_error(
            "startup persisted segment recovery",
            &TsinkError::DataCorruption(format!(
                "quarantined invalid persisted segment {} at {}: {}",
                quarantined.original_root.display(),
                quarantined.quarantined.path.display(),
                quarantined.details
            )),
        );
    }
}