use std::ops::Deref;
use crate::encoding;
use crate::engine::Record;
use super::{SSTable, SSTableCell, SSTableDataBlock, SSTableError};
/// One decoded cell of a data block: its key and value bytes plus the cell's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockEntry {
    /// Key bytes, copied out of the block.
    pub key: Vec<u8>,
    /// Value bytes, copied out of the block (dropped by the scan when `is_delete` is set).
    pub value: Vec<u8>,
    /// Copied from the cell's `is_delete` flag; the scan turns such entries into `Record::Delete`.
    pub is_delete: bool,
    /// Copied from the cell's `lsn` field.
    pub lsn: u64,
    /// Copied from the cell's `timestamp` field.
    pub timestamp: u64,
}
/// Forward cursor over the encoded cells of a single data block.
///
/// Each cell is an encoded `SSTableCell` header immediately followed by `key_len` key
/// bytes and then `value_len` value bytes.
pub struct BlockIterator {
    /// Encoded cell bytes of the block (the `data` field of a decoded `SSTableDataBlock`).
    data: Vec<u8>,
    /// Byte offset of the next cell header to decode; `>= data.len()` means exhausted.
    cursor: usize,
}
impl BlockIterator {
    /// Creates an iterator over the encoded cells of one data block.
    ///
    /// The cursor starts at the first cell; nothing is decoded until a seek or read.
    pub fn new(data: Vec<u8>) -> Self {
        Self { data, cursor: 0 }
    }

    /// Rewinds the cursor to the first cell of the block.
    pub fn seek_to_first(&mut self) {
        self.cursor = 0;
    }

    /// Positions the cursor on the first cell whose key is `>= search_key`.
    ///
    /// Scans linearly from the start of the block and stops at the first match, so it
    /// assumes the block's keys are sorted ascending. If no such cell exists, or the block
    /// is malformed (an undecodable cell header, or a key/value payload that runs past the
    /// end of the block), the cursor is moved to the end and subsequent reads yield `None`.
    /// Header decode errors are logged.
    pub fn seek_to(&mut self, search_key: &[u8]) {
        self.cursor = 0;
        while self.cursor < self.data.len() {
            let (cell, cell_len) =
                match encoding::decode_from_slice::<SSTableCell>(&self.data[self.cursor..]) {
                    Ok(decoded) => decoded,
                    Err(e) => {
                        tracing::warn!(cursor = self.cursor, ?e, "decode error during seek");
                        self.cursor = self.data.len();
                        return;
                    }
                };
            let key_start = self.cursor + cell_len;
            let bounds =
                self.payload_bounds(key_start, cell.key_len as usize, cell.value_len as usize);
            let (key_end, value_end) = match bounds {
                Some(bounds) => bounds,
                None => {
                    // Truncated or corrupt cell: treat the rest of the block as unreadable.
                    self.cursor = self.data.len();
                    return;
                }
            };
            let key_bytes = &self.data[key_start..key_end];
            if key_bytes >= search_key {
                // Leave the cursor on this cell's header so `next_entry` yields it.
                return;
            }
            self.cursor = value_end;
        }
    }

    /// Decodes the cell at the cursor, copies out its key and value, and advances past it.
    ///
    /// Returns `None` at the end of the block. It also returns `None` on a malformed cell,
    /// either an undecodable header (logged) or a payload that runs past the end of the
    /// block. In that case the cursor jumps to the end, so iteration stops for good.
    pub fn next_entry(&mut self) -> Option<BlockEntry> {
        if self.cursor >= self.data.len() {
            return None;
        }
        let (cell, cell_len) =
            match encoding::decode_from_slice::<SSTableCell>(&self.data[self.cursor..]) {
                Ok(decoded) => decoded,
                Err(e) => {
                    tracing::warn!(
                        cursor = self.cursor,
                        ?e,
                        "decode error during block iteration"
                    );
                    self.cursor = self.data.len();
                    return None;
                }
            };
        let key_start = self.cursor + cell_len;
        let bounds =
            self.payload_bounds(key_start, cell.key_len as usize, cell.value_len as usize);
        let (key_end, value_end) = match bounds {
            Some(bounds) => bounds,
            None => {
                self.cursor = self.data.len();
                return None;
            }
        };
        let key = self.data[key_start..key_end].to_vec();
        let value = self.data[key_end..value_end].to_vec();
        self.cursor = value_end;
        Some(BlockEntry {
            key,
            value,
            is_delete: cell.is_delete,
            lsn: cell.lsn,
            timestamp: cell.timestamp,
        })
    }

    /// Returns `true` once the cursor has reached (or passed) the end of the block.
    // NOTE(review): `dead_code` is allowed because nothing in this module calls this;
    // confirm whether callers elsewhere still need it.
    #[allow(dead_code)]
    pub fn is_end(&self) -> bool {
        self.cursor >= self.data.len()
    }

    /// Computes the end offsets of a cell payload: `key_len` key bytes starting at
    /// `key_start`, immediately followed by `value_len` value bytes.
    ///
    /// Returns `Some((key_end, value_end))` when the payload fits inside the block, and
    /// `None` otherwise. Checked arithmetic ensures a corrupt length field cannot overflow
    /// `usize` and slip past the bounds check.
    fn payload_bounds(
        &self,
        key_start: usize,
        key_len: usize,
        value_len: usize,
    ) -> Option<(usize, usize)> {
        let key_end = key_start.checked_add(key_len)?;
        let value_end = key_end.checked_add(value_len)?;
        if value_end <= self.data.len() {
            Some((key_end, value_end))
        } else {
            None
        }
    }
}
impl Iterator for BlockIterator {
    type Item = BlockEntry;

    /// Yields the next cell of the block.
    ///
    /// This is a thin adapter over [`BlockIterator::next_entry`], so iteration ends at the
    /// end of the block or at the first malformed cell.
    fn next(&mut self) -> Option<BlockEntry> {
        Self::next_entry(self)
    }
}
/// Iterator over the records of one SSTable whose keys fall in `[start_key, end_key)`.
///
/// Merges point entries (puts and deletes) read from the data blocks with the table's
/// range deletes that overlap the scan range.
pub struct ScanIterator<S: Deref<Target = SSTable> = &'static SSTable> {
    /// Handle to the table being scanned (a `&'static SSTable` by default, or any
    /// `Deref<Target = SSTable>` such as a smart pointer).
    sstable: S,
    /// Index into `sstable.index` of the block currently being read.
    current_block_index: usize,
    /// Cursor into the current block; `None` when no further block remains to be read.
    current_block_iter: Option<BlockIterator>,
    /// Inclusive lower bound of the scan.
    start_key: Vec<u8>,
    /// Exclusive upper bound of the scan.
    end_key: Vec<u8>,
    /// Index of the next range delete in `sstable.range_deletes.data` to consider.
    pending_range_idx: usize,
    /// Range delete read ahead but not yet yielded by `next`.
    next_range: Option<Record>,
    /// Point record read ahead but not yet yielded by `next`.
    next_point: Option<Record>,
}
impl<S: Deref<Target = SSTable>> ScanIterator<S> {
pub fn new(sstable: S, start_key: Vec<u8>, end_key: Vec<u8>) -> Result<Self, SSTableError> {
if start_key >= end_key {
return Err(SSTableError::Internal("scan start >= end".to_string()));
}
let current_block_index = sstable.find_block_for_key(start_key.as_slice());
let block_iter = if current_block_index < sstable.index.len() {
let entry = &sstable.index[current_block_index];
let block_bytes = SSTable::read_block_bytes(&sstable.mmap, &entry.handle)?;
let (block, _) = encoding::decode_from_slice::<SSTableDataBlock>(&block_bytes)?;
let mut it = BlockIterator::new(block.data);
it.seek_to(start_key.as_slice());
Some(it)
} else {
None
};
Ok(Self {
sstable,
current_block_index,
current_block_iter: block_iter,
start_key,
end_key,
pending_range_idx: 0,
next_range: None,
next_point: None,
})
}
fn load_next_block(&mut self) -> Result<bool, SSTableError> {
self.current_block_index += 1;
if self.current_block_index >= self.sstable.index.len() {
self.current_block_iter = None;
return Ok(false);
}
let entry = &self.sstable.index[self.current_block_index];
let block_bytes = SSTable::read_block_bytes(&self.sstable.mmap, &entry.handle)?;
let (block, _) = encoding::decode_from_slice::<SSTableDataBlock>(&block_bytes)?;
let mut it = BlockIterator::new(block.data);
it.seek_to_first();
self.current_block_iter = Some(it);
Ok(true)
}
fn next_point_or_delete(&mut self) -> Option<Record> {
loop {
let it = self.current_block_iter.as_mut()?;
if let Some(item) = it.next_entry() {
if item.key.as_slice() >= self.end_key.as_slice() {
return None;
}
if item.is_delete {
return Some(Record::Delete {
key: item.key,
lsn: item.lsn,
timestamp: item.timestamp,
});
}
return Some(Record::Put {
key: item.key,
value: item.value,
lsn: item.lsn,
timestamp: item.timestamp,
});
}
match self.load_next_block() {
Ok(true) => {}
Ok(false) => return None,
Err(e) => {
tracing::warn!(?e, "error loading next block during scan");
return None;
}
}
}
}
fn next_range_delete(&mut self) -> Option<Record> {
while self.pending_range_idx < self.sstable.range_deletes.data.len() {
let r = &self.sstable.range_deletes.data[self.pending_range_idx];
if r.end_key.as_slice() <= self.start_key.as_slice() {
self.pending_range_idx += 1;
continue;
}
if r.start_key.as_slice() >= self.end_key.as_slice() {
return None;
}
self.pending_range_idx += 1;
return Some(Record::RangeDelete {
start: r.start_key.clone(),
end: r.end_key.clone(),
lsn: r.lsn,
timestamp: r.timestamp,
});
}
None
}
fn fill_range(&mut self) {
if self.next_range.is_none() {
self.next_range = self.next_range_delete();
}
}
fn fill_point(&mut self) {
if self.next_point.is_none() {
self.next_point = self.next_point_or_delete();
}
}
}
impl<S: Deref<Target = SSTable>> Iterator for ScanIterator<S> {
    type Item = Record;

    /// Yields the next record by two-way merging the range-delete and point streams.
    ///
    /// Both read-ahead slots are refilled first. The record with the smaller key wins.
    /// On equal keys, the record with the higher LSN wins, and the range delete wins an
    /// LSN tie.
    fn next(&mut self) -> Option<Self::Item> {
        self.fill_range();
        self.fill_point();
        let take_range = match (&self.next_range, &self.next_point) {
            (None, None) => return None,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (Some(range), Some(point)) => {
                let by_key = range.key().cmp(point.key());
                let by_lsn_desc = || point.lsn().cmp(&range.lsn());
                by_key.then_with(by_lsn_desc).is_le()
            }
        };
        if take_range {
            self.next_range.take()
        } else {
            self.next_point.take()
        }
    }
}