use std::sync::Arc;
use byteorder::{LittleEndian, ReadBytesExt};
use rustc_hash::FxHashMap;
use std::io::Cursor;
use super::super::types::SegmentFiles;
use super::super::vector_data::LazyFlatVectorData;
use super::bmp::BmpIndex;
use super::{SparseIndex, VectorIndex};
use crate::Result;
use crate::directories::{Directory, FileHandle};
use crate::dsl::Schema;
/// Per-field sparse vector indexes loaded from a segment's sparse file.
pub struct SparseFileData {
    /// MaxScore-style sparse indexes, keyed by field id.
    pub maxscore_indexes: FxHashMap<u32, SparseIndex>,
    /// BMP indexes, keyed by field id.
    pub bmp_indexes: FxHashMap<u32, BmpIndex>,
}
/// Per-field dense vector data loaded from a segment's vectors file.
pub struct VectorsFileData {
    /// ANN indexes (ScaNN / IVF / RaBitQ), keyed by field id.
    pub indexes: FxHashMap<u32, VectorIndex>,
    /// Lazily-opened flat vector storage, keyed by field id.
    pub flat_vectors: FxHashMap<u32, LazyFlatVectorData>,
}
use crate::segment::format::{
DENSE_TOC_ENTRY_SIZE, DenseVectorTocEntry, FOOTER_SIZE, VECTORS_FOOTER_MAGIC, read_dense_toc,
};
pub async fn load_vectors_file<D: Directory>(
dir: &D,
files: &SegmentFiles,
schema: &Schema,
) -> Result<VectorsFileData> {
let mut indexes = FxHashMap::default();
let mut flat_vectors = FxHashMap::default();
let empty = || VectorsFileData {
indexes: FxHashMap::default(),
flat_vectors: FxHashMap::default(),
};
let has_dense_vectors = schema.fields().any(|(_, entry)| {
entry.dense_vector_config.is_some() || entry.binary_dense_vector_config.is_some()
});
if !has_dense_vectors {
return Ok(empty());
}
let handle = match dir.open_lazy(&files.vectors).await {
Ok(h) => h,
Err(_) => return Ok(empty()),
};
let file_size = handle.len();
if file_size < FOOTER_SIZE {
return Ok(empty());
}
let footer_bytes = handle
.read_bytes_range(file_size - FOOTER_SIZE..file_size)
.await?;
let mut cursor = Cursor::new(footer_bytes.as_slice());
let toc_offset = cursor.read_u64::<LittleEndian>()?;
let num_fields = cursor.read_u32::<LittleEndian>()?;
let magic = cursor.read_u32::<LittleEndian>()?;
let entries: Vec<DenseVectorTocEntry> =
if magic == VECTORS_FOOTER_MAGIC && toc_offset < file_size - FOOTER_SIZE {
let toc_size = num_fields as u64 * DENSE_TOC_ENTRY_SIZE;
let toc_bytes = handle
.read_bytes_range(toc_offset..toc_offset + toc_size)
.await?;
read_dense_toc(toc_bytes.as_slice(), num_fields)?
} else {
let header_bytes = handle.read_bytes_range(0..4).await?;
let mut cursor = Cursor::new(header_bytes.as_slice());
let num_fields = cursor.read_u32::<LittleEndian>()?;
if num_fields == 0 {
return Ok(empty());
}
let entries_size = num_fields as u64 * DENSE_TOC_ENTRY_SIZE;
let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
read_dense_toc(entries_bytes.as_slice(), num_fields)?
};
if entries.is_empty() {
return Ok(empty());
}
use crate::segment::ann_build;
for DenseVectorTocEntry {
field_id,
index_type,
offset,
size: length,
} in entries
{
match index_type {
ann_build::FLAT_TYPE => {
let slice = handle.slice(offset..offset + length);
match LazyFlatVectorData::open(slice).await {
Ok(lazy_flat) => {
flat_vectors.insert(field_id, lazy_flat);
}
Err(e) => {
log::warn!(
"Failed to load lazy flat vectors for field {}: {}",
field_id,
e
);
}
}
}
ann_build::SCANN_TYPE => {
let data = handle.read_bytes_range(offset..offset + length).await?;
indexes.insert(
field_id,
VectorIndex::ScaNN(Arc::new(super::types::LazyScaNN::new(data))),
);
}
ann_build::IVF_RABITQ_TYPE => {
let data = handle.read_bytes_range(offset..offset + length).await?;
indexes.insert(
field_id,
VectorIndex::IVF(Arc::new(super::types::LazyIVF::new(data))),
);
}
ann_build::RABITQ_TYPE => {
let data = handle.read_bytes_range(offset..offset + length).await?;
indexes.insert(
field_id,
VectorIndex::RaBitQ(Arc::new(super::types::LazyRaBitQ::new(data))),
);
}
_ => {
log::warn!(
"Unknown vector index type {} for field {}",
index_type,
field_id
);
}
}
}
Ok(VectorsFileData {
indexes,
flat_vectors,
})
}
/// Loads the sparse-vector side of a segment.
///
/// File layout (as read here): block data, a skip section starting at
/// `skip_offset`, a table of contents at `toc_offset`, and a footer
/// `[skip_offset: u64][toc_offset: u64][num_fields: u32][magic: u32]`.
/// Each TOC record is a 13-byte field header (`field_id: u32`,
/// `quantization: u8`, `ndims: u32`, `total_vectors: u32`) followed by
/// `ndims` 28-byte dimension entries.
///
/// Returns empty data when the schema has no sparse fields or the file is
/// absent/too small. Returns `Error::Corruption` when the footer magic,
/// footer offsets, or TOC are malformed (previously a truncated TOC would
/// panic on slice indexing).
pub async fn load_sparse_file<D: Directory>(
    dir: &D,
    files: &SegmentFiles,
    total_docs: u32,
    schema: &Schema,
) -> Result<SparseFileData> {
    use crate::segment::format::{SPARSE_FOOTER_MAGIC, SPARSE_FOOTER_SIZE};
    use crate::structures::SparseVectorConfig;

    // Byte widths of the TOC's per-field header and per-dimension entry.
    const FIELD_HEADER_SIZE: usize = 13;
    const DIM_ENTRY_SIZE: usize = 28;

    let empty = || SparseFileData {
        maxscore_indexes: FxHashMap::default(),
        bmp_indexes: FxHashMap::default(),
    };
    let mut maxscore_indexes = FxHashMap::default();
    let mut bmp_indexes = FxHashMap::default();

    let has_sparse_vectors = schema
        .fields()
        .any(|(_, entry)| entry.sparse_vector_config.is_some());
    if !has_sparse_vectors {
        return Ok(empty());
    }

    let handle = match dir.open_lazy(&files.sparse).await {
        Ok(h) => h,
        Err(e) => {
            log::debug!("No sparse file found ({}): {:?}", files.sparse.display(), e);
            return Ok(empty());
        }
    };
    let file_size = handle.len();
    if file_size < SPARSE_FOOTER_SIZE {
        return Ok(empty());
    }

    let footer_bytes = match handle
        .read_bytes_range(file_size - SPARSE_FOOTER_SIZE..file_size)
        .await
    {
        Ok(d) => d,
        Err(_) => return Ok(empty()),
    };
    let fb = footer_bytes.as_slice();
    let skip_offset = u64::from_le_bytes(fb[0..8].try_into().unwrap());
    let toc_offset = u64::from_le_bytes(fb[8..16].try_into().unwrap());
    let num_fields = u32::from_le_bytes(fb[16..20].try_into().unwrap());
    let magic = u32::from_le_bytes(fb[20..24].try_into().unwrap());
    if magic != SPARSE_FOOTER_MAGIC {
        return Err(crate::Error::Corruption(format!(
            "Invalid sparse footer magic: {:#x} (expected {:#x})",
            magic, SPARSE_FOOTER_MAGIC
        )));
    }
    // The skip section must start at or before the TOC and the TOC must lie
    // inside the file; otherwise `toc_offset - skip_offset` below would
    // underflow and the tail read would be nonsense.
    if toc_offset < skip_offset || toc_offset > file_size - SPARSE_FOOTER_SIZE {
        return Err(crate::Error::Corruption(format!(
            "Invalid sparse footer offsets: skip_offset={}, toc_offset={}, file_size={}",
            skip_offset, toc_offset, file_size
        )));
    }
    log::debug!(
        "Loading sparse: size={} bytes, num_fields={}, skip_offset={}, toc_offset={}",
        file_size,
        num_fields,
        skip_offset,
        toc_offset,
    );
    if num_fields == 0 {
        return Ok(empty());
    }

    // Read skip section + TOC in one request, then split at the boundary.
    let tail_bytes = handle
        .read_bytes_range(skip_offset..file_size - SPARSE_FOOTER_SIZE)
        .await?;
    let tail = tail_bytes.as_slice();
    let skip_section_len = (toc_offset - skip_offset) as usize;
    let skip_section = tail_bytes.slice(0..skip_section_len);
    let toc_data = &tail[skip_section_len..];

    let mut pos = 0usize;
    for _ in 0..num_fields {
        // Validate before indexing: a truncated TOC must be a Corruption
        // error, not a slice-index panic.
        if toc_data.len() < pos + FIELD_HEADER_SIZE {
            return Err(crate::Error::Corruption(format!(
                "Sparse TOC truncated: {} bytes remain, field header needs {}",
                toc_data.len() - pos,
                FIELD_HEADER_SIZE
            )));
        }
        let field_id = u32::from_le_bytes(toc_data[pos..pos + 4].try_into().unwrap());
        let quantization = toc_data[pos + 4];
        let ndims = u32::from_le_bytes(toc_data[pos + 5..pos + 9].try_into().unwrap()) as usize;
        let total_vectors = u32::from_le_bytes(toc_data[pos + 9..pos + 13].try_into().unwrap());
        pos += FIELD_HEADER_SIZE;

        // Checked multiply guards against absurd `ndims` values from a
        // corrupt file; the filter guards against a truncated entry list.
        let Some(dims_len) = ndims
            .checked_mul(DIM_ENTRY_SIZE)
            .filter(|need| *need <= toc_data.len() - pos)
        else {
            return Err(crate::Error::Corruption(format!(
                "Sparse TOC truncated: field {} declares {} dims but only {} bytes remain",
                field_id,
                ndims,
                toc_data.len() - pos
            )));
        };

        let is_bmp = SparseVectorConfig::from_byte(quantization)
            .is_some_and(|c| c.format == crate::structures::SparseFormat::Bmp);
        if is_bmp && ndims >= 1 {
            // BMP uses a single sentinel dimension entry that carries the
            // blob location; any further entries are skipped unread.
            let d = &toc_data[pos..pos + DIM_ENTRY_SIZE];
            let dim_id = u32::from_le_bytes(d[0..4].try_into().unwrap());
            let blob_offset = u64::from_le_bytes(d[4..12].try_into().unwrap());
            let blob_len_low = u32::from_le_bytes(d[12..16].try_into().unwrap());
            let blob_len_high = u32::from_le_bytes(d[16..20].try_into().unwrap());
            // Skip all entries at once (previously a no-op loop advanced
            // 28 bytes per iteration to the same effect).
            pos += dims_len;
            if dim_id != 0xFFFFFFFF {
                log::warn!(
                    "BMP field {} has unexpected dim_id {:#x} (expected sentinel)",
                    field_id,
                    dim_id
                );
            }
            // The 64-bit blob length is stored as two u32 halves.
            let blob_len = (blob_len_high as u64) << 32 | blob_len_low as u64;
            let idx = BmpIndex::parse(
                handle.clone(),
                blob_offset,
                blob_len,
                total_docs,
                total_vectors,
            )?;
            log::debug!(
                "Loaded BMP index for field {}: dims={}, num_blocks={}, total_vectors={}",
                field_id,
                idx.dims(),
                idx.num_blocks,
                total_vectors,
            );
            bmp_indexes.insert(field_id, idx);
        } else {
            // MaxScore path: materialize every per-dimension entry.
            let mut dims = super::types::DimensionTable::with_capacity(ndims);
            for _ in 0..ndims {
                let d = &toc_data[pos..pos + DIM_ENTRY_SIZE];
                let dim_id = u32::from_le_bytes(d[0..4].try_into().unwrap());
                let block_data_offset = u64::from_le_bytes(d[4..12].try_into().unwrap());
                let skip_start = u32::from_le_bytes(d[12..16].try_into().unwrap());
                let num_blocks = u32::from_le_bytes(d[16..20].try_into().unwrap());
                let doc_count = u32::from_le_bytes(d[20..24].try_into().unwrap());
                let max_weight = f32::from_le_bytes(d[24..28].try_into().unwrap());
                dims.push(
                    dim_id,
                    block_data_offset,
                    skip_start,
                    num_blocks,
                    doc_count,
                    max_weight,
                );
                pos += DIM_ENTRY_SIZE;
            }
            dims.sort_by_dim_id();
            log::debug!(
                "Loaded sparse index for field {}: num_dims={}, total_vectors={}, skip_bytes={}",
                field_id,
                dims.len(),
                total_vectors,
                skip_section.len(),
            );
            maxscore_indexes.insert(
                field_id,
                SparseIndex::new(
                    handle.clone(),
                    dims,
                    skip_section.clone(),
                    total_docs,
                    total_vectors,
                ),
            );
        }
    }

    log::debug!(
        "Sparse file loaded: maxscore_fields={:?}, bmp_fields={:?}",
        maxscore_indexes.keys().collect::<Vec<_>>(),
        bmp_indexes.keys().collect::<Vec<_>>()
    );
    Ok(SparseFileData {
        maxscore_indexes,
        bmp_indexes,
    })
}
/// Opens a lazy handle to the segment's positions file.
///
/// Returns `Ok(None)` when no field in the schema records positions, or when
/// the positions file cannot be opened.
pub async fn open_positions_file<D: Directory>(
    dir: &D,
    files: &SegmentFiles,
    schema: &Schema,
) -> Result<Option<FileHandle>> {
    let needs_positions = schema.fields().any(|(_, entry)| entry.positions.is_some());
    if !needs_positions {
        return Ok(None);
    }
    // An unopenable file is treated the same as "no positions stored".
    Ok(dir.open_lazy(&files.positions).await.ok())
}
pub async fn load_fast_fields_file<D: Directory>(
dir: &D,
files: &SegmentFiles,
schema: &Schema,
) -> Result<FxHashMap<u32, crate::structures::fast_field::FastFieldReader>> {
use crate::structures::fast_field::{
FastFieldReader, read_fast_field_footer, read_fast_field_toc,
};
let has_fast = schema.fields().any(|(_, entry)| entry.fast);
if !has_fast {
return Ok(FxHashMap::default());
}
let handle = match dir.open_read(&files.fast).await {
Ok(h) => h,
Err(e) => {
log::debug!("[fast-fields] .fast file not found ({}), skipping", e);
return Ok(FxHashMap::default());
}
};
let file_data = handle.read_bytes().await?;
if file_data.is_empty() {
return Ok(FxHashMap::default());
}
let (toc_offset, num_columns) = read_fast_field_footer(&file_data).map_err(crate::Error::Io)?;
let mut readers = FxHashMap::default();
let toc_entries =
read_fast_field_toc(&file_data, toc_offset, num_columns).map_err(crate::Error::Io)?;
for toc in &toc_entries {
let reader = FastFieldReader::open(&file_data, toc).map_err(crate::Error::Io)?;
readers.insert(toc.field_id, reader);
}
log::debug!(
"[fast-fields] loaded {} columns from .fast file",
readers.len(),
);
Ok(readers)
}