//! tsink 0.10.2 — a lightweight embedded time-series database with a
//! straightforward API. This module implements the storage bootstrap
//! pipeline (startup phases, snapshot restore, lane-segment merging).
use super::*;
use crate::engine::fs_utils::{
    copy_dir_contents, path_exists_no_follow, remove_path_if_exists_and_sync_parent,
    rename_and_sync_parents, stage_dir_path, sync_dir,
};
use crate::engine::segment::{LoadedSegmentIndexes, WalHighWatermark};

#[path = "bootstrap/discovery.rs"]
mod discovery;
#[path = "bootstrap/finalize.rs"]
mod finalize;
#[path = "bootstrap/hydrate.rs"]
mod hydrate;
#[path = "bootstrap/planning.rs"]
mod planning;
#[path = "bootstrap/recovery.rs"]
mod recovery;
#[path = "bootstrap/wal_open.rs"]
mod wal_open;

#[cfg(test)]
#[path = "bootstrap/tests.rs"]
mod tests;

use discovery::StartupDiscoveryPhase;
use finalize::StartupFinalizePhase;
use hydrate::StartupHydrationPhase;
use planning::StartupPlanningPhase;
use recovery::StartupRecoveryPhase;
use wal_open::StartupWalOpenPhase;

/// Drives the full startup pipeline and returns a ready-to-use storage handle.
///
/// Phases run strictly in order — plan, discover, recover, open WAL,
/// create + hydrate storage, finalize — with each phase consuming the
/// previous phase's output. Returns the first phase error encountered.
pub(super) fn build_storage(builder: StorageBuilder) -> Result<Arc<dyn Storage>> {
    let startup_plan = StartupPlanningPhase::prepare(&builder)?;
    let discovery = StartupDiscoveryPhase::discover(&builder, &startup_plan)?;
    let recovery = StartupRecoveryPhase::recover(&startup_plan, discovery)?;

    // Capture everything the later phases need before `recovery` is moved
    // into hydration below.
    let finalize_state = recovery.finalize_state();
    let segments = recovery.loaded_segments();
    let replay_highwater = segments.wal_replay_highwater;
    let next_segment_id = segments.next_segment_id;

    let wal = StartupWalOpenPhase::open(&builder, &startup_plan, replay_highwater)?;

    // Clone what outlives the plan, then consume the plan for its runtime inputs.
    let options = startup_plan.storage_options().clone();
    let layout = startup_plan.paths().clone();
    let runtime = startup_plan.into_runtime_inputs();

    let storage = StartupHydrationPhase::create_storage(
        builder.chunk_points(),
        options,
        &layout,
        next_segment_id,
        wal,
    )?;
    StartupHydrationPhase::hydrate(
        storage.as_ref(),
        &builder,
        recovery,
        runtime.data_path_process_lock,
    )?;
    StartupFinalizePhase::run(
        &storage,
        finalize_state,
        runtime.background_threads_enabled,
        runtime.background_fail_fast,
    )?;

    Ok(storage as Arc<dyn Storage>)
}

/// Restores a database directory from an on-disk snapshot using a staged,
/// crash-safe directory swap.
///
/// Protocol: copy the snapshot into a staging directory next to `data_path`,
/// fsync it, move any existing `data_path` aside as a backup, then rename the
/// staging directory into place. On activation failure the backup is renamed
/// back; on success the backup is removed.
///
/// # Errors
/// - `InvalidConfiguration` if the two paths are equal (literal comparison —
///   NOTE(review): paths are not canonicalized, so aliases of the same
///   directory are not detected), if the snapshot is not a directory, or if
///   the restore target has no parent.
/// - I/O errors from copying, fsync, or the rename steps.
/// - `Other` if activation fails AND rollback also fails, or if the restore
///   succeeded but the backup could not be removed.
pub(super) fn restore_storage_from_snapshot(snapshot_path: &Path, data_path: &Path) -> Result<()> {
    if snapshot_path == data_path {
        return Err(TsinkError::InvalidConfiguration(
            "snapshot and restore paths must differ".to_string(),
        ));
    }

    // symlink_metadata (no-follow) so a symlinked "snapshot" is rejected
    // rather than silently dereferenced.
    let snapshot_meta =
        std::fs::symlink_metadata(snapshot_path).map_err(|err| TsinkError::IoWithPath {
            path: snapshot_path.to_path_buf(),
            source: err,
        })?;
    if !snapshot_meta.is_dir() {
        return Err(TsinkError::InvalidConfiguration(format!(
            "snapshot path is not a directory: {}",
            snapshot_path.display()
        )));
    }

    let Some(parent) = data_path.parent() else {
        return Err(TsinkError::InvalidConfiguration(format!(
            "restore target has no parent directory: {}",
            data_path.display()
        )));
    };
    std::fs::create_dir_all(parent)?;

    // Stage the copy in a sibling directory so activation is a single rename
    // on the same filesystem.
    let staging = stage_dir_path(data_path, "restore-staging")?;
    std::fs::create_dir_all(&staging)?;
    if let Err(err) = copy_dir_contents(snapshot_path, &staging) {
        // Best-effort cleanup; the copy error is what the caller needs to see.
        let _ = remove_path_if_exists_and_sync_parent(&staging);
        return Err(err);
    }

    // Persist the staged contents before any rename makes them "live".
    sync_dir(&staging)?;

    // Only take a backup if something already exists at the target.
    let backup = if path_exists_no_follow(data_path)? {
        Some(stage_dir_path(data_path, "restore-backup")?)
    } else {
        None
    };

    if let Some(backup_path) = backup.as_ref() {
        if let Err(err) = rename_and_sync_parents(data_path, backup_path) {
            // Target untouched; just drop the staging copy.
            let _ = remove_path_if_exists_and_sync_parent(&staging);
            return Err(err);
        }
    }

    if let Err(activate_err) = rename_and_sync_parents(&staging, data_path) {
        // Activation failed: try to put the original data back before
        // reporting. Rollback failure is folded into the error so neither
        // cause is lost.
        let mut rollback_err = None;
        if let Some(backup_path) = backup.as_ref() {
            if let Err(err) = rename_and_sync_parents(backup_path, data_path) {
                rollback_err = Some(err);
            }
        }
        let _ = remove_path_if_exists_and_sync_parent(&staging);

        if let Some(rollback_err) = rollback_err {
            return Err(TsinkError::Other(format!(
                "restore activation failed: {activate_err}; rollback failed: {rollback_err}"
            )));
        }
        return Err(activate_err);
    }

    // Restore is live; failing to delete the backup is surfaced (not
    // swallowed) so the operator knows stale data remains on disk.
    if let Some(backup_path) = backup {
        if let Err(cleanup_err) = remove_path_if_exists_and_sync_parent(&backup_path) {
            return Err(TsinkError::Other(format!(
                "restore succeeded but failed to remove backup {}: {cleanup_err}",
                backup_path.display()
            )));
        }
    }

    Ok(())
}

/// Merges the numeric- and blob-lane segment indexes into one combined index.
///
/// Series entries are unioned by id; an id that appears in both lanes must
/// carry the same metric and labels, otherwise the merge fails with
/// `DataCorruption`. Segments from both lanes are combined and ordered by
/// `(level, segment_id)`. The WAL replay highwater only advances when every
/// enabled lane has segments backing it; otherwise it falls back to the
/// default (replay from the start).
pub(super) fn merge_loaded_segment_indexes(
    mut numeric: LoadedSegmentIndexes,
    mut blob: LoadedSegmentIndexes,
    numeric_lane_enabled: bool,
    blob_lane_enabled: bool,
) -> Result<LoadedSegmentIndexes> {
    // Seed the merged series map from the numeric lane.
    let mut merged_series: BTreeMap<_, _> = numeric
        .series
        .drain(..)
        .map(|entry| (entry.series_id, entry))
        .collect();

    // Fold in the blob lane, rejecting ids whose identity disagrees.
    for entry in blob.series.drain(..) {
        let Some(existing) = merged_series.get(&entry.series_id) else {
            merged_series.insert(entry.series_id, entry);
            continue;
        };
        if existing.metric != entry.metric || existing.labels != entry.labels {
            return Err(TsinkError::DataCorruption(format!(
                "series id {} conflicts across lane segment families",
                entry.series_id
            )));
        }
        // Identical identity in both lanes: keep the numeric-lane entry.
    }

    // Record per-lane segment presence before the segment lists are merged.
    let numeric_populated = !numeric.indexed_segments.is_empty();
    let blob_populated = !blob.indexed_segments.is_empty();

    let mut merged_segments = numeric.indexed_segments;
    merged_segments.extend(blob.indexed_segments.drain(..));
    merged_segments.sort_by_key(|seg| (seg.manifest.level, seg.manifest.segment_id));

    // A disabled lane contributes nothing to the replay floor. With both
    // lanes enabled, the safe replay point is the older of the two marks —
    // and only when both lanes actually hold segments.
    let replay_highwater = if numeric_lane_enabled && blob_lane_enabled {
        if numeric_populated && blob_populated {
            numeric.wal_replay_highwater.min(blob.wal_replay_highwater)
        } else {
            WalHighWatermark::default()
        }
    } else if numeric_lane_enabled {
        numeric.wal_replay_highwater
    } else if blob_lane_enabled {
        blob.wal_replay_highwater
    } else {
        WalHighWatermark::default()
    };

    Ok(LoadedSegmentIndexes {
        // Next id must exceed both lanes' counters and never be zero.
        next_segment_id: numeric.next_segment_id.max(blob.next_segment_id).max(1),
        series: merged_series.into_values().collect(),
        indexed_segments: merged_segments,
        wal_replay_highwater: replay_highwater,
    })
}