use super::binary_index::Reader as BinaryIndexReader;
use crate::{
table::{block::Trailer, Block},
SeqNo, Slice,
};
use byteorder::{LittleEndian, ReadBytesExt};
use std::{io::Cursor, marker::PhantomData};
pub trait ParsedItem<M> {
fn compare_key(&self, needle: &[u8], bytes: &[u8]) -> std::cmp::Ordering;
fn key_offset(&self) -> usize;
fn materialize(&self, bytes: &Slice) -> M;
}
pub trait Decodable<ParsedItem> {
fn parse_restart_key<'a>(
reader: &mut Cursor<&[u8]>,
offset: usize,
data: &'a [u8],
) -> Option<(&'a [u8], SeqNo)>;
fn parse_full(reader: &mut Cursor<&[u8]>, offset: usize) -> Option<ParsedItem>;
fn parse_truncated(
reader: &mut Cursor<&[u8]>,
offset: usize,
base_key_offset: usize,
) -> Option<ParsedItem>;
}
#[derive(Debug)]
struct LoScanner {
offset: usize,
remaining_in_interval: usize,
base_key_offset: Option<usize>,
}
#[derive(Debug)]
struct HiScanner {
offset: usize,
ptr_idx: usize,
stack: Vec<usize>, base_key_offset: Option<usize>,
}
pub struct Decoder<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> {
block: &'a Block,
phantom: PhantomData<(Item, Parsed)>,
lo_scanner: LoScanner,
hi_scanner: HiScanner,
restart_interval: u8,
binary_index_step_size: u8,
binary_index_offset: u32,
binary_index_len: u32,
}
impl<'a, Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Decoder<'a, Item, Parsed> {
#[must_use]
pub fn new(block: &'a Block) -> Self {
let trailer = Trailer::new(block);
let mut reader = trailer.as_slice();
let restart_interval = unwrap!(reader.read_u8());
let binary_index_step_size = unwrap!(reader.read_u8());
debug_assert!(
binary_index_step_size == 2 || binary_index_step_size == 4,
"invalid binary index step size",
);
let binary_index_len = unwrap!(reader.read_u32::<LittleEndian>());
let binary_index_offset = unwrap!(reader.read_u32::<LittleEndian>());
Self {
block,
phantom: PhantomData,
lo_scanner: LoScanner {
offset: 0,
remaining_in_interval: 0,
base_key_offset: None,
},
hi_scanner: HiScanner {
offset: 0,
ptr_idx: binary_index_len as usize,
stack: Vec::new(),
base_key_offset: None,
},
restart_interval,
binary_index_step_size,
binary_index_offset,
binary_index_len,
}
}
fn get_binary_index_reader(&self) -> BinaryIndexReader<'_> {
BinaryIndexReader::new(
&self.block.data,
self.binary_index_offset,
self.binary_index_len,
self.binary_index_step_size,
)
}
fn get_key_at(&self, pos: usize) -> (&[u8], SeqNo) {
let bytes = &self.block.data;
#[expect(
unsafe_code,
reason = "pos is always retrieved from the binary index which we consider to be trustworthy"
)]
let mut cursor = Cursor::new(unsafe { bytes.get_unchecked(pos..) });
#[expect(
clippy::expect_used,
reason = "restart key should be parsed, see reason above"
)]
Item::parse_restart_key(&mut cursor, pos, bytes).expect("should parse restart key")
}
fn partition_point<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
where
F: Fn(&[u8], SeqNo) -> bool,
{
let binary_index = self.get_binary_index_reader();
debug_assert!(
binary_index.len() >= 1,
"binary index should never be empty",
);
let mut left: usize = 0;
let mut right = binary_index.len();
if right == 0 {
return None;
}
while left < right {
let mid = usize::midpoint(left, right);
let offset = binary_index.get(mid);
let (head_key, head_seqno) = self.get_key_at(offset);
if pred(head_key, head_seqno) {
left = mid + 1;
} else {
right = mid;
}
}
if left == 0 {
return Some((0, 0));
}
if left == binary_index.len() {
let idx = binary_index.len() - 1;
let offset = binary_index.get(idx);
return Some((offset, idx));
}
let offset = binary_index.get(left - 1);
Some((offset, left - 1))
}
fn partition_point_2<F>(&self, pred: F) -> Option<(/* offset */ usize, /* idx */ usize)>
where
F: Fn(&[u8], SeqNo) -> bool,
{
let binary_index = self.get_binary_index_reader();
debug_assert!(
binary_index.len() >= 1,
"binary index should never be empty",
);
let mut left: usize = 0;
let mut right = binary_index.len();
if right == 0 {
return None;
}
while left < right {
let mid = usize::midpoint(left, right);
let offset = binary_index.get(mid);
let (head_key, head_seqno) = self.get_key_at(offset);
if pred(head_key, head_seqno) {
left = mid + 1;
} else {
right = mid;
}
}
if left == binary_index.len() {
let idx = binary_index.len() - 1;
let offset = binary_index.get(idx);
return Some((offset, idx));
}
let offset = binary_index.get(left);
Some((offset, left))
}
pub fn set_lo_offset(&mut self, offset: usize) {
self.lo_scanner.offset = offset;
}
pub fn seek(&mut self, pred: impl Fn(&[u8], SeqNo) -> bool, second_partition: bool) -> bool {
let result = if second_partition {
self.partition_point_2(&pred)
} else {
self.partition_point(&pred)
};
let Some((offset, _)) = result else {
return false;
};
if second_partition && self.restart_interval == 1 && {
let (key, seqno) = self.get_key_at(offset);
pred(key, seqno)
} {
let end = self.block.data.len();
self.lo_scanner.offset = end;
self.lo_scanner.remaining_in_interval = 0;
self.lo_scanner.base_key_offset = None;
self.hi_scanner.offset = end;
self.hi_scanner.ptr_idx = usize::MAX;
self.hi_scanner.stack.clear();
self.hi_scanner.base_key_offset = Some(0);
return false;
}
self.lo_scanner.offset = offset;
true
}
pub fn seek_upper(
&mut self,
pred: impl Fn(&[u8], SeqNo) -> bool,
second_partition: bool,
) -> bool {
let result = if second_partition {
self.partition_point_2(&pred)
} else {
self.partition_point(&pred)
};
let Some((offset, idx)) = result else {
return false;
};
self.hi_scanner.offset = offset;
self.hi_scanner.ptr_idx = idx;
self.hi_scanner.stack.clear();
self.hi_scanner.base_key_offset = None;
self.fill_stack();
true
}
fn parse_current_item(
reader: &mut Cursor<&[u8]>,
offset: usize,
base_key_offset: Option<usize>,
is_restart: bool,
) -> Option<Parsed> {
if is_restart {
Item::parse_full(reader, offset)
} else {
#[expect(clippy::expect_used, reason = "we trust the is_restart flag")]
Item::parse_truncated(
reader,
offset,
base_key_offset.expect("should parse truncated item"),
)
}
}
fn fill_stack(&mut self) {
let binary_index = self.get_binary_index_reader();
{
self.hi_scanner.offset = binary_index.get(self.hi_scanner.ptr_idx);
let offset = self.hi_scanner.offset;
#[expect(
unsafe_code,
reason = "cursor is advanced by read_ operations which check for EOF, and the cursor starts at 0 - the slice is never empty"
)]
let mut reader = Cursor::new(unsafe { self.block.data.get_unchecked(offset..) });
#[expect(
clippy::cast_possible_truncation,
reason = "blocks do not even come close to 4 GiB in size"
)]
if Item::parse_full(&mut reader, offset)
.inspect(|item| {
self.hi_scanner.offset += reader.position() as usize;
self.hi_scanner.base_key_offset = Some(item.key_offset());
})
.is_some()
{
self.hi_scanner.stack.push(offset);
}
}
for _ in 1..self.restart_interval {
let offset = self.hi_scanner.offset;
#[expect(
unsafe_code,
reason = "cursor is advanced by read_ operations which check for EOF, and the cursor starts at 0 - the slice is never empty"
)]
let mut reader = Cursor::new(unsafe { self.block.data.get_unchecked(offset..) });
#[expect(clippy::expect_used, reason = "base key offset is expected to exist")]
#[expect(
clippy::cast_possible_truncation,
reason = "blocks do not even come close to 4 GiB in size"
)]
if Item::parse_truncated(
&mut reader,
offset,
self.hi_scanner.base_key_offset.expect("should exist"),
)
.inspect(|_| {
self.hi_scanner.offset += reader.position() as usize;
})
.is_some()
{
self.hi_scanner.stack.push(offset);
} else {
break;
}
}
}
fn consume_stack_top(&mut self) -> Option<Parsed> {
let offset = self.hi_scanner.stack.pop()?;
if self.lo_scanner.offset > 0 && offset < self.lo_scanner.offset {
return None;
}
self.hi_scanner.offset = offset;
let is_restart = self.hi_scanner.stack.is_empty();
#[expect(
unsafe_code,
reason = "cursor is advanced by read_ operations which check for EOF, and the cursor starts at 0 - the slice is never empty"
)]
let mut reader = Cursor::new(unsafe { self.block.data.get_unchecked(offset..) });
Self::parse_current_item(
&mut reader,
offset,
self.hi_scanner.base_key_offset,
is_restart,
)
}
}
impl<Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> Iterator for Decoder<'_, Item, Parsed> {
type Item = Parsed;
fn next(&mut self) -> Option<Self::Item> {
if self.hi_scanner.base_key_offset.is_some()
&& self.lo_scanner.offset >= self.hi_scanner.offset
{
return None;
}
let is_restart: bool = self.lo_scanner.remaining_in_interval == 0;
#[expect(
unsafe_code,
reason = "cursor is advanced by read_ operations which check for EOF, and the cursor starts at 0 - the slice is never empty"
)]
let mut reader =
Cursor::new(unsafe { self.block.data.get_unchecked(self.lo_scanner.offset..) });
#[expect(
clippy::cast_possible_truncation,
reason = "blocks do not even come close to 4 GiB in size"
)]
let item = Self::parse_current_item(
&mut reader,
self.lo_scanner.offset,
self.lo_scanner.base_key_offset,
is_restart,
)
.inspect(|item| {
self.lo_scanner.offset += reader.position() as usize;
if is_restart {
self.lo_scanner.base_key_offset = Some(item.key_offset());
}
});
if is_restart {
self.lo_scanner.remaining_in_interval = usize::from(self.restart_interval) - 1;
} else {
self.lo_scanner.remaining_in_interval -= 1;
}
item
}
}
impl<Item: Decodable<Parsed>, Parsed: ParsedItem<Item>> DoubleEndedIterator
for Decoder<'_, Item, Parsed>
{
fn next_back(&mut self) -> Option<Self::Item> {
if let Some(top) = self.consume_stack_top() {
return Some(top);
}
if self.hi_scanner.ptr_idx == usize::MAX {
return None;
}
self.hi_scanner.ptr_idx = self.hi_scanner.ptr_idx.wrapping_sub(1);
if self.hi_scanner.ptr_idx == usize::MAX {
return None;
}
self.fill_stack();
self.consume_stack_top()
}
}