buoyant_kernel 0.21.102

Buoyant Data distribution of delta-kernel
Documentation
//! Lazy CRC loading support.
//!
//! Provides thread-safe lazy loading of CRC files, ensuring they are read at most once and the
//! result is shared across all consumers.

use std::sync::{Arc, OnceLock};

use tracing::warn;

use super::{try_read_crc_file, Crc};
use crate::path::ParsedLogPath;
use crate::{Engine, Version};

/// Outcome of a single attempt to read the CRC file for a log segment.
///
/// There is deliberately no "pending" variant: an uninitialized state is expressed by the
/// surrounding `OnceLock` (its `get()` yields `None` until a result has been stored).
#[derive(Clone, Debug)]
pub(crate) enum CrcLoadResult {
    /// The log segment carries no CRC file at all.
    DoesNotExist,
    /// A CRC file was present, but reading or parsing it failed (I/O error or corrupt content).
    CorruptOrFailed,
    /// The CRC file was read and parsed; the parsed value is shared behind an `Arc`.
    Loaded(Arc<Crc>),
}

impl CrcLoadResult {
    /// Returns the CRC if it was successfully loaded.
    ///
    /// Both [`CrcLoadResult::DoesNotExist`] and [`CrcLoadResult::CorruptOrFailed`] yield `None`.
    /// The match is exhaustive on purpose: adding a variant must force a decision here rather
    /// than being silently treated as "no CRC".
    pub(crate) fn get(&self) -> Option<&Arc<Crc>> {
        match self {
            CrcLoadResult::Loaded(crc) => Some(crc),
            CrcLoadResult::DoesNotExist | CrcLoadResult::CorruptOrFailed => None,
        }
    }
}

/// Read-at-most-once holder for a table's CRC information.
///
/// The load result lives in a `OnceLock`, so initialization is thread-safe and performed a
/// single time. A `LazyCrc` may also be seeded with a CRC that was computed in memory (for
/// example, merged after a commit), in which case no CRC file backs it.
#[derive(Debug)]
pub(crate) struct LazyCrc {
    /// Location of the CRC file in the log segment, or `None` when there is none.
    crc_file: Option<ParsedLogPath>,
    /// Memoized outcome of reading the CRC; populated lazily and at most once.
    pub(crate) cached: OnceLock<CrcLoadResult>,
    /// Table version of an in-memory (precomputed) CRC. Only set when the CRC was computed
    /// rather than read; when present it overrides `crc_file` in version checks.
    precomputed_version: Option<Version>,
}

impl LazyCrc {
    /// Create a new lazy CRC loader backed by an optional CRC file.
    ///
    /// No I/O happens here. If `crc_file` is `None`, the first access through
    /// [`get_or_load`](Self::get_or_load) resolves to [`CrcLoadResult::DoesNotExist`].
    pub(crate) fn new(crc_file: Option<ParsedLogPath>) -> Self {
        Self {
            crc_file,
            cached: OnceLock::new(),
            precomputed_version: None,
        }
    }

    /// Create a `LazyCrc` with a precomputed CRC value (no backing file).
    ///
    /// The CRC is immediately available via [`get_or_load`](Self::get_or_load) without any I/O.
    /// The `version` parameter records which table version this CRC corresponds to, enabling
    /// [`get_if_loaded_at_version`](Self::get_if_loaded_at_version) to work for chained commits.
    pub(crate) fn new_precomputed(crc: Crc, version: Version) -> Self {
        Self {
            crc_file: None,
            // `OnceLock::from` yields an already-initialized cell, so there is no fallible
            // `set` call whose result would have to be discarded.
            cached: OnceLock::from(CrcLoadResult::Loaded(Arc::new(crc))),
            precomputed_version: Some(version),
        }
    }

    /// Returns the CRC load result, loading it if necessary.
    ///
    /// The loading closure runs at most once, even across threads; every later call returns
    /// the cached result. A failed read is cached as [`CrcLoadResult::CorruptOrFailed`] (after
    /// logging a warning) and is not retried.
    pub(crate) fn get_or_load(&self, engine: &dyn Engine) -> &CrcLoadResult {
        self.cached.get_or_init(|| match &self.crc_file {
            None => CrcLoadResult::DoesNotExist,
            Some(crc_path) => match try_read_crc_file(engine, crc_path) {
                Ok(crc) => CrcLoadResult::Loaded(Arc::new(crc)),
                Err(e) => {
                    warn!(
                        "Failed to read CRC file {:?}: {}.",
                        crc_path.location.location, e
                    );
                    CrcLoadResult::CorruptOrFailed
                }
            },
        })
    }

    /// Returns the CRC only if it is at the given version, loading it if necessary.
    ///
    /// The version check happens first, so a mismatch returns `None` without triggering any
    /// I/O. On a match, this delegates to [`get_or_load`](Self::get_or_load) and returns the CRC
    /// only if loading succeeded.
    pub(crate) fn get_or_load_if_at_version(
        &self,
        engine: &dyn Engine,
        version: Version,
    ) -> Option<&Arc<Crc>> {
        if self.crc_version() != Some(version) {
            return None;
        }
        self.get_or_load(engine).get()
    }

    /// Returns the CRC only if it is already loaded (no I/O) and matches the given version.
    ///
    /// This is purely opportunistic: it returns `Some` only when the CRC was previously loaded
    /// (via `get_or_load`) or precomputed (via `new_precomputed`) AND the version matches.
    pub(crate) fn get_if_loaded_at_version(&self, version: Version) -> Option<&Arc<Crc>> {
        if self.crc_version() != Some(version) {
            return None;
        }
        self.cached.get()?.get()
    }

    /// Check whether a load result (of any kind) is already cached, without triggering a load.
    #[allow(dead_code)] // Used in future phases (domain metadata, ICT)
    pub(crate) fn is_loaded(&self) -> bool {
        self.cached.get().is_some()
    }

    /// Returns the CRC version. The precomputed version is checked first, then the CRC file
    /// version.
    ///
    /// This enables chaining: a post-commit snapshot with a precomputed CRC at version N+1
    /// can serve as the read snapshot for a transaction targeting version N+2.
    pub(crate) fn crc_version(&self) -> Option<Version> {
        self.precomputed_version
            .or_else(|| self.crc_file.as_ref().map(|f| f.version))
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use rstest::rstest;

    use super::*;
    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
    use crate::engine::default::{DefaultEngine, DefaultEngineBuilder};
    use crate::object_store::memory::InMemory;

    fn table_root() -> url::Url {
        url::Url::parse("memory:///").unwrap()
    }

    fn test_engine() -> DefaultEngine<TokioBackgroundExecutor> {
        DefaultEngineBuilder::new(Arc::new(InMemory::new())).build()
    }

    // ===== CrcLoadResult Tests =====

    #[test]
    fn test_crc_load_result_loaded() {
        let crc = Crc {
            table_size_bytes: 100,
            num_files: 10,
            num_metadata: 1,
            num_protocol: 1,
            ..Default::default()
        };
        let loaded = CrcLoadResult::Loaded(Arc::new(crc));
        assert!(loaded.get().is_some());
        assert_eq!(loaded.get().unwrap().table_size_bytes, 100);
    }

    #[rstest]
    #[case::does_not_exist(CrcLoadResult::DoesNotExist)]
    #[case::corrupt(CrcLoadResult::CorruptOrFailed)]
    fn test_crc_load_result(#[case] result: CrcLoadResult) {
        assert!(result.get().is_none());
    }

    // ===== LazyCrc Tests =====

    #[test]
    fn test_lazy_crc_no_file() {
        let engine = test_engine();

        let lazy = LazyCrc::new(None);
        assert!(!lazy.is_loaded());
        assert_eq!(lazy.crc_version(), None);

        let result = lazy.get_or_load(&engine);
        assert!(matches!(result, CrcLoadResult::DoesNotExist));
        assert!(result.get().is_none());
        assert!(lazy.is_loaded());
    }

    #[test]
    fn test_lazy_crc_missing_file() {
        let engine = test_engine();

        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root(), 5)));
        assert!(!lazy.is_loaded());
        assert_eq!(lazy.crc_version(), Some(5));

        let result = lazy.get_or_load(&engine);
        assert!(matches!(result, CrcLoadResult::CorruptOrFailed));
        assert!(result.get().is_none());
        assert!(lazy.is_loaded());
    }

    fn test_table_root(dir: &str) -> url::Url {
        let path = std::fs::canonicalize(PathBuf::from(dir)).unwrap();
        url::Url::from_directory_path(path).unwrap()
    }

    #[test]
    fn test_lazy_crc_loads_real_file() {
        let engine = crate::engine::sync::SyncEngine::new();
        let table_root = test_table_root("./tests/data/crc-full/");

        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root, 0)));
        assert!(!lazy.is_loaded());
        assert_eq!(lazy.crc_version(), Some(0));

        let result = lazy.get_or_load(&engine);
        assert!(lazy.is_loaded());

        let crc = result.get().unwrap();
        assert_eq!(crc.table_size_bytes, 5259);
    }

    #[test]
    fn test_lazy_crc_malformed_file() {
        let engine = crate::engine::sync::SyncEngine::new();
        let table_root = test_table_root("./tests/data/crc-malformed/");

        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root, 0)));
        assert!(!lazy.is_loaded());
        assert_eq!(lazy.crc_version(), Some(0));

        let result = lazy.get_or_load(&engine);
        assert!(matches!(result, CrcLoadResult::CorruptOrFailed));
        assert!(result.get().is_none());
        assert!(lazy.is_loaded());
    }

    // ===== get_or_load_if_at_version Tests =====

    #[test]
    fn test_get_or_load_if_at_version_mismatch_skips_io() {
        let engine = test_engine();

        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root(), 5)));
        // A version mismatch must short-circuit before any load is attempted.
        assert!(lazy.get_or_load_if_at_version(&engine, 4).is_none());
        assert!(!lazy.is_loaded());
    }

    #[test]
    fn test_get_or_load_if_at_version_real_file() {
        let engine = crate::engine::sync::SyncEngine::new();
        let table_root = test_table_root("./tests/data/crc-full/");

        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root, 0)));
        let crc = lazy.get_or_load_if_at_version(&engine, 0).unwrap();
        assert_eq!(crc.table_size_bytes, 5259);
        assert!(lazy.is_loaded());
    }

    // ===== Precomputed LazyCrc Tests =====

    fn test_crc(table_size_bytes: i64) -> Crc {
        Crc {
            table_size_bytes,
            num_files: 1,
            num_metadata: 1,
            num_protocol: 1,
            ..Default::default()
        }
    }

    #[test]
    fn test_lazy_crc_precomputed() {
        let crc = test_crc(42);
        let lazy = LazyCrc::new_precomputed(crc, 5);

        assert!(lazy.is_loaded());
        assert_eq!(lazy.crc_version(), Some(5));

        // get_if_loaded_at_version should return the CRC at the correct version
        let loaded = lazy.get_if_loaded_at_version(5);
        assert!(loaded.is_some());
        assert_eq!(loaded.unwrap().table_size_bytes, 42);

        // Wrong version should return None
        assert!(lazy.get_if_loaded_at_version(4).is_none());
        assert!(lazy.get_if_loaded_at_version(6).is_none());
    }

    #[test]
    fn test_lazy_crc_precomputed_version_takes_priority() {
        // `new_precomputed` never sets `crc_file`, so build a loader that has BOTH a CRC file
        // (version 7) and a precomputed version (3) to actually exercise the priority rule.
        let lazy = LazyCrc {
            crc_file: Some(ParsedLogPath::create_parsed_crc(&table_root(), 7)),
            cached: OnceLock::from(CrcLoadResult::Loaded(Arc::new(test_crc(100)))),
            precomputed_version: Some(3),
        };
        assert_eq!(lazy.crc_version(), Some(3));
        assert_eq!(
            lazy.get_if_loaded_at_version(3)
                .map(|crc| crc.table_size_bytes),
            Some(100)
        );
        assert!(lazy.get_if_loaded_at_version(7).is_none());
    }

    #[test]
    fn test_get_or_load_if_at_version_precomputed() {
        let engine = test_engine();

        let lazy = LazyCrc::new_precomputed(test_crc(42), 5);
        let crc = lazy.get_or_load_if_at_version(&engine, 5).unwrap();
        assert_eq!(crc.table_size_bytes, 42);
        assert!(lazy.get_or_load_if_at_version(&engine, 4).is_none());
    }

    #[test]
    fn test_get_if_loaded_at_version_not_loaded() {
        // CRC file exists but not yet loaded -> should return None (no I/O)
        let lazy = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&table_root(), 5)));
        assert!(!lazy.is_loaded());
        assert!(lazy.get_if_loaded_at_version(5).is_none());
    }

    #[test]
    fn test_get_if_loaded_at_version_wrong_version() {
        let crc = test_crc(100);
        let lazy = LazyCrc::new_precomputed(crc, 5);
        assert!(lazy.get_if_loaded_at_version(3).is_none());
    }

    #[test]
    fn test_get_if_loaded_at_version_no_crc() {
        let lazy = LazyCrc::new(None);
        assert!(lazy.get_if_loaded_at_version(0).is_none());
    }
}