use crate::error::{Result, RypeError};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use super::inverted::InvertedIndex;
use crate::types::IndexMetadata;
/// Summary of one shard as recorded in a [`ShardManifest`].
///
/// `ShardedInvertedIndex::open` builds one of these per shard entry found in
/// the Parquet manifest's inverted section.
#[derive(Debug, Clone)]
pub struct ShardInfo {
/// Identifier of the shard; `ShardManifest::shard_path_parquet` uses it to
/// build the shard's file name.
pub shard_id: u32,
/// Filled from the manifest entry's `min_minimizer` by `open`.
pub min_start: u64,
/// Filled from the manifest entry's `max_minimizer` by `open`.
/// NOTE(review): whether this bound is inclusive is not visible here — confirm.
pub min_end: u64,
/// Set by `open` when `shard_id` equals the manifest's `num_shards - 1`.
pub is_last_shard: bool,
/// Filled from the manifest entry's `num_entries` by `open`.
pub num_minimizers: usize,
/// Also filled from `num_entries` by `open`, so it equals `num_minimizers`
/// for manifests built that way. `ShardManifest::to_metadata` takes the
/// maximum of this field across shards.
pub num_bucket_ids: usize,
}
/// In-memory description of a sharded inverted index: the index parameters,
/// aggregate counts, the list of shards and the per-bucket tables.
///
/// `ShardedInvertedIndex::open` assembles this from the Parquet manifest and
/// the buckets Parquet file.
#[derive(Debug, Clone)]
pub struct ShardManifest {
/// Index parameter `k`, copied from the Parquet manifest.
pub k: usize,
/// Index parameter `w`, copied from the Parquet manifest.
pub w: usize,
/// Salt value, copied from the Parquet manifest.
pub salt: u64,
/// Hash compared against `InvertedIndex::compute_metadata_hash` in
/// `ShardedInvertedIndex::validate_against_metadata`.
pub source_hash: u64,
/// Filled from the inverted section's `total_entries` by `open`.
pub total_minimizers: usize,
/// Also filled from `total_entries` by `open`, so it equals
/// `total_minimizers` for manifests built that way.
pub total_bucket_ids: usize,
/// Copied from the inverted section's `has_overlapping_shards` flag.
pub has_overlapping_shards: bool,
/// One entry per shard, in the order listed by the Parquet manifest.
pub shards: Vec<ShardInfo>,
/// Names keyed by `u32` (presumably the bucket id — confirm against the
/// buckets Parquet reader).
pub bucket_names: HashMap<u32, String>,
/// Source strings per `u32` key (same keying assumption as `bucket_names`).
pub bucket_sources: HashMap<u32, Vec<String>>,
/// Per-key minimizer counts. `ShardedInvertedIndex::open` leaves this map empty.
pub bucket_minimizer_counts: HashMap<u32, usize>,
/// Optional per-key file statistics, as returned by the buckets Parquet reader.
pub bucket_file_stats: Option<HashMap<u32, crate::types::BucketFileStats>>,
}
impl ShardManifest {
    /// Returns the location of the Parquet file for `shard_id` under `base`,
    /// i.e. `<base>/inverted/shard.<shard_id>.parquet`.
    pub fn shard_path_parquet(base: &Path, shard_id: u32) -> PathBuf {
        let file_name = format!("shard.{}.parquet", shard_id);
        let mut path = base.join("inverted");
        path.push(file_name);
        path
    }

    /// Builds an [`IndexMetadata`] from this manifest.
    ///
    /// The index parameters and bucket tables are cloned as-is;
    /// `largest_shard_entries` is the maximum `num_bucket_ids` over all
    /// shards, or 0 when the manifest lists no shards.
    pub fn to_metadata(&self) -> IndexMetadata {
        // Starting the fold at 0 gives the "no shards" default for free.
        let largest_shard_entries = self
            .shards
            .iter()
            .fold(0u64, |largest, shard| {
                largest.max(shard.num_bucket_ids as u64)
            });

        let bucket_names = self.bucket_names.clone();
        let bucket_sources = self.bucket_sources.clone();
        let bucket_minimizer_counts = self.bucket_minimizer_counts.clone();
        let bucket_file_stats = self.bucket_file_stats.clone();

        IndexMetadata {
            k: self.k,
            w: self.w,
            salt: self.salt,
            bucket_names,
            bucket_sources,
            bucket_minimizer_counts,
            largest_shard_entries,
            bucket_file_stats,
        }
    }

    /// Reports whether any bucket names are present in this manifest.
    pub fn has_bucket_metadata(&self) -> bool {
        let names_missing = self.bucket_names.is_empty();
        !names_missing
    }
}
/// Handle to a sharded Parquet inverted index on disk.
///
/// Holds the manifest and the index's base path; individual shards are read
/// from disk on demand by the `load_shard*` methods.
#[derive(Debug, Clone)]
pub struct ShardedInvertedIndex {
/// Manifest assembled by `open`.
manifest: ShardManifest,
/// Path that `open` was called with; shard files are resolved relative to it.
base_path: PathBuf,
/// Row-group range information, one inner `Vec` per shard, stored in the
/// same order as `manifest.shards` (indexed by position, not by `shard_id`).
rg_ranges_cache: Vec<Vec<super::inverted::RowGroupRangeInfo>>,
}
impl ShardedInvertedIndex {
    /// Opens the sharded Parquet index rooted at `base_path`.
    ///
    /// Steps, in order:
    /// 1. Reject paths whose extension is `ryidx` with a message explaining
    ///    how to rebuild the index.
    /// 2. Load the `ParquetManifest` and the bucket tables from `base_path`.
    /// 3. Require the manifest to contain an `inverted` section and convert
    ///    each of its shard entries into a [`ShardInfo`].
    /// 4. Read the row-group ranges of every shard file up front and cache
    ///    them.
    ///
    /// `bucket_minimizer_counts` is left empty in the resulting manifest.
    ///
    /// # Errors
    ///
    /// Manifest and bucket-table failures, as well as a missing `inverted`
    /// section, are reported through `RypeError::format` with `base_path`.
    /// Failures while reading row-group ranges are propagated with `?`.
    pub fn open(base_path: &Path) -> Result<Self> {
        use super::parquet::ParquetManifest;

        if base_path.extension().is_some_and(|ext| ext == "ryidx") {
            return Err(RypeError::format(
                base_path,
                "This appears to be an old .ryidx file. Rype now only supports \
                 Parquet indices (.ryxdi directories). Please rebuild your index \
                 using: rype index create -o output.ryxdi -r your_refs.fasta",
            ));
        }

        let parquet_manifest = ParquetManifest::load(base_path)
            .map_err(|e| RypeError::format(base_path, e.to_string()))?;
        let (bucket_names, bucket_sources, bucket_file_stats) =
            super::parquet::read_buckets_parquet(base_path)
                .map_err(|e| RypeError::format(base_path, e.to_string()))?;

        let inverted = parquet_manifest
            .inverted
            .as_ref()
            .ok_or_else(|| RypeError::format(base_path, "missing inverted section in manifest"))?;

        // Both count fields are fed from `num_entries`; the manifest entry
        // exposes a single count per shard.
        let shards: Vec<ShardInfo> = inverted
            .shards
            .iter()
            .map(|s| ShardInfo {
                shard_id: s.shard_id,
                min_start: s.min_minimizer,
                min_end: s.max_minimizer,
                is_last_shard: s.shard_id == inverted.num_shards.saturating_sub(1),
                num_minimizers: s.num_entries as usize,
                num_bucket_ids: s.num_entries as usize,
            })
            .collect();

        let manifest = ShardManifest {
            k: parquet_manifest.k,
            w: parquet_manifest.w,
            salt: parquet_manifest.salt,
            source_hash: parquet_manifest.source_hash,
            total_minimizers: inverted.total_entries as usize,
            total_bucket_ids: inverted.total_entries as usize,
            has_overlapping_shards: inverted.has_overlapping_shards,
            shards,
            bucket_names,
            bucket_sources,
            bucket_minimizer_counts: HashMap::new(),
            bucket_file_stats,
        };

        let rg_ranges_cache = Self::load_rg_ranges_for_shards(base_path, &manifest.shards)?;

        Ok(ShardedInvertedIndex {
            manifest,
            base_path: base_path.to_path_buf(),
            rg_ranges_cache,
        })
    }

    /// Reads the row-group ranges of every shard listed in `shards`.
    ///
    /// The result has one entry per shard, in the same order as `shards`, so
    /// it is indexed by position rather than by `shard_id`. The first shard
    /// whose ranges cannot be read aborts the whole load.
    fn load_rg_ranges_for_shards(
        base_path: &Path,
        shards: &[ShardInfo],
    ) -> Result<Vec<Vec<super::inverted::RowGroupRangeInfo>>> {
        use super::inverted::get_row_group_ranges;

        let mut cache = Vec::with_capacity(shards.len());
        for shard_info in shards {
            let shard_path = ShardManifest::shard_path_parquet(base_path, shard_info.shard_id);
            let ranges = get_row_group_ranges(&shard_path)?;
            cache.push(ranges);
        }
        Ok(cache)
    }

    /// Index parameter `k` from the manifest.
    pub fn k(&self) -> usize {
        self.manifest.k
    }

    /// Index parameter `w` from the manifest.
    pub fn w(&self) -> usize {
        self.manifest.w
    }

    /// Salt value from the manifest.
    pub fn salt(&self) -> u64 {
        self.manifest.salt
    }

    /// Source hash recorded in the manifest.
    pub fn source_hash(&self) -> u64 {
        self.manifest.source_hash
    }

    /// Number of shards listed in the manifest.
    pub fn num_shards(&self) -> usize {
        self.manifest.shards.len()
    }

    /// Total minimizer count recorded in the manifest.
    pub fn total_minimizers(&self) -> usize {
        self.manifest.total_minimizers
    }

    /// Total bucket-id count recorded in the manifest.
    pub fn total_bucket_ids(&self) -> usize {
        self.manifest.total_bucket_ids
    }

    /// Borrow of the full manifest.
    pub fn manifest(&self) -> &ShardManifest {
        &self.manifest
    }

    /// Path this index was opened from.
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }

    /// Path of the Parquet file for `shard_id` under this index's base path.
    pub fn shard_path(&self, shard_id: u32) -> PathBuf {
        ShardManifest::shard_path_parquet(&self.base_path, shard_id)
    }

    /// Cached row-group ranges for the shard at position `shard_pos` in the
    /// manifest's shard list, or `None` when the position is out of range.
    ///
    /// Note that `shard_pos` is a position, not a `shard_id`.
    pub fn rg_ranges(&self, shard_pos: usize) -> Option<&[super::inverted::RowGroupRangeInfo]> {
        self.rg_ranges_cache.get(shard_pos).map(|v| v.as_slice())
    }

    /// Whether any row-group ranges are cached. The cache holds one entry per
    /// shard, so this is `false` only for an index without shards.
    pub fn has_rg_cache(&self) -> bool {
        !self.rg_ranges_cache.is_empty()
    }

    /// Sum of `uncompressed_size` over every cached row-group range of every
    /// shard.
    pub fn total_uncompressed_size(&self) -> usize {
        self.rg_ranges_cache
            .iter()
            .flat_map(|rgs| rgs.iter())
            .map(|info| info.uncompressed_size)
            .sum()
    }

    /// Asks the kernel to start reading shard files ahead of time.
    ///
    /// Shards are visited in manifest order. Each file is briefly mapped
    /// read-only, `madvise(MADV_WILLNEED)` is issued on the mapping, and the
    /// mapping is removed again; this process never reads the file contents
    /// itself. The operation is best-effort: a shard whose metadata lookup,
    /// open, `mmap` or `madvise` fails is skipped without reporting an error.
    ///
    /// `max_bytes` caps the total file size that may be advised (`None` means
    /// no cap). Processing stops at the first shard that would push the
    /// running total past the cap; later shards are not considered even if
    /// they would fit.
    ///
    /// Returns the combined size in bytes of the shard files for which the
    /// kernel accepted the advice. Returns 0 immediately when no row-group
    /// ranges are cached.
    #[cfg(unix)]
    pub fn advise_prefetch(&self, max_bytes: Option<usize>) -> usize {
        use std::os::unix::io::AsRawFd;

        if self.rg_ranges_cache.is_empty() {
            return 0;
        }

        let budget = max_bytes.unwrap_or(usize::MAX);
        let mut total_advised = 0usize;
        let mut shards_advised = 0usize;

        for shard_info in &self.manifest.shards {
            let shard_path =
                ShardManifest::shard_path_parquet(&self.base_path, shard_info.shard_id);

            let file_len = match std::fs::metadata(&shard_path) {
                Ok(meta) => meta.len(),
                Err(_) => continue,
            };

            // Do the budget arithmetic in checked u64 so that neither a file
            // length larger than `usize` (32-bit targets) nor an overflowing
            // running total can slip past the check. Either case necessarily
            // exceeds the budget, which is itself a `usize`.
            let within_budget = (total_advised as u64)
                .checked_add(file_len)
                .filter(|&total| total <= budget as u64);
            let projected_total = match within_budget {
                // Lossless: bounded above by `budget`, which is a `usize`.
                Some(total) => total as usize,
                None => {
                    log::debug!(
                        "Prefetch budget reached at {} bytes, skipping remaining shards",
                        total_advised
                    );
                    break;
                }
            };
            let file_size = projected_total - total_advised;

            // mmap rejects zero-length mappings, so there is nothing to advise.
            if file_size == 0 {
                continue;
            }

            let file = match std::fs::File::open(&shard_path) {
                Ok(f) => f,
                Err(_) => continue,
            };

            // SAFETY: `file` stays open for the duration of the mmap call, so
            // the descriptor is valid. The mapping is private and read-only,
            // its memory is never dereferenced, and `madvise`/`munmap` are
            // only called with the exact pointer and length that a successful
            // `mmap` returned.
            let advised = unsafe {
                let ptr = libc::mmap(
                    std::ptr::null_mut(),
                    file_size,
                    libc::PROT_READ,
                    libc::MAP_PRIVATE,
                    file.as_raw_fd(),
                    0,
                );
                if ptr == libc::MAP_FAILED {
                    false
                } else {
                    let accepted = libc::madvise(ptr, file_size, libc::MADV_WILLNEED) == 0;
                    libc::munmap(ptr, file_size);
                    accepted
                }
            };

            // Only count bytes the kernel actually accepted advice for.
            if advised {
                total_advised = projected_total;
                shards_advised += 1;
            }
        }

        if total_advised > 0 {
            log::debug!(
                "Advised kernel to prefetch {} bytes across {} shards",
                total_advised,
                shards_advised
            );
        }
        total_advised
    }

    /// Prefetch advice is not implemented on non-unix targets; always
    /// returns 0.
    #[cfg(not(unix))]
    pub fn advise_prefetch(&self, _max_bytes: Option<usize>) -> usize {
        0
    }

    /// Loads the complete shard `shard_id` from its Parquet file, passing the
    /// manifest's `k`, `w`, `salt` and `source_hash` to the loader.
    ///
    /// # Errors
    ///
    /// Loader failures are wrapped in `RypeError::format` with the shard path.
    pub fn load_shard(&self, shard_id: u32) -> Result<InvertedIndex> {
        let path = self.shard_path(shard_id);
        InvertedIndex::load_shard_parquet_with_params(
            &path,
            self.manifest.k,
            self.manifest.w,
            self.manifest.salt,
            self.manifest.source_hash,
        )
        .map_err(|e| RypeError::format(&path, e.to_string()))
    }

    /// Loads shard `shard_id` for a specific query, forwarding
    /// `query_minimizers` and the optional read `options` to
    /// `InvertedIndex::load_shard_parquet_for_query`.
    ///
    /// # Errors
    ///
    /// Loader failures are wrapped in `RypeError::format` with the shard path.
    pub fn load_shard_for_query(
        &self,
        shard_id: u32,
        query_minimizers: &[u64],
        options: Option<&super::parquet::ParquetReadOptions>,
    ) -> Result<InvertedIndex> {
        let path = self.shard_path(shard_id);
        InvertedIndex::load_shard_parquet_for_query(
            &path,
            self.manifest.k,
            self.manifest.w,
            self.manifest.salt,
            self.manifest.source_hash,
            query_minimizers,
            options,
        )
        .map_err(|e| RypeError::format(&path, e.to_string()))
    }

    /// Loads shard `shard_id` for a specific query as `(u64, u32)` pairs via
    /// `InvertedIndex::load_shard_coo_for_query`.
    ///
    /// Only `k` is forwarded from the manifest here; `w`, `salt` and
    /// `source_hash` are not passed to this loader.
    ///
    /// # Errors
    ///
    /// Loader failures are wrapped in `RypeError::format` with the shard path.
    pub fn load_shard_coo_for_query(
        &self,
        shard_id: u32,
        query_minimizers: &[u64],
        options: Option<&super::parquet::ParquetReadOptions>,
    ) -> Result<Vec<(u64, u32)>> {
        let path = self.shard_path(shard_id);
        InvertedIndex::load_shard_coo_for_query(&path, self.manifest.k, query_minimizers, options)
            .map_err(|e| RypeError::format(&path, e.to_string()))
    }

    /// Always `true`: `open` only produces Parquet-backed indices.
    pub fn is_parquet(&self) -> bool {
        true
    }

    /// Checks that this index agrees with `metadata`.
    ///
    /// Compares `k`, `w` and `salt` in that order, then compares the
    /// manifest's `source_hash` with
    /// `InvertedIndex::compute_metadata_hash(metadata)`.
    ///
    /// # Errors
    ///
    /// Returns a `RypeError::validation` describing the first mismatch found.
    pub fn validate_against_metadata(&self, metadata: &IndexMetadata) -> Result<()> {
        if self.manifest.k != metadata.k {
            return Err(RypeError::validation(format!(
                "K mismatch: sharded index has K={}, metadata has K={}",
                self.manifest.k, metadata.k
            )));
        }
        if self.manifest.w != metadata.w {
            return Err(RypeError::validation(format!(
                "W mismatch: sharded index has W={}, metadata has W={}",
                self.manifest.w, metadata.w
            )));
        }
        if self.manifest.salt != metadata.salt {
            return Err(RypeError::validation(format!(
                "Salt mismatch: sharded index has salt={:#x}, metadata has salt={:#x}",
                self.manifest.salt, metadata.salt
            )));
        }

        let expected_hash = InvertedIndex::compute_metadata_hash(metadata);
        if self.manifest.source_hash != expected_hash {
            return Err(RypeError::validation(
                "Source hash mismatch: sharded index is stale or was built from different source",
            ));
        }
        Ok(())
    }
}