use std::io;
use log::debug;
use crate::codecs::codec_util;
use crate::codecs::lucene90::points::{
BKD_CODEC, BKD_VERSION, DATA_CODEC, DATA_EXTENSION, FORMAT_VERSION, INDEX_CODEC,
INDEX_EXTENSION, META_CODEC, META_EXTENSION,
};
use crate::index::{FieldInfos, index_file_names};
use crate::store::checksum_input::ChecksumIndexInput;
use crate::store::{DataInput, Directory, IndexInput};
/// Per-field summary values retained from one BKD metadata entry.
///
/// Built by `read_bkd_entry`; the other values that function reads from the
/// metadata are discarded and are not represented here.
#[derive(Clone)]
struct BkdEntry {
/// Leaf count as stored in the metadata entry.
num_leaves: u32,
/// Point count as stored in the metadata entry.
point_count: i64,
/// Document count as stored in the metadata entry.
doc_count: i32,
}
/// Reader over the points files of a single segment.
///
/// Holds the per-field metadata parsed by `PointsReader::open` together with
/// the index and data inputs that were opened there.
pub struct PointsReader {
/// Metadata slots indexed by field number; a slot is `None` when no entry
/// was read for that field number.
entries: Box<[Option<BkdEntry>]>,
/// Input opened on the index file. It is stored but never read in this
/// file, hence the `expect(dead_code)` marker.
#[expect(dead_code)]
index_in: Box<dyn IndexInput>,
/// Input opened on the data file. It is stored but never read in this
/// file, hence the `expect(dead_code)` marker.
#[expect(dead_code)]
data_in: Box<dyn IndexInput>,
}
impl PointsReader {
pub fn open(
directory: &dyn Directory,
segment_name: &str,
segment_suffix: &str,
segment_id: &[u8; codec_util::ID_LENGTH],
field_infos: &FieldInfos,
) -> io::Result<Self> {
let kdi_name =
index_file_names::segment_file_name(segment_name, segment_suffix, INDEX_EXTENSION);
let mut index_in = directory.open_input(&kdi_name)?;
codec_util::check_index_header(
index_in.as_mut(),
INDEX_CODEC,
FORMAT_VERSION,
FORMAT_VERSION,
segment_id,
segment_suffix,
)?;
codec_util::retrieve_checksum(index_in.as_mut())?;
let kdd_name =
index_file_names::segment_file_name(segment_name, segment_suffix, DATA_EXTENSION);
let mut data_in = directory.open_input(&kdd_name)?;
codec_util::check_index_header(
data_in.as_mut(),
DATA_CODEC,
FORMAT_VERSION,
FORMAT_VERSION,
segment_id,
segment_suffix,
)?;
codec_util::retrieve_checksum(data_in.as_mut())?;
let kdm_name =
index_file_names::segment_file_name(segment_name, segment_suffix, META_EXTENSION);
let meta_input = directory.open_input(&kdm_name)?;
let mut meta_in = ChecksumIndexInput::new(meta_input);
codec_util::check_index_header(
&mut meta_in,
META_CODEC,
FORMAT_VERSION,
FORMAT_VERSION,
segment_id,
segment_suffix,
)?;
let entries = read_fields(&mut meta_in, field_infos)?;
let index_length = meta_in.read_le_long()?;
let data_length = meta_in.read_le_long()?;
codec_util::check_footer(&mut meta_in)?;
codec_util::retrieve_checksum_with_length(index_in.as_mut(), index_length)?;
codec_util::retrieve_checksum_with_length(data_in.as_mut(), data_length)?;
debug!(
"points_reader: opened {} entries for segment {segment_name}",
entries.iter().filter(|e| e.is_some()).count()
);
Ok(Self {
entries,
index_in,
data_in,
})
}
pub fn point_count(&self, field_number: u32) -> Option<i64> {
self.entry(field_number).map(|e| e.point_count)
}
pub fn doc_count(&self, field_number: u32) -> Option<i32> {
self.entry(field_number).map(|e| e.doc_count)
}
pub fn num_leaves(&self, field_number: u32) -> Option<u32> {
self.entry(field_number).map(|e| e.num_leaves)
}
fn entry(&self, field_number: u32) -> Option<&BkdEntry> {
self.entries
.get(field_number as usize)
.and_then(|opt| opt.as_ref())
}
}
/// Reads every per-field BKD entry from the metadata stream.
///
/// The stream holds a sequence of little-endian field numbers, each followed
/// by one BKD entry, terminated by the sentinel `-1`. The result is indexed
/// by field number; slots for which no entry was read stay `None`.
///
/// # Errors
///
/// Fails when a field number is negative (other than the `-1` terminator),
/// when `field_infos` has no field with that number, or when reading from
/// `meta` or parsing an entry fails.
fn read_fields(
    meta: &mut dyn DataInput,
    field_infos: &FieldInfos,
) -> io::Result<Box<[Option<BkdEntry>]>> {
    let mut entries: Vec<Option<BkdEntry>> = vec![None; field_infos.len()];
    loop {
        let raw_number = meta.read_le_int()?;
        if raw_number == -1 {
            // End-of-fields marker.
            break;
        }
        // Any other negative value is rejected; the conversion doubles as the sign check.
        let field_number = u32::try_from(raw_number)
            .map_err(|_| io::Error::other(format!("Illegal field number: {raw_number}")))?;
        if field_infos.field_info_by_number(field_number).is_none() {
            return Err(io::Error::other(format!(
                "invalid field number: {field_number}"
            )));
        }
        let entry = read_bkd_entry(meta)?;

        // A known field may carry a number at or beyond `field_infos.len()`
        // (e.g. sparse numbering). Grow the table instead of panicking on an
        // out-of-bounds index. `slot` originates from a non-negative i32, so
        // `slot + 1` cannot overflow.
        let slot = field_number as usize;
        if slot >= entries.len() {
            entries.resize(slot + 1, None);
        }
        entries[slot] = Some(entry);
    }
    Ok(entries.into_boxed_slice())
}
/// Parses one BKD metadata entry and keeps its leaf, point and document counts.
///
/// Values are consumed in this order: codec header, dimension count, index
/// dimension count, max points per leaf, bytes per dimension, leaf count,
/// two packed values of `num_index_dims * bytes_per_dim` bytes each
/// (skipped; presumably the min and max packed values), point count,
/// document count, index byte count, and two little-endian file pointers.
///
/// # Errors
///
/// Fails when the header check or any read fails, or when the index
/// dimension count, bytes per dimension or leaf count is negative. Those
/// values were previously cast with `as u32`, which turned a corrupt negative
/// value into a huge count and could overflow the skip-length multiplication.
fn read_bkd_entry(meta: &mut dyn DataInput) -> io::Result<BkdEntry> {
    // Converts a vint that must be a count into `u32`, rejecting negatives.
    let non_negative = |value: i32, what: &str| -> io::Result<u32> {
        u32::try_from(value)
            .map_err(|_| io::Error::other(format!("invalid {what} in BKD metadata: {value}")))
    };

    codec_util::check_header(meta, BKD_CODEC, BKD_VERSION, BKD_VERSION)?;
    let _num_dims = meta.read_vint()?;
    let num_index_dims = non_negative(meta.read_vint()?, "index dimension count")?;
    let _max_points_in_leaf = meta.read_vint()?;
    let bytes_per_dim = non_negative(meta.read_vint()?, "bytes per dimension")?;
    let num_leaves = non_negative(meta.read_vint()?, "leaf count")?;

    // Multiply in u64: both factors are at most i32::MAX, so the product and
    // its doubling stay well inside the u64 range.
    let packed_len = u64::from(num_index_dims) * u64::from(bytes_per_dim);
    meta.skip_bytes(packed_len * 2)?;

    let point_count = meta.read_vlong()?;
    let doc_count = meta.read_vint()?;
    let _num_index_bytes = meta.read_vint()?;
    let _data_start_fp = meta.read_le_long()?;
    let _index_start_fp = meta.read_le_long()?;
    Ok(BkdEntry {
        num_leaves,
        point_count,
        doc_count,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codecs::lucene90::points;
    use crate::document::{DocValuesType, IndexOptions};
    use crate::index::indexing_chain::PerFieldData;
    use crate::index::{FieldInfo, FieldInfos, PointDimensionConfig};
    use crate::store::{MemoryDirectory, SharedDirectory};
    use assertables::*;
    use std::collections::HashMap;

    /// Creates an empty in-memory directory wrapped for shared access.
    fn test_directory() -> SharedDirectory {
        let backing = MemoryDirectory::new();
        SharedDirectory::new(Box::new(backing))
    }

    /// Builds a `FieldInfo` for a point field with the given dimension layout.
    fn make_point_field(
        name: &str,
        number: u32,
        dims: u32,
        index_dims: u32,
        num_bytes: u32,
    ) -> FieldInfo {
        let point_config = PointDimensionConfig {
            dimension_count: dims,
            index_dimension_count: index_dims,
            num_bytes,
        };
        FieldInfo::new(
            name.to_string(),
            number,
            false,
            true,
            IndexOptions::None,
            DocValuesType::None,
            point_config,
        )
    }

    /// Wraps `(doc id, packed value)` pairs in a fresh `PerFieldData`.
    fn make_point_data(values: Vec<(i32, Vec<u8>)>) -> PerFieldData {
        let mut data = PerFieldData::new();
        data.points = values;
        data
    }

    /// Packs `i32` coordinates into one big-endian point value.
    fn int_point(coords: &[i32]) -> Vec<u8> {
        let mut packed = Vec::new();
        for coord in coords {
            packed.extend_from_slice(&coord.to_be_bytes());
        }
        packed
    }

    /// Builds a per-field map holding a single named point field.
    fn single_field(name: &str, values: Vec<(i32, Vec<u8>)>) -> HashMap<String, PerFieldData> {
        let mut per_field = HashMap::new();
        per_field.insert(name.to_string(), make_point_data(values));
        per_field
    }

    /// Writes the points files for segment `_0` into a fresh directory and
    /// opens a reader over them.
    fn write_and_read(
        field_infos: &FieldInfos,
        per_field: &HashMap<String, PerFieldData>,
        num_docs: i32,
    ) -> PointsReader {
        let segment_id = [0u8; 16];
        let dir = test_directory();
        points::write(
            &dir,
            "_0",
            "",
            &segment_id,
            field_infos,
            per_field,
            num_docs,
        )
        .unwrap();
        let locked = dir.lock().unwrap();
        PointsReader::open(locked.as_ref(), "_0", "", &segment_id, field_infos).unwrap()
    }

    #[test]
    fn test_1d_int_field() {
        let field_infos = FieldInfos::new(vec![make_point_field("size", 0, 1, 1, 4)]);
        let per_field = single_field(
            "size",
            vec![
                (0, int_point(&[100])),
                (1, int_point(&[200])),
                (2, int_point(&[300])),
            ],
        );

        let reader = write_and_read(&field_infos, &per_field, 3);

        assert_eq!(reader.point_count(0), Some(3));
        assert_eq!(reader.doc_count(0), Some(3));
    }

    #[test]
    fn test_2d_latlon_field() {
        let field_infos = FieldInfos::new(vec![make_point_field("location", 0, 2, 2, 4)]);
        let per_field = single_field(
            "location",
            vec![(0, int_point(&[10, 20])), (1, int_point(&[30, 40]))],
        );

        let reader = write_and_read(&field_infos, &per_field, 2);

        assert_eq!(reader.point_count(0), Some(2));
        assert_eq!(reader.doc_count(0), Some(2));
        assert_eq!(reader.num_leaves(0), Some(1));
    }

    #[test]
    fn test_multiple_fields() {
        let field_infos = FieldInfos::new(vec![
            make_point_field("size", 0, 1, 1, 4),
            make_point_field("location", 1, 2, 2, 4),
        ]);
        let mut per_field = single_field(
            "size",
            vec![(0, int_point(&[100])), (1, int_point(&[200]))],
        );
        per_field.insert(
            "location".to_string(),
            make_point_data(vec![(0, int_point(&[10, 20]))]),
        );

        let reader = write_and_read(&field_infos, &per_field, 2);

        assert_eq!(reader.point_count(0), Some(2));
        assert_eq!(reader.doc_count(0), Some(2));
        assert_eq!(reader.point_count(1), Some(1));
        assert_eq!(reader.doc_count(1), Some(1));
    }

    #[test]
    fn test_nonexistent_field() {
        let field_infos = FieldInfos::new(vec![make_point_field("size", 0, 1, 1, 4)]);
        let per_field = single_field("size", vec![(0, int_point(&[42]))]);

        let reader = write_and_read(&field_infos, &per_field, 1);

        // Field 99 was never written, so every accessor reports `None`.
        assert_none!(reader.point_count(99));
        assert_none!(reader.doc_count(99));
    }

    #[test]
    fn test_8byte_long_field() {
        let field_infos = FieldInfos::new(vec![make_point_field("modified", 0, 1, 1, 8)]);
        let per_field = single_field(
            "modified",
            vec![
                (0, 1000i64.to_be_bytes().to_vec()),
                (1, 2000i64.to_be_bytes().to_vec()),
            ],
        );

        let reader = write_and_read(&field_infos, &per_field, 2);

        assert_eq!(reader.point_count(0), Some(2));
        assert_eq!(reader.doc_count(0), Some(2));
        assert_eq!(reader.num_leaves(0), Some(1));
    }

    #[test]
    fn test_truncated_data_file_detected() {
        let field_infos = FieldInfos::new(vec![make_point_field("size", 0, 1, 1, 4)]);
        let segment_id = [0u8; 16];
        let dir = test_directory();
        let per_field = single_field("size", vec![(0, int_point(&[42]))]);
        points::write(&dir, "_0", "", &segment_id, &field_infos, &per_field, 1).unwrap();

        // Copy every file into a second directory, dropping the last four
        // bytes of the `.kdd` file on the way.
        let mut damaged = MemoryDirectory::new();
        let locked = dir.lock().unwrap();
        for name in locked.list_all().unwrap() {
            let data = locked.read_file(&name).unwrap();
            let keep = if name.ends_with(".kdd") {
                data.len() - 4
            } else {
                data.len()
            };
            damaged.write_file(&name, &data[..keep]).unwrap();
        }

        let result = PointsReader::open(&damaged, "_0", "", &segment_id, &field_infos);
        assert!(result.is_err(), "should detect truncated .kdd");
    }
}