tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::path::{Path, PathBuf};

use tracing::warn;

use crate::engine::fs_utils::{path_exists_no_follow, rename_and_sync_parents, stage_dir_path};
use crate::{Result, TsinkError};

pub(crate) const STARTUP_SEGMENT_QUARANTINE_PURPOSE: &str = "startup-segment-quarantine";

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SegmentLoadPolicy {
    SkipInvalid,
    FailOnInvalid(SegmentValidationContext),
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SegmentValidationContext {
    Startup,
    RuntimeRefresh,
    Compaction,
    Maintenance,
}

#[derive(Debug, Clone)]
pub(crate) struct QuarantinedSegmentRoot {
    pub(crate) path: PathBuf,
    #[cfg_attr(not(test), allow(dead_code))]
    pub(crate) sync_failed: bool,
}

#[derive(Debug, Clone)]
pub(crate) struct StartupQuarantinedSegment {
    pub(crate) original_root: PathBuf,
    pub(crate) quarantined: QuarantinedSegmentRoot,
    pub(crate) details: String,
}

pub(crate) fn segment_validation_error_message(err: &TsinkError) -> Option<String> {
    match err {
        TsinkError::DataCorruption(msg) | TsinkError::Compression(msg) => Some(msg.clone()),
        _ => None,
    }
}

pub(crate) fn segment_validation_error(
    segment_root: &Path,
    context: SegmentValidationContext,
    details: &str,
) -> TsinkError {
    let context_label = match context {
        SegmentValidationContext::Startup => "startup",
        SegmentValidationContext::RuntimeRefresh => "runtime refresh",
        SegmentValidationContext::Compaction => "compaction",
        SegmentValidationContext::Maintenance => "maintenance",
    };
    TsinkError::DataCorruption(format!(
        "persisted segment {} failed {context_label} validation: {}",
        segment_root.display(),
        details
    ))
}

pub(crate) fn runtime_refresh_disappearance_error(
    segment_root: &Path,
    err: &TsinkError,
) -> TsinkError {
    TsinkError::Other(format!(
        "persisted segment {} disappeared during runtime refresh (transient disappearance race): {}",
        segment_root.display(),
        err
    ))
}

pub(crate) fn quarantine_segment_root(
    segment_root: &Path,
    purpose: &str,
) -> Result<QuarantinedSegmentRoot> {
    let quarantine_root = stage_dir_path(segment_root, purpose)?;
    match rename_and_sync_parents(segment_root, &quarantine_root) {
        Ok(()) => Ok(QuarantinedSegmentRoot {
            path: quarantine_root,
            sync_failed: false,
        }),
        Err(err) => {
            let quarantine_exists = path_exists_no_follow(&quarantine_root)?;
            let source_exists = path_exists_no_follow(segment_root)?;
            if quarantine_exists && !source_exists {
                Ok(QuarantinedSegmentRoot {
                    path: quarantine_root,
                    sync_failed: true,
                })
            } else {
                Err(err)
            }
        }
    }
}

pub(crate) fn quarantine_invalid_startup_segment(
    segment_root: &Path,
    details: &str,
) -> Result<StartupQuarantinedSegment> {
    let quarantined = quarantine_segment_root(segment_root, STARTUP_SEGMENT_QUARANTINE_PURPOSE)
        .map_err(|quarantine_err| {
            TsinkError::Other(format!(
                "failed to quarantine corrupt persisted segment {} during startup: {}; original validation error: {}",
                segment_root.display(),
                quarantine_err,
                details
            ))
        })?;
    warn!(
        path = %segment_root.display(),
        quarantine_path = %quarantined.path.display(),
        sync_failed = quarantined.sync_failed,
        error = %details,
        "Startup quarantined invalid persisted segment"
    );
    Ok(StartupQuarantinedSegment {
        original_root: segment_root.to_path_buf(),
        quarantined,
        details: details.to_string(),
    })
}

pub(crate) fn handle_segment_dir_load_error(
    dir: &Path,
    err: TsinkError,
    policy: SegmentLoadPolicy,
    disappeared_message: &'static str,
    invalid_message: &'static str,
) -> Result<()> {
    match err {
        err if is_not_found_error(&err) => {
            warn!(
                path = %dir.display(),
                error = %err,
                "{}",
                disappeared_message
            );
            Ok(())
        }
        err => {
            if let Some(msg) = segment_validation_error_message(&err) {
                match policy {
                    SegmentLoadPolicy::SkipInvalid => {
                        warn!(path = %dir.display(), error = %msg, "{}", invalid_message);
                        Ok(())
                    }
                    SegmentLoadPolicy::FailOnInvalid(context) => {
                        Err(segment_validation_error(dir, context, &msg))
                    }
                }
            } else {
                Err(err)
            }
        }
    }
}

pub(crate) fn is_not_found_error(err: &TsinkError) -> bool {
    matches!(err, TsinkError::Io(io_err) if io_err.kind() == std::io::ErrorKind::NotFound)
}