use anyhow::{anyhow, Result};
use std::path::Path;
use rype::memory::{detect_available_memory, format_bytes};
use rype::{parquet_index, IndexMetadata, ShardedInvertedIndex};
use super::metadata::load_index_metadata;
/// Caller-selected switches that tune how an index is opened for classification.
///
/// The derived `Default` leaves both switches off.
#[derive(Clone, Debug, Default)]
pub struct IndexLoadOptions {
    /// When `true`, `load_index_for_classification` builds Parquet read options
    /// via `ParquetReadOptions::with_bloom_filter()`; when `false` no read
    /// options are produced.
    pub use_bloom_filter: bool,
    /// When `true`, `load_index_for_classification` asks the sharded index to
    /// issue prefetch advice, budgeted at half of the detected available memory.
    // NOTE(review): "rg" presumably stands for "row group" — confirm with callers.
    pub parallel_rg: bool,
}
/// Everything `load_index_for_classification` hands back to its caller.
#[derive(Debug)]
pub struct LoadedIndex {
    /// Index metadata as returned by `load_index_metadata` for the index path.
    pub metadata: IndexMetadata,
    /// Handle obtained from `ShardedInvertedIndex::open` on the index path.
    pub sharded: ShardedInvertedIndex,
    /// `Some(..)` only when the caller requested bloom filtering through
    /// `IndexLoadOptions::use_bloom_filter`; `None` otherwise.
    pub read_options: Option<parquet_index::ParquetReadOptions>,
}
/// Checks that `path` is recognised by `rype::is_parquet_index`.
///
/// # Errors
///
/// Returns an error naming `path` and showing the `rype index create` command
/// when the check fails. The message does not distinguish a missing path from
/// a path that exists but is not a Parquet index.
pub fn validate_parquet_index(path: &Path) -> Result<()> {
    // Happy path first: a recognised index needs no further work.
    if rype::is_parquet_index(path) {
        return Ok(());
    }

    Err(anyhow!(
        "Index not found or not in Parquet format: {}\n\
         Create an index with: rype index create -o index.ryxdi -r refs.fasta",
        path.display()
    ))
}
/// Opens the Parquet index at `path` and bundles what classification needs.
///
/// The work happens in this order:
/// 1. validate `path` with `validate_parquet_index`;
/// 2. load the metadata with `load_index_metadata`;
/// 3. open the sharded inverted index;
/// 4. build bloom-filter read options if `options.use_bloom_filter` is set;
/// 5. if `options.parallel_rg` is set, ask the sharded index to issue prefetch
///    advice with a budget of half the detected available memory.
///
/// Progress is reported through `log::info!` at each step.
///
/// # Errors
///
/// Propagates the error from validation, from metadata loading, or from
/// opening the sharded index — whichever fails first.
pub fn load_index_for_classification(
    path: &Path,
    options: &IndexLoadOptions,
) -> Result<LoadedIndex> {
    validate_parquet_index(path)?;
    log::info!("Detected Parquet index at {:?}", path);

    let index_metadata = load_index_metadata(path)?;
    log::info!(
        "Metadata loaded: {} buckets",
        index_metadata.bucket_names.len()
    );

    log::info!("Loading Parquet inverted index from {:?}", path);
    let shards = ShardedInvertedIndex::open(path)?;
    log::info!(
        "Sharded index: {} shards, {} total minimizers",
        shards.num_shards(),
        shards.total_minimizers()
    );

    // The closure only runs when the flag is set, so the log line is emitted
    // in exactly the same situations as the read options are created.
    let bloom_read_options = options.use_bloom_filter.then(|| {
        log::info!("Bloom filter row group filtering enabled");
        parquet_index::ParquetReadOptions::with_bloom_filter()
    });

    if options.parallel_rg {
        // Cap the prefetch request at half of what is currently available.
        let budget = detect_available_memory().bytes / 2;
        let advised_bytes = shards.advise_prefetch(Some(budget));
        if advised_bytes > 0 {
            log::info!(
                "Advised kernel to prefetch {} of index data",
                format_bytes(advised_bytes)
            );
        }
    }

    Ok(LoadedIndex {
        metadata: index_metadata,
        sharded: shards,
        read_options: bloom_read_options,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Index path used by the "missing index" tests; it is not expected to
    /// exist on the machine running the tests.
    fn missing_index_path() -> PathBuf {
        PathBuf::from("/nonexistent/path/index.ryxdi")
    }

    /// A missing path is rejected with a message that also tells the user how
    /// to create an index.
    #[test]
    fn test_validate_parquet_index_nonexistent_path() {
        let outcome = validate_parquet_index(&missing_index_path());
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not found or not in Parquet format"));
        assert!(message.contains("rype index create"));
    }

    /// An existing regular file (the crate manifest) is not a Parquet index.
    #[test]
    fn test_validate_parquet_index_regular_file() {
        let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml");
        assert!(validate_parquet_index(&manifest).is_err());
    }

    /// `Default` leaves every switch off.
    #[test]
    fn test_index_load_options_default() {
        let IndexLoadOptions {
            use_bloom_filter,
            parallel_rg,
        } = IndexLoadOptions::default();
        assert!(!use_bloom_filter);
        assert!(!parallel_rg);
    }

    /// Explicitly set switches are stored as given.
    #[test]
    fn test_index_load_options_with_values() {
        let enabled = IndexLoadOptions {
            use_bloom_filter: true,
            parallel_rg: true,
        };
        assert!(enabled.use_bloom_filter);
        assert!(enabled.parallel_rg);
    }

    /// Loading surfaces the validation error when the index path is missing.
    #[test]
    fn test_load_index_nonexistent_path() {
        let defaults = IndexLoadOptions::default();
        let outcome = load_index_for_classification(&missing_index_path(), &defaults);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not found or not in Parquet format"));
    }
}