use std::sync::{Arc, OnceLock};
use tracing::warn;
use super::{try_read_crc_file, Crc};
use crate::path::ParsedLogPath;
use crate::{Engine, Version};
/// Outcome of a one-time attempt to obtain the CRC for a `LazyCrc`.
///
/// The value is what `LazyCrc` stores in its cache, so a failure outcome is remembered just
/// like a success.
#[derive(Debug, Clone)]
pub(crate) enum CrcLoadResult {
/// No CRC file path was known, so no read was attempted.
DoesNotExist,
/// A CRC file path was known but reading it returned an error. The tests in this file show
/// this covers both a referenced file that is absent from storage and a malformed file.
CorruptOrFailed,
/// The CRC is available, either read from its file or supplied up front.
Loaded(Arc<Crc>),
}
impl CrcLoadResult {
#[allow(dead_code)] pub(crate) fn get(&self) -> Option<&Arc<Crc>> {
match self {
CrcLoadResult::Loaded(crc) => Some(crc),
_ => None,
}
}
}
/// A CRC whose file is read at most once, on first request, with the outcome cached.
///
/// Built either from an optional CRC file path (`new`) or from an already available CRC
/// value and its version (`new_precomputed`).
#[derive(Debug)]
pub(crate) struct LazyCrc {
/// Path of the CRC file to read on first load. `None` when there is no file to read, which
/// is also the case for instances built with `new_precomputed`.
crc_file: Option<ParsedLogPath>,
/// Write-once cache of the load outcome: empty until the first `get_or_load`, and filled at
/// construction time by `new_precomputed`.
pub(crate) cached: OnceLock<CrcLoadResult>,
/// Version passed to `new_precomputed`; when set, `crc_version` reports it in preference to
/// the version of `crc_file`.
precomputed_version: Option<Version>,
}
impl LazyCrc {
    /// Creates a lazy CRC for the given CRC file, without reading it.
    ///
    /// Passing `None` means no CRC file is known; the first load then resolves to
    /// [`CrcLoadResult::DoesNotExist`].
    pub(crate) fn new(crc_file: Option<ParsedLogPath>) -> Self {
        Self {
            crc_file,
            cached: OnceLock::new(),
            precomputed_version: None,
        }
    }

    /// Creates a lazy CRC that is already loaded with `crc`, reported at `version`.
    ///
    /// No file path is recorded, so later loads return the supplied value without reading
    /// anything.
    pub(crate) fn new_precomputed(crc: Crc, version: Version) -> Self {
        Self {
            crc_file: None,
            cached: OnceLock::from(CrcLoadResult::Loaded(Arc::new(crc))),
            precomputed_version: Some(version),
        }
    }

    /// Returns the cached load outcome, reading the CRC file through `engine` on first use.
    ///
    /// The outcome is stored whether the read succeeds or fails, so a failed read is not
    /// retried by later calls; the failure is logged once as a warning.
    pub(crate) fn get_or_load(&self, engine: &dyn Engine) -> &CrcLoadResult {
        self.cached.get_or_init(|| {
            let Some(path) = self.crc_file.as_ref() else {
                return CrcLoadResult::DoesNotExist;
            };
            try_read_crc_file(engine, path)
                .map(|crc| CrcLoadResult::Loaded(Arc::new(crc)))
                .unwrap_or_else(|err| {
                    warn!(
                        "Failed to read CRC file {:?}: {}.",
                        path.location.location, err
                    );
                    CrcLoadResult::CorruptOrFailed
                })
        })
    }

    /// Loads (if necessary) and returns the CRC, but only when it is for exactly `version`.
    ///
    /// Returns `None` without touching the engine when the CRC version differs or is unknown,
    /// and also when the load does not produce a CRC.
    pub(crate) fn get_or_load_if_at_version(
        &self,
        engine: &dyn Engine,
        version: Version,
    ) -> Option<&Arc<Crc>> {
        match self.crc_version() {
            Some(found) if found == version => self.get_or_load(engine).get(),
            _ => None,
        }
    }

    /// Returns the CRC only if it is for exactly `version` and is already cached.
    ///
    /// Never triggers a load: an instance that has not been loaded yet yields `None`.
    pub(crate) fn get_if_loaded_at_version(&self, version: Version) -> Option<&Arc<Crc>> {
        match self.crc_version() {
            Some(found) if found == version => {
                self.cached.get().and_then(|outcome| outcome.get())
            }
            _ => None,
        }
    }

    /// Reports whether a load outcome (success or failure) has been cached.
    #[allow(dead_code)]
    pub(crate) fn is_loaded(&self) -> bool {
        self.cached.get().is_some()
    }

    /// Returns the version this CRC belongs to, if known.
    ///
    /// A precomputed version wins; otherwise the version comes from the CRC file path.
    pub(crate) fn crc_version(&self) -> Option<Version> {
        if let Some(version) = self.precomputed_version {
            return Some(version);
        }
        self.crc_file.as_ref().map(|path| path.version)
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use rstest::rstest;

    use super::*;
    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
    use crate::engine::default::{DefaultEngine, DefaultEngineBuilder};
    use crate::object_store::memory::InMemory;

    // ---------------------------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------------------------

    /// Root URL used together with the in-memory engine from `test_engine`.
    fn table_root() -> url::Url {
        url::Url::parse("memory:///").unwrap()
    }

    /// Default engine over a freshly created in-memory object store.
    fn test_engine() -> DefaultEngine<TokioBackgroundExecutor> {
        DefaultEngineBuilder::new(Arc::new(InMemory::new())).build()
    }

    /// Directory URL for an on-disk test table located at `dir`.
    fn test_table_root(dir: &str) -> url::Url {
        let absolute = std::fs::canonicalize(PathBuf::from(dir)).unwrap();
        url::Url::from_directory_path(absolute).unwrap()
    }

    /// Small CRC value distinguishable by its `table_size_bytes`.
    fn test_crc(table_size_bytes: i64) -> Crc {
        Crc {
            table_size_bytes,
            num_files: 1,
            num_metadata: 1,
            num_protocol: 1,
            ..Default::default()
        }
    }

    /// Drives `lazy` through its first load and checks the "path known but unreadable"
    /// lifecycle: not loaded before, `CorruptOrFailed` cached afterwards.
    fn assert_load_fails(lazy: &LazyCrc, engine: &dyn Engine, version: Version) {
        assert!(!lazy.is_loaded());
        assert_eq!(lazy.crc_version(), Some(version));

        let outcome = lazy.get_or_load(engine);
        assert!(matches!(outcome, CrcLoadResult::CorruptOrFailed));
        assert!(outcome.get().is_none());
        assert!(lazy.is_loaded());
    }

    // ---------------------------------------------------------------------------------------
    // CrcLoadResult
    // ---------------------------------------------------------------------------------------

    #[test]
    fn test_crc_load_result_loaded() {
        let loaded = CrcLoadResult::Loaded(Arc::new(Crc {
            table_size_bytes: 100,
            num_files: 10,
            num_metadata: 1,
            num_protocol: 1,
            ..Default::default()
        }));
        assert_eq!(loaded.get().map(|crc| crc.table_size_bytes), Some(100));
    }

    #[rstest]
    #[case::does_not_exist(CrcLoadResult::DoesNotExist)]
    #[case::corrupt(CrcLoadResult::CorruptOrFailed)]
    fn test_crc_load_result(#[case] result: CrcLoadResult) {
        assert!(result.get().is_none());
    }

    // ---------------------------------------------------------------------------------------
    // LazyCrc: loading from a file path
    // ---------------------------------------------------------------------------------------

    #[test]
    fn test_lazy_crc_no_file() {
        let engine = test_engine();
        let lazy_crc = LazyCrc::new(None);
        assert!(!lazy_crc.is_loaded());
        assert!(lazy_crc.crc_version().is_none());

        let outcome = lazy_crc.get_or_load(&engine);
        assert!(matches!(outcome, CrcLoadResult::DoesNotExist));
        assert!(outcome.get().is_none());
        assert!(lazy_crc.is_loaded());
    }

    #[test]
    fn test_lazy_crc_missing_file() {
        let engine = test_engine();
        let crc_path = ParsedLogPath::create_parsed_crc(&table_root(), 5);
        let lazy_crc = LazyCrc::new(Some(crc_path));

        assert_load_fails(&lazy_crc, &engine, 5);
    }

    #[test]
    fn test_lazy_crc_loads_real_file() {
        let engine = crate::engine::sync::SyncEngine::new();
        let root = test_table_root("./tests/data/crc-full/");
        let lazy_crc = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&root, 0)));
        assert!(!lazy_crc.is_loaded());
        assert_eq!(lazy_crc.crc_version(), Some(0));

        let outcome = lazy_crc.get_or_load(&engine);
        assert!(lazy_crc.is_loaded());
        assert_eq!(outcome.get().map(|crc| crc.table_size_bytes), Some(5259));
    }

    #[test]
    fn test_lazy_crc_malformed_file() {
        let engine = crate::engine::sync::SyncEngine::new();
        let root = test_table_root("./tests/data/crc-malformed/");
        let lazy_crc = LazyCrc::new(Some(ParsedLogPath::create_parsed_crc(&root, 0)));

        assert_load_fails(&lazy_crc, &engine, 0);
    }

    // ---------------------------------------------------------------------------------------
    // LazyCrc: precomputed values and version-gated access
    // ---------------------------------------------------------------------------------------

    #[test]
    fn test_lazy_crc_precomputed() {
        let lazy_crc = LazyCrc::new_precomputed(test_crc(42), 5);
        assert!(lazy_crc.is_loaded());
        assert_eq!(lazy_crc.crc_version(), Some(5));

        let size_at_5 = lazy_crc
            .get_if_loaded_at_version(5)
            .map(|crc| crc.table_size_bytes);
        assert_eq!(size_at_5, Some(42));

        // Neighbouring versions must not match.
        assert!(lazy_crc.get_if_loaded_at_version(4).is_none());
        assert!(lazy_crc.get_if_loaded_at_version(6).is_none());
    }

    #[test]
    fn test_lazy_crc_precomputed_version_takes_priority() {
        let lazy_crc = LazyCrc::new_precomputed(test_crc(100), 3);
        assert_eq!(lazy_crc.crc_version(), Some(3));
    }

    #[test]
    fn test_get_if_loaded_at_version_not_loaded() {
        let crc_path = ParsedLogPath::create_parsed_crc(&table_root(), 5);
        let lazy_crc = LazyCrc::new(Some(crc_path));
        assert!(!lazy_crc.is_loaded());
        // The version matches, but nothing has been loaded and this accessor never loads.
        assert!(lazy_crc.get_if_loaded_at_version(5).is_none());
    }

    #[test]
    fn test_get_if_loaded_at_version_wrong_version() {
        let lazy_crc = LazyCrc::new_precomputed(test_crc(100), 5);
        assert!(lazy_crc.get_if_loaded_at_version(3).is_none());
    }

    #[test]
    fn test_get_if_loaded_at_version_no_crc() {
        let lazy_crc = LazyCrc::new(None);
        assert!(lazy_crc.get_if_loaded_at_version(0).is_none());
    }
}