use std::io;
use crate::store::IndexInput;
pub struct DirectReader<'a> {
input: IndexInput<'a>,
bits_per_value: u32,
offset: u64,
}
impl<'a> DirectReader<'a> {
pub fn new(bytes: &'a [u8], bits_per_value: u32, offset: u64) -> Self {
Self {
input: IndexInput::unnamed(bytes),
bits_per_value,
offset,
}
}
pub fn get(&mut self, index: u64) -> io::Result<i64> {
let bpv = self.bits_per_value;
match bpv {
0 => Ok(0),
1 | 2 | 4 => self.get_sub_byte(index, bpv),
8 => self.get_byte_aligned(index, 1, 0xFF),
12 => self.get_odd(index, bpv, 0xFFF),
16 => self.get_byte_aligned(index, 2, 0xFFFF),
20 => self.get_odd(index, bpv, 0xFFFFF),
24 => self.get_byte_aligned(index, 3, 0xFFFFFF),
28 => self.get_odd(index, bpv, 0xFFFFFFF),
32 => self.get_byte_aligned(index, 4, 0xFFFFFFFF),
40 => self.get_byte_aligned(index, 5, 0xFF_FFFFFFFF),
48 => self.get_byte_aligned(index, 6, 0xFFFF_FFFFFFFF),
56 => self.get_byte_aligned(index, 7, 0xFFFFFF_FFFFFFFF),
64 => self.get_byte_aligned(index, 8, u64::MAX),
_ => Err(io::Error::other(format!(
"unsupported bits per value: {bpv}"
))),
}
}
fn get_sub_byte(&mut self, index: u64, bpv: u32) -> io::Result<i64> {
let values_per_byte = 8 / bpv;
let byte_offset = index / values_per_byte as u64;
let bit_offset = (index % values_per_byte as u64) * bpv as u64;
let mask = (1u64 << bpv) - 1;
self.input.seek((self.offset + byte_offset) as usize)?;
let b = self.input.read_byte()? as u64;
Ok(((b >> bit_offset) & mask) as i64)
}
fn get_byte_aligned(&mut self, index: u64, bytes_per_value: u64, mask: u64) -> io::Result<i64> {
let pos = self.offset + index * bytes_per_value;
self.input.seek(pos as usize)?;
let mut buf = [0u8; 8];
self.input
.read_bytes(&mut buf[..bytes_per_value as usize])?;
let raw = u64::from_le_bytes(buf);
Ok((raw & mask) as i64)
}
fn get_odd(&mut self, index: u64, bpv: u32, mask: u64) -> io::Result<i64> {
let byte_offset = (index * bpv as u64) / 8;
let shift = (index & 1) * 4;
let bytes_to_read = if bpv <= 16 { 4usize } else { 8 };
self.input.seek((self.offset + byte_offset) as usize)?;
let mut buf = [0u8; 8];
let read_len = bytes_to_read.min(self.input.length() - self.input.position());
self.input.read_bytes(&mut buf[..read_len])?;
let raw = u64::from_le_bytes(buf);
Ok(((raw >> shift) & mask) as i64)
}
}
struct BlockMeta {
min: i64,
avg: f32,
bpv: u8,
offset: u64,
}
pub(crate) struct DirectMonotonicReader {
block_shift: u32,
base_data_offset: u64,
blocks: Box<[BlockMeta]>,
}
impl DirectMonotonicReader {
pub(crate) fn load_with_shift(
meta: &mut IndexInput<'_>,
num_values: u32,
base_data_offset: u64,
block_shift: u32,
) -> io::Result<Self> {
let block_size = 1u32 << block_shift;
let num_blocks = (num_values as usize).div_ceil(block_size as usize);
let mut blocks = Vec::with_capacity(num_blocks);
for _ in 0..num_blocks {
let min = meta.read_le_long()?;
let avg_bits = meta.read_le_int()? as u32;
let avg = f32::from_bits(avg_bits);
let offset = meta.read_le_long()? as u64;
let bpv = meta.read_byte()?;
blocks.push(BlockMeta {
min,
avg,
bpv,
offset,
});
}
Ok(Self {
block_shift,
base_data_offset,
blocks: blocks.into_boxed_slice(),
})
}
pub(crate) fn get(&self, index: u64, data: &[u8]) -> io::Result<i64> {
let block = (index >> self.block_shift) as usize;
let block_index = index & ((1u64 << self.block_shift) - 1);
let meta = &self.blocks[block];
let delta = if meta.bpv == 0 {
0
} else {
let start = (self.base_data_offset + meta.offset) as usize;
let mut reader = DirectReader::new(&data[start..], meta.bpv as u32, 0);
reader.get(block_index)?
};
Ok(meta.min + (meta.avg as f64 * block_index as f64) as i64 + delta)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codecs::packed_writers::DirectWriter;
use crate::store::memory::MemoryIndexOutput;
use crate::store::{DataOutput, IndexOutput};
fn round_trip(values: &[i64], bpv: u32) {
let mut out = MemoryIndexOutput::new("test".to_string());
let mut writer = DirectWriter::new(bpv);
for &v in values {
writer.add(v);
}
writer.finish(&mut out).unwrap();
let mut reader = DirectReader::new(out.bytes(), bpv, 0);
for (i, &expected) in values.iter().enumerate() {
let actual = reader.get(i as u64).unwrap();
assert_eq!(actual, expected, "mismatch at index {i} for bpv {bpv}");
}
}
#[test]
fn test_direct_reader_bpv1() {
round_trip(&[1, 0, 1, 1, 0, 0, 1, 0], 1);
}
#[test]
fn test_direct_reader_bpv2() {
round_trip(&[0, 1, 2, 3, 3, 2, 1, 0], 2);
}
#[test]
fn test_direct_reader_bpv4() {
round_trip(&[0, 5, 10, 15, 1, 14, 7, 8], 4);
}
#[test]
fn test_direct_reader_bpv8() {
round_trip(&[0, 127, 255, 1, 42], 8);
}
#[test]
fn test_direct_reader_bpv12() {
round_trip(&[0x123, 0x456, 0x789, 0xABC], 12);
}
#[test]
fn test_direct_reader_bpv16() {
round_trip(&[0, 0x1234, 0xABCD, 0xFFFF], 16);
}
#[test]
fn test_direct_reader_bpv20() {
round_trip(&[0, 0x12345, 0xABCDE, 0xFFFFF], 20);
}
#[test]
fn test_direct_reader_bpv24() {
round_trip(&[0, 0x123456, 0xABCDEF, 0xFFFFFF], 24);
}
#[test]
fn test_direct_reader_bpv28() {
round_trip(&[0, 0x1234567, 0xABCDEF0, 0xFFFFFFF], 28);
}
#[test]
fn test_direct_reader_bpv32() {
round_trip(&[0, 0x12345678, 0xFFFFFFFF], 32);
}
#[test]
fn test_direct_reader_bpv40() {
round_trip(&[0, 0x12_3456_7890, 0xFF_FFFFFFFF], 40);
}
#[test]
fn test_direct_reader_bpv48() {
round_trip(&[0, 0x1234_5678_9ABC, 0xFFFF_FFFFFFFF], 48);
}
#[test]
fn test_direct_reader_bpv56() {
round_trip(&[0, 0x12_3456_7890_ABCD, 0xFFFFFF_FFFFFFFF], 56);
}
#[test]
fn test_direct_reader_bpv64() {
round_trip(&[0, i64::MAX, 1, 0x1234_5678_9ABC_DEF0u64 as i64], 64);
}
#[test]
fn test_direct_reader_bpv0() {
let mut reader = DirectReader::new(&[], 0, 0);
assert_eq!(reader.get(0).unwrap(), 0);
assert_eq!(reader.get(100).unwrap(), 0);
}
#[test]
fn test_direct_reader_single_value() {
for bpv in [1, 2, 4, 8, 12, 16, 20, 24, 28, 32, 40, 48, 56, 64] {
round_trip(&[1], bpv);
}
}
#[test]
fn test_direct_reader_with_offset() {
let mut out = MemoryIndexOutput::new("test".to_string());
for i in 0..10u8 {
out.write_byte(i).unwrap();
}
let offset = out.file_pointer();
let mut writer = DirectWriter::new(8);
writer.add(42);
writer.add(99);
writer.add(255);
writer.finish(&mut out).unwrap();
let mut reader = DirectReader::new(out.bytes(), 8, offset);
assert_eq!(reader.get(0).unwrap(), 42);
assert_eq!(reader.get(1).unwrap(), 99);
assert_eq!(reader.get(2).unwrap(), 255);
}
use crate::codecs::packed_writers::DirectMonotonicWriter;
fn monotonic_round_trip(values: &[i64], block_shift: u32) {
let mut writer = DirectMonotonicWriter::new(block_shift);
for &v in values {
writer.add(v);
}
let mut meta_out = MemoryIndexOutput::new("meta".to_string());
let mut data_out = MemoryIndexOutput::new("data".to_string());
let _num_blocks = writer.finish(&mut meta_out, &mut data_out).unwrap();
let meta_bytes = meta_out.bytes().to_vec();
let data_bytes = data_out.bytes().to_vec();
let mut meta_input = IndexInput::unnamed(&meta_bytes);
let num_values = values.len() as u32;
let reader =
DirectMonotonicReader::load_with_shift(&mut meta_input, num_values, 0, block_shift)
.unwrap();
for (i, &expected) in values.iter().enumerate() {
let actual = reader.get(i as u64, &data_bytes).unwrap();
assert_eq!(
actual, expected,
"mismatch at index {i}: expected {expected}, got {actual}"
);
}
}
#[test]
fn test_monotonic_reader_simple() {
monotonic_round_trip(&[0, 10, 20, 30], 2);
}
#[test]
fn test_monotonic_reader_constant() {
monotonic_round_trip(&[42, 42, 42, 42], 2);
}
#[test]
fn test_monotonic_reader_multiple_blocks() {
monotonic_round_trip(&[0, 100, 200, 300, 400], 1);
}
#[test]
fn test_monotonic_reader_single_value() {
monotonic_round_trip(&[999], 2);
}
#[test]
fn test_monotonic_reader_large_values() {
monotonic_round_trip(&[0, 1_000_000, 2_000_000, 3_000_000, 10_000_000], 2);
}
#[test]
fn test_monotonic_reader_irregular_increments() {
monotonic_round_trip(&[0, 1, 100, 101, 10000, 10001, 10002, 10003], 2);
}
#[test]
fn test_monotonic_reader_two_values() {
monotonic_round_trip(&[0, 1], 1);
}
#[test]
fn test_monotonic_reader_large_block() {
let values: Vec<i64> = (0..100).map(|i| i * 7).collect();
monotonic_round_trip(&values, 10);
}
}