use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter};
use crate::kernel::lsm::table::ss_table::block::{Block, BlockItem, Entry};
use crate::kernel::KernelResult;
use bytes::Bytes;
pub(crate) struct BlockIter<'a, T> {
block: &'a Block<T>,
entry_len: usize,
offset: usize,
buf_shared_key: &'a [u8],
}
impl<'a, T> BlockIter<'a, T>
where
T: BlockItem,
{
pub(crate) fn new(block: &'a Block<T>) -> BlockIter<'a, T> {
let buf_shared_key = block.shared_key_prefix(0, block.restart_shared_len(0));
BlockIter {
block,
entry_len: block.entry_len(),
offset: 0,
buf_shared_key,
}
}
fn item(&self) -> (Bytes, T) {
let offset = self.offset - 1;
let Entry { key, item, .. } = self.block.get_entry(offset);
let item_key = if offset % self.block.restart_interval() != 0 {
Bytes::from([self.buf_shared_key, &key[..]].concat())
} else {
key.clone()
};
(item_key, item.clone())
}
fn offset_move(&mut self, offset: usize, is_seek: bool) -> Option<(Bytes, T)> {
let block = self.block;
let restart_interval = block.restart_interval();
let old_offset = self.offset;
self.offset = offset;
(offset > 0 && offset < self.entry_len + 1)
.then(|| {
let real_offset = offset - 1;
if old_offset - 1 / restart_interval != real_offset / restart_interval {
self.buf_shared_key =
block.shared_key_prefix(real_offset, block.restart_shared_len(real_offset));
}
(!is_seek).then(|| self.item())
})
.flatten()
}
}
impl<'a, V> ForwardIter<'a> for BlockIter<'a, V>
where
V: Sync + Send + BlockItem,
{
fn try_prev(&mut self) -> KernelResult<Option<Self::Item>> {
Ok((self.is_valid() || self.offset == self.entry_len + 1)
.then(|| self.offset_move(self.offset - 1, false))
.flatten())
}
}
impl<'a, V> Iter<'a> for BlockIter<'a, V>
where
V: Sync + Send + BlockItem,
{
type Item = (Bytes, V);
fn try_next(&mut self) -> KernelResult<Option<Self::Item>> {
Ok((self.is_valid() || self.offset == 0)
.then(|| self.offset_move(self.offset + 1, false))
.flatten())
}
fn is_valid(&self) -> bool {
self.offset > 0 && self.offset <= self.entry_len
}
}
impl<'a, V> SeekIter<'a> for BlockIter<'a, V>
where
V: Sync + Send + BlockItem,
{
fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> {
match seek {
Seek::First => Some(0),
Seek::Last => Some(self.entry_len + 1),
Seek::Backward(key) => match self.block.binary_search(key) {
Ok(index) => Some(index),
Err(index) => (index < self.entry_len).then_some(index),
},
}
.and_then(|index| self.offset_move(index, true));
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter};
use crate::kernel::lsm::table::ss_table::block::{Block, Value, DEFAULT_DATA_RESTART_INTERVAL};
use crate::kernel::lsm::table::ss_table::block_iter::BlockIter;
use crate::kernel::KernelResult;
use bincode::Options;
use bytes::Bytes;
use std::vec;
#[test]
fn test_iterator() -> KernelResult<()> {
let data = vec![
(Bytes::from(vec![b'1']), Value::from(None)),
(
Bytes::from(vec![b'2']),
Value::from(Some(Bytes::from(vec![b'0']))),
),
(Bytes::from(vec![b'4']), Value::from(None)),
];
let block = Block::new(data, DEFAULT_DATA_RESTART_INTERVAL);
let mut iterator = BlockIter::new(&block);
assert!(!iterator.is_valid());
assert_eq!(
iterator.try_next()?,
Some((Bytes::from(vec![b'1']), Value::from(None)))
);
assert_eq!(
iterator.try_next()?,
Some((
Bytes::from(vec![b'2']),
Value::from(Some(Bytes::from(vec![b'0'])))
))
);
assert_eq!(
iterator.try_next()?,
Some((Bytes::from(vec![b'4']), Value::from(None)))
);
assert_eq!(iterator.try_next()?, None);
assert_eq!(
iterator.try_prev()?,
Some((Bytes::from(vec![b'4']), Value::from(None)))
);
assert_eq!(
iterator.try_prev()?,
Some((
Bytes::from(vec![b'2']),
Value::from(Some(Bytes::from(vec![b'0'])))
))
);
assert_eq!(
iterator.try_prev()?,
Some((Bytes::from(vec![b'1']), Value::from(None)))
);
assert_eq!(iterator.try_prev()?, None);
iterator.seek(Seek::First)?;
assert_eq!(
iterator.try_next()?,
Some((Bytes::from(vec![b'1']), Value::from(None)))
);
iterator.seek(Seek::Last)?;
assert_eq!(iterator.try_next()?, None);
iterator.seek(Seek::Backward(&[b'2']))?;
assert_eq!(
iterator.try_next()?,
Some((
Bytes::from(vec![b'2']),
Value::from(Some(Bytes::from(vec![b'0'])))
))
);
iterator.seek(Seek::Backward(&[b'3']))?;
assert_eq!(
iterator.try_next()?,
Some((Bytes::from(vec![b'4']), Value::from(None)))
);
Ok(())
}
#[test]
fn test_iterator_1000() -> KernelResult<()> {
let mut vec_data = Vec::new();
let value =
Bytes::from_static(b"What you are you do not see, what you see is your shadow.");
let times = 1000;
for i in 0..times {
let mut key = b"KipDB-".to_vec();
key.append(&mut bincode::options().with_big_endian().serialize(&i)?);
vec_data.push((Bytes::from(key), Value::from(Some(value.clone()))));
}
let block = Block::new(vec_data.clone(), DEFAULT_DATA_RESTART_INTERVAL);
tokio_test::block_on(async move {
let mut iterator = BlockIter::new(&block);
for kv in vec_data.iter().take(times) {
assert_eq!(iterator.try_next()?.unwrap(), kv.clone());
}
for i in (0..times - 1).rev() {
assert_eq!(iterator.try_prev()?.unwrap(), vec_data[i]);
}
Ok(())
})
}
}