use crate::kernel::lsm::compactor::LEVEL_0;
use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter};
use crate::kernel::lsm::mem_table::KeyValue;
use crate::kernel::lsm::version::Version;
use crate::kernel::KernelResult;
use crate::KernelError;
const LEVEL_0_SEEK_MESSAGE: &str = "level 0 cannot seek";
pub(crate) struct LevelIter<'a> {
version: &'a Version,
level: usize,
level_len: usize,
offset: usize,
child_iter: Box<dyn SeekIter<'a, Item = KeyValue> + 'a + Sync + Send>,
}
impl<'a> LevelIter<'a> {
#[allow(dead_code)]
pub(crate) fn new(version: &'a Version, level: usize) -> KernelResult<LevelIter<'a>> {
let table = version.table(level, 0).ok_or(KernelError::DataEmpty)?;
let child_iter = table.iter()?;
let level_len = version.level_len(level);
Ok(Self {
version,
level,
level_len,
offset: 0,
child_iter,
})
}
fn child_iter_seek(&mut self, seek: Seek<'_>, offset: usize) -> KernelResult<()> {
self.offset = offset;
if self.is_valid() {
if let Some(table) = self.version.table(self.level, offset) {
self.child_iter = table.iter()?;
self.child_iter.seek(seek)?;
}
}
Ok(())
}
fn seek_ward(&mut self, key: &[u8], seek: Seek<'_>) -> KernelResult<()> {
let level = self.level;
if level == LEVEL_0 {
return Err(KernelError::NotSupport(LEVEL_0_SEEK_MESSAGE));
}
self.child_iter_seek(seek, self.version.query_meet_index(key, level))
}
}
impl<'a> Iter<'a> for LevelIter<'a> {
type Item = KeyValue;
fn try_next(&mut self) -> KernelResult<Option<Self::Item>> {
match self.child_iter.try_next()? {
None => {
self.child_iter_seek(Seek::First, self.offset + 1)?;
self.child_iter.try_next()
}
Some(item) => Ok(Some(item)),
}
}
fn is_valid(&self) -> bool {
self.offset < self.level_len
}
}
impl<'a> SeekIter<'a> for LevelIter<'a> {
fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> {
match seek {
Seek::First => self.child_iter_seek(Seek::First, 0),
Seek::Last => self.child_iter_seek(Seek::Last, self.level_len - 1),
Seek::Backward(key) => self.seek_ward(key, seek),
}
}
}
#[cfg(test)]
mod tests {
use crate::kernel::io::IoType;
use crate::kernel::lsm::iterator::level_iter::LevelIter;
use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter};
use crate::kernel::lsm::log::LogLoader;
use crate::kernel::lsm::mem_table::DEFAULT_WAL_PATH;
use crate::kernel::lsm::storage::Config;
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::TableType;
use crate::kernel::lsm::version::edit::VersionEdit;
use crate::kernel::lsm::version::status::VersionStatus;
use crate::kernel::KernelResult;
use bincode::Options;
use bytes::Bytes;
use tempfile::TempDir;
#[test]
fn test_iterator() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
tokio_test::block_on(async move {
let config = Config::new(temp_dir.into_path());
let (wal, _) = LogLoader::reload(
config.path(),
(DEFAULT_WAL_PATH, Some(1)),
IoType::Direct,
&mut vec![0],
|_, _| Ok(()),
)?;
let ver_status = VersionStatus::load_with_path(config.clone(), wal.clone())?;
let value =
Bytes::from_static(b"What you are you do not see, what you see is your shadow.");
let mut vec_data = Vec::new();
let times = 4000;
for i in 0..times {
let mut key = b"KipDB-".to_vec();
key.append(&mut bincode::options().with_big_endian().serialize(&i)?);
vec_data.push((Bytes::from(key), Some(value.clone())));
}
let (slice_1, slice_2) = vec_data.split_at(2000);
let (scope_1, meta_1) = ver_status
.loader()
.create(1, slice_1.to_vec(), 1, TableType::SortedString)
.await?;
let (scope_2, meta_2) = ver_status
.loader()
.create(2, slice_2.to_vec(), 1, TableType::BTree)
.await?;
let fusion_meta = TableMeta::fusion(&[meta_1, meta_2]);
let vec_edit = vec![
VersionEdit::NewFile(
(vec![scope_1.clone()], 0),
0,
TableMeta {
size_of_disk: 0,
len: 0,
},
),
VersionEdit::NewFile((vec![scope_1, scope_2], 1), 0, fusion_meta),
];
ver_status.log_and_apply(vec_edit, 10).await?;
let version = ver_status.current().await;
let mut iterator = LevelIter::new(&version, 1)?;
for kv in vec_data.iter().take(times) {
assert_eq!(iterator.try_next()?.unwrap(), kv.clone());
}
iterator.seek(Seek::Backward(&vec_data[114].0))?;
assert_eq!(iterator.try_next()?.unwrap(), vec_data[114]);
iterator.seek(Seek::Backward(&vec_data[2048].0))?;
assert_eq!(iterator.try_next()?.unwrap(), vec_data[2048]);
iterator.seek(Seek::First)?;
assert_eq!(iterator.try_next()?.unwrap(), vec_data[0]);
iterator.seek(Seek::Last)?;
assert_eq!(iterator.try_next()?, None);
let mut iterator_level_0 = LevelIter::new(&version, 0)?;
assert!(iterator_level_0
.seek(Seek::Backward(&vec_data[3333].0))
.is_err());
Ok(())
})
}
}