use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Write};
use std::ops::Range;
use crate::directories::OwnedBytes;
/// Location of a block inside a file: a byte offset plus a length.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockAddr {
    pub offset: u64,
    pub length: u32,
}
impl BlockAddr {
    /// Half-open byte range `[offset, offset + length)` covered by this block.
    pub fn byte_range(&self) -> Range<u64> {
        let start = self.offset;
        start..start + u64::from(self.length)
    }
}
/// Bit-packed, delta-encoded array of [`BlockAddr`]s.
///
/// Serialized by [`BlockAddrStore::build`]; [`BlockAddrStore::load`] decodes
/// the whole array eagerly into `addrs`, so lookups are plain slice indexing.
#[derive(Debug)]
pub struct BlockAddrStore {
    // Entry count as recorded in the serialized header.
    num_blocks: u32,
    // Bit width used for each packed offset delta.
    offset_bits: u8,
    // Bit width used for each packed length.
    length_bits: u8,
    // Fully decoded addresses, indexed by block ordinal.
    addrs: Vec<BlockAddr>,
}
impl BlockAddrStore {
    /// Serializes `addrs` into a compact bit-packed buffer.
    ///
    /// Layout: `num_blocks: u32 LE`, `offset_bits: u8`, `length_bits: u8`,
    /// then one `(offset_delta, length)` pair per block, bit-packed LSB-first.
    /// `offset_delta` is the gap between a block's offset and the end of the
    /// previous block.
    ///
    /// NOTE(review): the `saturating_sub` assumes blocks are sorted by offset
    /// and non-overlapping; an overlapping block has its delta clamped to 0
    /// and would NOT round-trip through `load` — confirm callers uphold this.
    pub fn build(addrs: &[BlockAddr]) -> io::Result<Vec<u8>> {
        if addrs.is_empty() {
            // Header-only encoding: zero blocks, zero-width fields.
            let mut buf = Vec::with_capacity(6);
            buf.write_u32::<LittleEndian>(0)?;
            buf.write_u8(0)?;
            buf.write_u8(0)?;
            return Ok(buf);
        }
        let mut deltas = Vec::with_capacity(addrs.len());
        let mut prev_end: u64 = 0;
        let mut max_delta: u64 = 0;
        let mut max_length: u32 = 0;
        for addr in addrs {
            let delta = addr.offset.saturating_sub(prev_end);
            deltas.push(delta);
            max_delta = max_delta.max(delta);
            max_length = max_length.max(addr.length);
            prev_end = addr.offset + addr.length as u64;
        }
        // Use at least one bit per field so every entry has nonzero width.
        let offset_bits = if max_delta == 0 {
            1
        } else {
            (64 - max_delta.leading_zeros()) as u8
        };
        let length_bits = if max_length == 0 {
            1
        } else {
            (32 - max_length.leading_zeros()) as u8
        };
        let bits_per_entry = offset_bits as usize + length_bits as usize;
        let total_bits = bits_per_entry * addrs.len();
        let packed_bytes = total_bits.div_ceil(8);
        let mut buf = Vec::with_capacity(6 + packed_bytes);
        buf.write_u32::<LittleEndian>(addrs.len() as u32)?;
        buf.write_u8(offset_bits)?;
        buf.write_u8(length_bits)?;
        let mut bit_writer = BitWriter::new(&mut buf);
        for (i, addr) in addrs.iter().enumerate() {
            bit_writer.write(deltas[i], offset_bits)?;
            bit_writer.write(addr.length as u64, length_bits)?;
        }
        bit_writer.flush()?;
        Ok(buf)
    }

    /// Deserializes a buffer produced by [`BlockAddrStore::build`].
    ///
    /// # Errors
    /// `InvalidData` for a malformed header, `UnexpectedEof` for truncated
    /// packed entries. (Previously read failures were silently swallowed,
    /// which left `len()` reporting `num_blocks` while `addrs` held fewer
    /// entries — `get()` and `len()` disagreed.)
    pub fn load(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 6 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "BlockAddrStore data too short",
            ));
        }
        let mut reader = data.as_slice();
        let num_blocks = reader.read_u32::<LittleEndian>()?;
        let offset_bits = reader.read_u8()?;
        let length_bits = reader.read_u8()?;
        // Reject widths `build` can never emit: oversized widths would make
        // BitReader shift by >= 64 (panic), and zero widths with a nonzero
        // count would let a corrupt header request entries backed by no data.
        if offset_bits > 64
            || length_bits > 32
            || (num_blocks > 0 && (offset_bits == 0 || length_bits == 0))
        {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "BlockAddrStore invalid bit widths",
            ));
        }
        let packed_data = &data.as_slice()[6..];
        // Verify the packed section is large enough BEFORE allocating, so a
        // corrupt num_blocks cannot trigger a huge allocation.
        let required_bits = (offset_bits as u64 + length_bits as u64) * num_blocks as u64;
        if (packed_data.len() as u64) * 8 < required_bits {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "BlockAddrStore packed data truncated",
            ));
        }
        let mut bit_reader = BitReader::new(packed_data);
        let mut addrs = Vec::with_capacity(num_blocks as usize);
        let mut current_offset: u64 = 0;
        for _ in 0..num_blocks {
            // Propagate read errors so addrs.len() always equals num_blocks.
            let delta = bit_reader.read(offset_bits)?;
            let length = bit_reader.read(length_bits)?;
            current_offset += delta;
            addrs.push(BlockAddr {
                offset: current_offset,
                length: length as u32,
            });
            current_offset += length;
        }
        Ok(Self {
            num_blocks,
            offset_bits,
            length_bits,
            addrs,
        })
    }

    /// Number of blocks in the store.
    pub fn len(&self) -> usize {
        self.num_blocks as usize
    }

    /// True when the store holds no blocks.
    pub fn is_empty(&self) -> bool {
        self.num_blocks == 0
    }

    /// Address of block `idx`, or `None` when out of range.
    #[inline]
    pub fn get(&self, idx: usize) -> Option<BlockAddr> {
        self.addrs.get(idx).copied()
    }

    /// All block addresses, in ordinal order.
    pub fn all(&self) -> Vec<BlockAddr> {
        self.addrs.clone()
    }
}
/// Block index backed by an FST map from key to block ordinal, paired with a
/// `BlockAddrStore` for ordinal-to-file-address lookups.
/// Only compiled with the `fst-index` feature.
#[cfg(feature = "fst-index")]
pub struct FstBlockIndex {
    // Maps each block's key to its ordinal (insertion order in `build`).
    fst: fst::Map<OwnedBytes>,
    // Ordinal -> (offset, length) within the data file.
    block_addrs: BlockAddrStore,
}
#[cfg(feature = "fst-index")]
impl FstBlockIndex {
    /// Serializes `(key, addr)` entries as a length-prefixed FST followed by
    /// a `BlockAddrStore`. Keys must be unique and sorted ascending (an FST
    /// builder requirement); violations surface as `InvalidData` errors.
    pub fn build(entries: &[(Vec<u8>, BlockAddr)]) -> io::Result<Vec<u8>> {
        use fst::MapBuilder;
        let mut fst_builder = MapBuilder::memory();
        for (ordinal, (key, _)) in entries.iter().enumerate() {
            fst_builder
                .insert(key, ordinal as u64)
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        }
        let fst_bytes = fst_builder
            .into_inner()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let addrs: Vec<BlockAddr> = entries.iter().map(|(_, addr)| *addr).collect();
        let addr_bytes = BlockAddrStore::build(&addrs)?;
        let mut out = Vec::with_capacity(4 + fst_bytes.len() + addr_bytes.len());
        out.write_u32::<LittleEndian>(fst_bytes.len() as u32)?;
        out.extend_from_slice(&fst_bytes);
        out.extend_from_slice(&addr_bytes);
        Ok(out)
    }

    /// Splits `data` into the FST section and the address store and decodes
    /// both.
    pub fn load(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FstBlockIndex data too short",
            ));
        }
        let fst_len = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
        if data.len() < 4 + fst_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FstBlockIndex FST data truncated",
            ));
        }
        let fst_data = data.slice(4..4 + fst_len);
        let addr_data = data.slice(4 + fst_len..data.len());
        let fst =
            fst::Map::new(fst_data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let block_addrs = BlockAddrStore::load(addr_data)?;
        Ok(Self { fst, block_addrs })
    }

    /// Ordinal of the block whose key range contains `key`: the entry with
    /// the greatest key <= `key`. `None` when `key` sorts before every entry.
    pub fn locate(&self, key: &[u8]) -> Option<usize> {
        use fst::{IntoStreamer, Streamer};
        if let Some(exact) = self.fst.get(key) {
            return Some(exact as usize);
        }
        // No exact match: the owning block is the one just before the first
        // key strictly greater than `key`.
        let mut greater = self.fst.range().gt(key).into_stream();
        match greater.next() {
            // The very first entry already sorts after `key`: no block owns it.
            Some((_, 0)) => None,
            Some((_, ordinal)) => Some(ordinal as usize - 1),
            // `key` sorts after every entry: the last block owns it (if any).
            None => self.fst.len().checked_sub(1),
        }
    }

    /// Address of block `ordinal`, or `None` when out of range.
    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
        self.block_addrs.get(ordinal)
    }

    /// Number of indexed blocks.
    pub fn len(&self) -> usize {
        self.block_addrs.len()
    }

    /// True when no blocks are indexed.
    pub fn is_empty(&self) -> bool {
        self.block_addrs.is_empty()
    }

    /// All block addresses in ordinal order.
    pub fn all_addrs(&self) -> Vec<BlockAddr> {
        self.block_addrs.all()
    }
}
/// Mmap-friendly block index: prefix-compressed keys with periodic restart
/// points, searched via binary search over the restarts plus a short linear
/// scan. Lookups read directly from `data`; only the `BlockAddrStore` is
/// decoded up front.
pub struct MmapBlockIndex {
    // Full serialized index (count, addr store, key stream, restarts, footer).
    data: OwnedBytes,
    // Number of indexed blocks (from the leading u32).
    num_blocks: u32,
    // Decoded block addresses, indexed by ordinal.
    block_addrs: BlockAddrStore,
    // Byte range [keys_offset, keys_end) of the key stream within `data`.
    keys_offset: usize,
    keys_end: usize,
    // Start of the u32-per-entry restart-offset array within `data`.
    restart_array_offset: usize,
    // Number of restart points.
    restart_count: usize,
    // Keys per restart run, as recorded in the footer.
    restart_interval: usize,
}
// Every 16th key is stored uncompressed so decoding can restart there.
const RESTART_INTERVAL: usize = 16;
impl MmapBlockIndex {
    /// Serializes `(key, addr)` entries into a prefix-compressed,
    /// binary-searchable index.
    ///
    /// Layout: `num_blocks: u32 LE`, a `BlockAddrStore`, the prefix-compressed
    /// key stream, a restart-offset array (`u32 LE` each), then a footer of
    /// `restart_count: u32 LE` + `restart_interval: u16 LE`.
    ///
    /// NOTE(review): keys are assumed sorted ascending — both the prefix
    /// compression and `locate`'s binary search rely on it; confirm callers
    /// uphold this.
    pub fn build(entries: &[(Vec<u8>, BlockAddr)]) -> io::Result<Vec<u8>> {
        if entries.is_empty() {
            // Degenerate layout: zero count, empty addr store, empty restart
            // array, footer.
            let mut buf = Vec::with_capacity(16);
            buf.write_u32::<LittleEndian>(0)?;
            buf.extend_from_slice(&BlockAddrStore::build(&[])?);
            buf.write_u32::<LittleEndian>(0)?;
            buf.write_u16::<LittleEndian>(RESTART_INTERVAL as u16)?;
            return Ok(buf);
        }
        let addrs: Vec<BlockAddr> = entries.iter().map(|(_, addr)| *addr).collect();
        let addr_bytes = BlockAddrStore::build(&addrs)?;
        let mut keys_buf = Vec::new();
        let mut prev_key: Vec<u8> = Vec::new();
        let mut restart_offsets: Vec<u32> = Vec::new();
        for (i, (key, _)) in entries.iter().enumerate() {
            // Every RESTART_INTERVAL-th key is stored whole so a reader can
            // begin decoding there with no history.
            let is_restart = i % RESTART_INTERVAL == 0;
            if is_restart {
                restart_offsets.push(keys_buf.len() as u32);
                write_vint(&mut keys_buf, 0)?;
                write_vint(&mut keys_buf, key.len() as u64)?;
                keys_buf.extend_from_slice(key);
            } else {
                // Other keys store only the suffix after the shared prefix
                // with the previous key.
                let prefix_len = common_prefix_len(&prev_key, key);
                let suffix = &key[prefix_len..];
                write_vint(&mut keys_buf, prefix_len as u64)?;
                write_vint(&mut keys_buf, suffix.len() as u64)?;
                keys_buf.extend_from_slice(suffix);
            }
            prev_key.clear();
            prev_key.extend_from_slice(key);
        }
        let restart_count = restart_offsets.len();
        let mut result =
            Vec::with_capacity(4 + addr_bytes.len() + keys_buf.len() + restart_count * 4 + 6);
        result.write_u32::<LittleEndian>(entries.len() as u32)?;
        result.extend_from_slice(&addr_bytes);
        result.extend_from_slice(&keys_buf);
        for &off in &restart_offsets {
            result.write_u32::<LittleEndian>(off)?;
        }
        result.write_u32::<LittleEndian>(restart_count as u32)?;
        result.write_u16::<LittleEndian>(RESTART_INTERVAL as u16)?;
        Ok(result)
    }

    /// Deserializes an index produced by [`MmapBlockIndex::build`].
    ///
    /// Validates all section bounds so corrupt input yields `InvalidData`
    /// instead of a panic (the previous version underflowed on
    /// `footer_start - restart_count * 4` for a corrupt restart count).
    pub fn load(data: OwnedBytes) -> io::Result<Self> {
        if data.len() < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "MmapBlockIndex data too short",
            ));
        }
        let num_blocks = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
        let addr_data_start = 4;
        let remaining = data.slice(addr_data_start..data.len());
        let block_addrs = BlockAddrStore::load(remaining)?;
        // The outer count and the addr store's own count must agree.
        if block_addrs.len() != num_blocks as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "MmapBlockIndex block count mismatch",
            ));
        }
        // Recompute the packed size of the addr store to find the key stream.
        let bits_per_entry = block_addrs.offset_bits as usize + block_addrs.length_bits as usize;
        let total_bits = bits_per_entry * num_blocks as usize;
        let addr_packed_size = total_bits.div_ceil(8);
        let keys_offset = addr_data_start + 6 + addr_packed_size;
        if data.len() < keys_offset + 6 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "MmapBlockIndex missing restart footer",
            ));
        }
        let footer_start = data.len() - 6;
        let restart_count = u32::from_le_bytes([
            data[footer_start],
            data[footer_start + 1],
            data[footer_start + 2],
            data[footer_start + 3],
        ]) as usize;
        let restart_interval =
            u16::from_le_bytes([data[footer_start + 4], data[footer_start + 5]]) as usize;
        // The restart array must fit between the key stream and the footer.
        let restart_bytes = restart_count
            .checked_mul(4)
            .filter(|&bytes| bytes <= footer_start - keys_offset)
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "MmapBlockIndex restart array out of bounds",
                )
            })?;
        // A populated index needs at least one restart point and a nonzero
        // interval, or `locate`'s arithmetic silently degenerates.
        if num_blocks > 0 && (restart_count == 0 || restart_interval == 0) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "MmapBlockIndex invalid restart metadata",
            ));
        }
        let restart_array_offset = footer_start - restart_bytes;
        let keys_end = restart_array_offset;
        Ok(Self {
            data,
            num_blocks,
            block_addrs,
            keys_offset,
            keys_end,
            restart_array_offset,
            restart_count,
            restart_interval,
        })
    }

    /// Reads the `idx`-th entry of the restart-offset array.
    #[inline]
    fn restart_offset(&self, idx: usize) -> u32 {
        let pos = self.restart_array_offset + idx * 4;
        u32::from_le_bytes([
            self.data[pos],
            self.data[pos + 1],
            self.data[pos + 2],
            self.data[pos + 3],
        ])
    }

    /// Decodes the full (uncompressed) key stored at restart `restart_idx`.
    /// Corrupt offsets or lengths yield a truncated key instead of a panic.
    fn decode_restart_key<'a>(&self, keys_data: &'a [u8], restart_idx: usize) -> &'a [u8] {
        let offset = self.restart_offset(restart_idx) as usize;
        // Tolerate out-of-range restart offsets from corrupt data.
        let mut reader = keys_data.get(offset..).unwrap_or(&[]);
        let prefix_len = read_vint(&mut reader).unwrap_or(0) as usize;
        debug_assert_eq!(prefix_len, 0, "restart point should have prefix_len=0");
        let suffix_len = read_vint(&mut reader).unwrap_or(0) as usize;
        // Clamp so a corrupt length cannot index past the key stream.
        &reader[..suffix_len.min(reader.len())]
    }

    /// Ordinal of the block whose key range contains `target`: the last
    /// block whose key is <= `target`. `None` when `target` sorts before
    /// every key in the index.
    pub fn locate(&self, target: &[u8]) -> Option<usize> {
        if self.num_blocks == 0 {
            return None;
        }
        let keys_data = &self.data.as_slice()[self.keys_offset..self.keys_end];
        // Binary search over restart keys for the last restart <= target.
        let mut lo = 0usize;
        let mut hi = self.restart_count;
        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            let key = self.decode_restart_key(keys_data, mid);
            match key.cmp(target) {
                std::cmp::Ordering::Equal => {
                    // Exact hit on a restart key: its ordinal is implicit.
                    return Some(mid * self.restart_interval);
                }
                std::cmp::Ordering::Less => lo = mid + 1,
                std::cmp::Ordering::Greater => hi = mid,
            }
        }
        // `lo` is the first restart key > target. Scan the run just before
        // it; when lo == 0 every restart key is > target and the scan below
        // correctly returns None. (A dead `if lo == 0 {}` was removed here.)
        let restart_idx = lo.saturating_sub(1);
        let start_ordinal = restart_idx * self.restart_interval;
        let end_ordinal = if restart_idx + 1 < self.restart_count {
            (restart_idx + 1) * self.restart_interval
        } else {
            self.num_blocks as usize
        };
        let scan_offset = self.restart_offset(restart_idx) as usize;
        let mut reader = &keys_data[scan_offset..];
        let mut current_key = Vec::new();
        let mut last_le_block: Option<usize> = None;
        for i in start_ordinal..end_ordinal {
            let prefix_len = match read_vint(&mut reader) {
                Ok(v) => v as usize,
                Err(_) => break,
            };
            let suffix_len = match read_vint(&mut reader) {
                Ok(v) => v as usize,
                Err(_) => break,
            };
            // Reconstruct the key: shared prefix + stored suffix.
            current_key.truncate(prefix_len);
            if suffix_len > reader.len() {
                break;
            }
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];
            match current_key.as_slice().cmp(target) {
                std::cmp::Ordering::Equal => return Some(i),
                std::cmp::Ordering::Less => last_le_block = Some(i),
                std::cmp::Ordering::Greater => return last_le_block,
            }
        }
        last_le_block
    }

    /// Address of block `ordinal`, or `None` when out of range.
    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
        self.block_addrs.get(ordinal)
    }

    /// Number of indexed blocks.
    pub fn len(&self) -> usize {
        self.num_blocks as usize
    }

    /// True when no blocks are indexed.
    pub fn is_empty(&self) -> bool {
        self.num_blocks == 0
    }

    /// All block addresses in ordinal order.
    pub fn all_addrs(&self) -> Vec<BlockAddr> {
        self.block_addrs.all()
    }

    /// Decodes and returns every key, in ordinal order. Stops early (and
    /// returns fewer keys) if the key stream is corrupt.
    pub fn all_keys(&self) -> Vec<Vec<u8>> {
        let mut result = Vec::with_capacity(self.num_blocks as usize);
        let keys_data = &self.data.as_slice()[self.keys_offset..self.keys_end];
        let mut reader = keys_data;
        let mut current_key = Vec::new();
        for _ in 0..self.num_blocks {
            let prefix_len = match read_vint(&mut reader) {
                Ok(v) => v as usize,
                Err(_) => break,
            };
            let suffix_len = match read_vint(&mut reader) {
                Ok(v) => v as usize,
                Err(_) => break,
            };
            current_key.truncate(prefix_len);
            if suffix_len > reader.len() {
                break;
            }
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];
            result.push(current_key.clone());
        }
        result
    }
}
/// Runtime-selected block index implementation.
pub enum BlockIndex {
    /// FST-backed variant; only present with the `fst-index` feature.
    #[cfg(feature = "fst-index")]
    Fst(FstBlockIndex),
    /// Prefix-compressed, mmap-friendly variant.
    Mmap(MmapBlockIndex),
}

impl BlockIndex {
    /// Ordinal of the block that may contain `key`, if any.
    pub fn locate(&self, key: &[u8]) -> Option<usize> {
        match self {
            #[cfg(feature = "fst-index")]
            Self::Fst(inner) => inner.locate(key),
            Self::Mmap(inner) => inner.locate(key),
        }
    }

    /// Address of block `ordinal`, or `None` when out of range.
    pub fn get_addr(&self, ordinal: usize) -> Option<BlockAddr> {
        match self {
            #[cfg(feature = "fst-index")]
            Self::Fst(inner) => inner.get_addr(ordinal),
            Self::Mmap(inner) => inner.get_addr(ordinal),
        }
    }

    /// Number of blocks indexed.
    pub fn len(&self) -> usize {
        match self {
            #[cfg(feature = "fst-index")]
            Self::Fst(inner) => inner.len(),
            Self::Mmap(inner) => inner.len(),
        }
    }

    /// True when no blocks are indexed.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// All block addresses in ordinal order.
    pub fn all_addrs(&self) -> Vec<BlockAddr> {
        match self {
            #[cfg(feature = "fst-index")]
            Self::Fst(inner) => inner.all_addrs(),
            Self::Mmap(inner) => inner.all_addrs(),
        }
    }
}
/// Length (in bytes) of the longest common prefix of `a` and `b`.
fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let max = a.len().min(b.len());
    let mut n = 0;
    while n < max && a[n] == b[n] {
        n += 1;
    }
    n
}
/// Encodes `value` as a LEB128-style varint: 7 payload bits per byte, least
/// significant first, with the high bit set on every byte except the last.
fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
    while value >= 0x80 {
        writer.write_all(&[(value as u8 & 0x7F) | 0x80])?;
        value >>= 7;
    }
    writer.write_all(&[value as u8])
}
/// Decodes a varint produced by `write_vint`, advancing `reader` past it.
///
/// # Errors
/// `UnexpectedEof` if the slice ends mid-varint; `InvalidData` if the
/// encoding spans more bits than fit in a `u64` (continuation bit still set
/// after the tenth byte).
fn read_vint(reader: &mut &[u8]) -> io::Result<u64> {
    let mut value = 0u64;
    // Shifts 0, 7, ..., 63: at most ten bytes of payload.
    for shift in (0..64).step_by(7) {
        let (&byte, rest) = reader.split_first().ok_or_else(|| {
            io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected end of varint")
        })?;
        *reader = rest;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err(io::Error::new(io::ErrorKind::InvalidData, "Varint too long"))
}
/// LSB-first bit packer appending to a `Vec<u8>`; counterpart of `BitReader`.
///
/// Invariant: fewer than 8 bits remain buffered between calls.
struct BitWriter<'a> {
    output: &'a mut Vec<u8>,
    // Pending bits, least significant first.
    buffer: u64,
    // Number of valid bits in `buffer` (< 8 between calls).
    bits_in_buffer: u8,
}

impl<'a> BitWriter<'a> {
    /// Creates a writer that appends packed bytes to `output`.
    fn new(output: &'a mut Vec<u8>) -> Self {
        Self {
            output,
            buffer: 0,
            bits_in_buffer: 0,
        }
    }

    /// Appends the low `num_bits` bits of `value` to the stream.
    ///
    /// Fixes two defects of the previous version: `value` is masked to
    /// `num_bits` so stray high bits cannot corrupt the stream, and the write
    /// is split into chunks so that `bits_in_buffer + num_bits > 64` (e.g. a
    /// 64-bit delta arriving with a partially filled buffer) no longer
    /// silently drops the high bits of `value`.
    fn write(&mut self, value: u64, num_bits: u8) -> io::Result<()> {
        debug_assert!(num_bits <= 64);
        if num_bits == 0 {
            return Ok(());
        }
        let mut val = if num_bits == 64 {
            value
        } else {
            value & ((1u64 << num_bits) - 1)
        };
        let mut remaining = num_bits;
        while remaining > 0 {
            // Take as many bits as still fit in the 64-bit buffer.
            let room = 64 - self.bits_in_buffer;
            let chunk = remaining.min(room);
            let chunk_bits = if chunk == 64 {
                val
            } else {
                val & ((1u64 << chunk) - 1)
            };
            self.buffer |= chunk_bits << self.bits_in_buffer;
            self.bits_in_buffer += chunk;
            // Drain whole bytes, restoring the < 8 bit invariant.
            while self.bits_in_buffer >= 8 {
                self.output.push(self.buffer as u8);
                self.buffer >>= 8;
                self.bits_in_buffer -= 8;
            }
            remaining -= chunk;
            if remaining > 0 {
                // chunk < 64 whenever bits remain, so this shift is in range.
                val >>= chunk;
            }
        }
        Ok(())
    }

    /// Flushes any partial final byte (zero-padded in its high bits).
    fn flush(&mut self) -> io::Result<()> {
        if self.bits_in_buffer > 0 {
            self.output.push(self.buffer as u8);
            self.buffer = 0;
            self.bits_in_buffer = 0;
        }
        Ok(())
    }
}
/// LSB-first bit unpacker over a byte slice; counterpart of `BitWriter`.
struct BitReader<'a> {
    data: &'a [u8],
    byte_pos: usize,
    bit_pos: u8,
}

impl<'a> BitReader<'a> {
    /// Starts reading at bit 0 of `data`.
    fn new(data: &'a [u8]) -> Self {
        Self {
            data,
            byte_pos: 0,
            bit_pos: 0,
        }
    }

    /// Reads `num_bits` bits (LSB-first) and returns them as the low bits of
    /// a `u64`. Reading zero bits yields 0 without consuming anything.
    ///
    /// # Errors
    /// `UnexpectedEof` when fewer than `num_bits` bits remain.
    fn read(&mut self, num_bits: u8) -> io::Result<u64> {
        if num_bits == 0 {
            return Ok(0);
        }
        let mut value: u64 = 0;
        let mut filled: u8 = 0;
        while filled < num_bits {
            let byte = *self.data.get(self.byte_pos).ok_or_else(|| {
                io::Error::new(io::ErrorKind::UnexpectedEof, "Not enough bits")
            })?;
            // Pull up to a byte's worth of bits starting at bit_pos.
            let available = 8 - self.bit_pos;
            let take = (num_bits - filled).min(available);
            let mask: u8 = if take >= 8 { 0xFF } else { (1u8 << take) - 1 };
            value |= (((byte >> self.bit_pos) & mask) as u64) << filled;
            filled += take;
            self.bit_pos += take;
            if self.bit_pos == 8 {
                self.byte_pos += 1;
                self.bit_pos = 0;
            }
        }
        Ok(value)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Build/load round-trip: every decoded address must match its input.
    #[test]
    fn test_block_addr_store_roundtrip() {
        let addrs = vec![
            BlockAddr {
                offset: 0,
                length: 1000,
            },
            BlockAddr {
                offset: 1000,
                length: 1500,
            },
            BlockAddr {
                offset: 2500,
                length: 800,
            },
            BlockAddr {
                offset: 3300,
                length: 2000,
            },
        ];
        let bytes = BlockAddrStore::build(&addrs).unwrap();
        let store = BlockAddrStore::load(OwnedBytes::new(bytes)).unwrap();
        assert_eq!(store.len(), 4);
        for (i, expected) in addrs.iter().enumerate() {
            let actual = store.get(i).unwrap();
            assert_eq!(actual.offset, expected.offset, "offset mismatch at {}", i);
            assert_eq!(actual.length, expected.length, "length mismatch at {}", i);
        }
    }

    // An empty store serializes to a bare header and reports zero blocks.
    #[test]
    fn test_block_addr_store_empty() {
        let bytes = BlockAddrStore::build(&[]).unwrap();
        let store = BlockAddrStore::load(OwnedBytes::new(bytes)).unwrap();
        assert_eq!(store.len(), 0);
        assert!(store.get(0).is_none());
    }

    // Covers exact hits, in-between keys (map to the preceding block),
    // past-the-end keys (last block), and before-the-start keys (None).
    #[test]
    fn test_mmap_block_index_roundtrip() {
        let entries = vec![
            (
                b"aaa".to_vec(),
                BlockAddr {
                    offset: 0,
                    length: 100,
                },
            ),
            (
                b"bbb".to_vec(),
                BlockAddr {
                    offset: 100,
                    length: 150,
                },
            ),
            (
                b"ccc".to_vec(),
                BlockAddr {
                    offset: 250,
                    length: 200,
                },
            ),
        ];
        let bytes = MmapBlockIndex::build(&entries).unwrap();
        let index = MmapBlockIndex::load(OwnedBytes::new(bytes)).unwrap();
        assert_eq!(index.len(), 3);
        assert_eq!(index.locate(b"aaa"), Some(0));
        assert_eq!(index.locate(b"bbb"), Some(1));
        assert_eq!(index.locate(b"ccc"), Some(2));
        assert_eq!(index.locate(b"aab"), Some(0));
        assert_eq!(index.locate(b"ddd"), Some(2));
        assert_eq!(index.locate(b"000"), None);
    }

    // Same lookup contract as the mmap index, via the FST-backed variant.
    #[cfg(feature = "fst-index")]
    #[test]
    fn test_fst_block_index_roundtrip() {
        let entries = vec![
            (
                b"aaa".to_vec(),
                BlockAddr {
                    offset: 0,
                    length: 100,
                },
            ),
            (
                b"bbb".to_vec(),
                BlockAddr {
                    offset: 100,
                    length: 150,
                },
            ),
            (
                b"ccc".to_vec(),
                BlockAddr {
                    offset: 250,
                    length: 200,
                },
            ),
        ];
        let bytes = FstBlockIndex::build(&entries).unwrap();
        let index = FstBlockIndex::load(OwnedBytes::new(bytes)).unwrap();
        assert_eq!(index.len(), 3);
        assert_eq!(index.locate(b"aaa"), Some(0));
        assert_eq!(index.locate(b"bbb"), Some(1));
        assert_eq!(index.locate(b"ccc"), Some(2));
        assert_eq!(index.locate(b"aab"), Some(0));
        assert_eq!(index.locate(b"ddd"), Some(2));
    }

    // Values packed LSB-first must read back in the same order.
    #[test]
    fn test_bit_writer_reader() {
        let mut buf = Vec::new();
        let mut writer = BitWriter::new(&mut buf);
        writer.write(5, 3).unwrap();
        writer.write(3, 2).unwrap();
        writer.write(15, 4).unwrap();
        writer.flush().unwrap();
        let mut reader = BitReader::new(&buf);
        assert_eq!(reader.read(3).unwrap(), 5);
        assert_eq!(reader.read(2).unwrap(), 3);
        assert_eq!(reader.read(4).unwrap(), 15);
    }
}