use std::cmp::Ordering;
use std::sync::Arc;
use crate::iterator::merge::SeekableIterator;
use crate::manifest::version::TableFile;
use crate::options::BlockPropertyFilter;
use crate::sst::table_reader::TableIterator;
use crate::types::{LazyValue, compare_internal_key, user_key as user_key_from_internal};
/// Iterates the table files of one level as a single concatenated key sequence.
///
/// At most one file's [`TableIterator`] is held open at a time; files are
/// opened only when iteration or a seek reaches them.
pub struct LevelIterator {
    // ---- positioning state ----
    /// The level's files. The seek paths binary-search over their
    /// smallest/largest keys, so they are presumably sorted and
    /// non-overlapping — TODO confirm with the code that builds this list.
    files: Vec<TableFile>,
    /// Index into `files` of the file `current_iter` reads (or the next one to open).
    file_index: usize,
    /// Iterator over `files[file_index]`, when a file is open.
    current_iter: Option<TableIterator>,
    /// Exclusive user-key upper bound, forwarded to every opened table iterator.
    upper_bound: Option<Vec<u8>>,
    // ---- file-pruning hints ----
    /// Files whose largest user key is below this are skipped.
    start_hint: Option<Vec<u8>>,
    /// Files whose smallest user key is at or above this are skipped.
    end_hint: Option<Vec<u8>>,
    /// Files whose reader rejects this prefix (`prefix_may_match`) are skipped.
    prefix_filter: Option<Vec<u8>>,
    /// Block-level filters handed to each opened table iterator.
    block_property_filters: Vec<Arc<dyn BlockPropertyFilter>>,
}
impl LevelIterator {
    /// Builds an iterator over `files` with no bounds, hints, or filters set.
    ///
    /// No file is opened here; the first one is opened when iteration or a
    /// seek first needs it.
    pub fn new(files: Vec<TableFile>) -> Self {
        Self {
            files,
            file_index: 0,
            current_iter: None,
            prefix_filter: None,
            start_hint: None,
            end_hint: None,
            upper_bound: None,
            block_property_filters: Vec::new(),
        }
    }

    /// Skips any file whose reader reports `prefix_may_match(prefix) == false`.
    pub fn with_prefix(mut self, prefix: Vec<u8>) -> Self {
        self.prefix_filter = Some(prefix);
        self
    }

    /// Restricts iteration to files overlapping the user-key range `[start, end)`.
    ///
    /// Either side may be `None` (unbounded). The hints only prune whole
    /// files; keys inside a file that survives are not filtered.
    pub fn with_range_hints(mut self, start: Option<Vec<u8>>, end: Option<Vec<u8>>) -> Self {
        (self.start_hint, self.end_hint) = (start, end);
        self
    }

    /// Forwards `filters` to every table iterator this level iterator opens.
    pub fn with_block_filters(mut self, filters: Vec<Arc<dyn BlockPropertyFilter>>) -> Self {
        self.block_property_filters = filters;
        self
    }

    /// Returns `false` when `tf` can be skipped without reading it.
    ///
    /// Checks, in order: the file ends before `start_hint`; the file starts at
    /// or after `end_hint` (exclusive); the reader's prefix check rejects
    /// `prefix_filter`. Range hints are compared against user keys.
    fn file_passes_filters(&self, tf: &TableFile) -> bool {
        let ends_before_start = match &self.start_hint {
            Some(start) => user_key_from_internal(&tf.meta.largest_key) < start.as_slice(),
            None => false,
        };
        if ends_before_start {
            return false;
        }
        let starts_at_or_after_end = match &self.end_hint {
            Some(end) => user_key_from_internal(&tf.meta.smallest_key) >= end.as_slice(),
            None => false,
        };
        if starts_at_or_after_end {
            return false;
        }
        // The reader's prefix check is consulted only after the cheap key-range tests pass.
        match &self.prefix_filter {
            Some(prefix) => tf.reader.prefix_may_match(prefix),
            None => true,
        }
    }

    /// Positions at the first entry whose internal key is `>= target`.
    fn seek_impl(&mut self, target: &[u8]) {
        // Every file before this index ends strictly before `target`.
        self.file_index = self
            .files
            .partition_point(|tf| compare_internal_key(&tf.meta.largest_key, target).is_lt());
        self.current_iter = None;
        self.open_file_and_seek(Some(target));
    }

    /// Opens the first file at or after `file_index` that passes the filters,
    /// seeking it to `target` when given, and stores it in `current_iter`.
    ///
    /// If no such file exists, or the candidate starts at/after the upper
    /// bound, `current_iter` becomes `None`. In the upper-bound case
    /// `file_index` is also moved to `files.len()`.
    fn open_file_and_seek(&mut self, target: Option<&[u8]>) {
        let opened = loop {
            let Some(tf) = self.files.get(self.file_index) else {
                break None;
            };
            if !self.file_passes_filters(tf) {
                self.file_index += 1;
                continue;
            }
            let past_upper_bound = self.upper_bound.as_ref().is_some_and(|ub| {
                user_key_from_internal(&tf.meta.smallest_key) >= ub.as_slice()
            });
            if past_upper_bound {
                // Later files start even later, so the level is exhausted.
                self.file_index = self.files.len();
                break None;
            }
            let mut table_iter = TableIterator::new(tf.reader.clone());
            if !self.block_property_filters.is_empty() {
                table_iter = table_iter.with_block_filters(self.block_property_filters.clone());
            }
            if let Some(ub) = self.upper_bound.as_ref() {
                table_iter.set_bounds(None, Some(ub));
            }
            if let Some(t) = target {
                table_iter.seek(t);
            }
            break Some(table_iter);
        };
        self.current_iter = opened;
    }
}
impl Iterator for LevelIterator {
type Item = (Vec<u8>, Vec<u8>);
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.current_iter.is_none() && self.file_index < self.files.len() {
self.open_file_and_seek(None);
self.current_iter.as_ref()?;
}
if let Some(ref mut iter) = self.current_iter
&& let Some(entry) = iter.next()
{
return Some(entry);
}
self.current_iter = None;
self.file_index += 1;
if self.file_index >= self.files.len() {
return None;
}
}
}
}
impl super::merge::SeekableIterator for LevelIterator {
    /// Positions at the first entry whose internal key is `>= target`.
    fn seek_to(&mut self, target: &[u8]) {
        self.seek_impl(target);
    }

    /// Returns the entry the open table iterator is positioned on, if any.
    /// Values are always wrapped as `LazyValue::Inline`.
    fn current(&self) -> Option<(Vec<u8>, LazyValue)> {
        self.current_iter
            .as_ref()
            .and_then(|iter| iter.current())
            .map(|(k, v)| (k, LazyValue::Inline(v)))
    }

    /// Steps back one entry, crossing into earlier files when the open one is
    /// exhausted.
    ///
    /// Earlier files are skipped if they fail the range/prefix filters or show
    /// no entry after `seek_to_last` (e.g. everything hidden by the bounds).
    /// Returns `None` and drops the table iterator once the first file has
    /// been passed.
    fn prev(&mut self) -> Option<(Vec<u8>, LazyValue)> {
        if let Some(ref mut iter) = self.current_iter
            && let Some(entry) = iter.prev()
        {
            return Some((entry.0, LazyValue::Inline(entry.1)));
        }
        // After forward exhaustion `file_index` can sit at (or, via older call
        // paths, beyond) `files.len()`. Clamp so `files[file_index]` below is
        // always in range.
        self.file_index = self.file_index.min(self.files.len());
        loop {
            if self.file_index == 0 {
                self.current_iter = None;
                return None;
            }
            self.file_index -= 1;
            let tf = &self.files[self.file_index];
            if !self.file_passes_filters(tf) {
                continue;
            }
            let mut table_iter = TableIterator::new(tf.reader.clone());
            if !self.block_property_filters.is_empty() {
                table_iter = table_iter.with_block_filters(self.block_property_filters.clone());
            }
            if let Some(ref ub) = self.upper_bound {
                table_iter.set_bounds(None, Some(ub));
            }
            table_iter.seek_to_last();
            if let Some(entry) = table_iter.current() {
                self.current_iter = Some(table_iter);
                return Some((entry.0, LazyValue::Inline(entry.1)));
            }
        }
    }

    /// Positions at the last entry whose key is `<= target`, as determined by
    /// `TableIterator::seek_for_prev`.
    ///
    /// Starts from the last file whose smallest key is `<= target` and falls
    /// back to earlier files if that one yields nothing. If no file has such
    /// an entry, drops the table iterator and resets `file_index` to 0.
    fn seek_for_prev(&mut self, target: &[u8]) {
        // Number of files whose smallest key is <= target; only these can
        // contain an entry at or before `target`.
        let candidates = self.files.partition_point(|tf| {
            compare_internal_key(&tf.meta.smallest_key, target) != Ordering::Greater
        });
        for try_idx in (0..candidates).rev() {
            let tf = &self.files[try_idx];
            if !self.file_passes_filters(tf) {
                continue;
            }
            let mut table_iter = TableIterator::new(tf.reader.clone());
            if !self.block_property_filters.is_empty() {
                table_iter = table_iter.with_block_filters(self.block_property_filters.clone());
            }
            if let Some(ref ub) = self.upper_bound {
                table_iter.set_bounds(None, Some(ub));
            }
            table_iter.seek_for_prev(target);
            if table_iter.current().is_some() {
                self.file_index = try_idx;
                self.current_iter = Some(table_iter);
                return;
            }
        }
        self.file_index = 0;
        self.current_iter = None;
    }

    /// Rewinds to the first file that passes the filters, without seeking
    /// inside it.
    fn seek_to_first(&mut self) {
        self.file_index = 0;
        self.current_iter = None;
        self.open_file_and_seek(None);
    }

    /// Positions at the last visible entry of the level.
    ///
    /// Files are tried from the back. A file that passes the filters but shows
    /// no entry after `seek_to_last` (e.g. all of it is at/above the upper
    /// bound or excluded by block filters) is skipped, matching `prev` and
    /// `seek_for_prev`. Previously such a file was accepted, and `current()`
    /// then reported an empty level even though earlier files had entries.
    /// If nothing is found, the table iterator is dropped and `file_index` is
    /// reset to 0.
    fn seek_to_last(&mut self) {
        self.current_iter = None;
        for idx in (0..self.files.len()).rev() {
            let tf = &self.files[idx];
            if !self.file_passes_filters(tf) {
                continue;
            }
            let mut table_iter = TableIterator::new(tf.reader.clone());
            if !self.block_property_filters.is_empty() {
                table_iter = table_iter.with_block_filters(self.block_property_filters.clone());
            }
            if let Some(ref ub) = self.upper_bound {
                table_iter.set_bounds(None, Some(ub));
            }
            table_iter.seek_to_last();
            if table_iter.current().is_some() {
                self.file_index = idx;
                self.current_iter = Some(table_iter);
                return;
            }
        }
        self.file_index = 0;
    }

    /// Copies the next entry into `key_buf`/`value_buf`, crossing file
    /// boundaries as needed. Returns `false` once the level is exhausted.
    ///
    /// Like `Iterator::next`, this never moves `file_index` past
    /// `files.len()`, so a later `prev()` cannot index out of range.
    fn next_into(&mut self, key_buf: &mut Vec<u8>, value_buf: &mut Vec<u8>) -> bool {
        loop {
            if self.current_iter.is_none() {
                if self.file_index >= self.files.len() {
                    return false;
                }
                self.open_file_and_seek(None);
                if self.current_iter.is_none() {
                    return false;
                }
            }
            if let Some(ref mut iter) = self.current_iter
                && iter.next_into(key_buf, value_buf)
            {
                return true;
            }
            // Current file is drained: drop it and move on to the next one.
            self.current_iter = None;
            self.file_index += 1;
        }
    }

    /// Sets the exclusive upper bound; the lower bound is ignored.
    ///
    /// The bound is applied to the open table iterator immediately and stored
    /// for every file opened later.
    fn set_bounds(&mut self, _lower: Option<&[u8]>, upper: Option<&[u8]>) {
        self.upper_bound = upper.map(|b| b.to_vec());
        if let Some(ref mut iter) = self.current_iter {
            iter.set_bounds(None, upper);
        }
    }

    /// Reports the error of the currently open table iterator only.
    fn iter_error(&self) -> Option<String> {
        self.current_iter.as_ref().and_then(|it| it.iter_error())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::version_edit::FileMetaData;
    use crate::sst::table_builder::{TableBuildOptions, TableBuilder};
    use crate::sst::table_reader::TableReader;
    use crate::types::{InternalKey, ValueType};
    use std::path::Path;
    use std::sync::Arc;

    /// Writes `dir/<file_num:06>.sst` with user keys `key_{i:06}` for `i` in
    /// `start..end`, each encoded via `InternalKey::new(uk, 100,
    /// ValueType::Value)` and mapped to `value_{i}`, using `options`.
    ///
    /// Returns the opened file as a `TableFile`. Expects `start < end`;
    /// otherwise the recorded smallest/largest keys are empty.
    fn build_sst_with_options(
        dir: &Path,
        file_num: u64,
        start: usize,
        end: usize,
        options: TableBuildOptions,
    ) -> TableFile {
        let path = dir.join(format!("{:06}.sst", file_num));
        let mut builder = TableBuilder::new(&path, options).unwrap();
        let mut smallest = Vec::new();
        let mut largest = Vec::new();
        for i in start..end {
            let uk = format!("key_{:06}", i);
            let ik = InternalKey::new(uk.as_bytes(), 100, ValueType::Value);
            let val = format!("value_{}", i);
            if i == start {
                smallest = ik.as_bytes().to_vec();
            }
            largest = ik.as_bytes().to_vec();
            builder.add(ik.as_bytes(), val.as_bytes()).unwrap();
        }
        builder.finish().unwrap();
        let reader = Arc::new(TableReader::open(&path).unwrap());
        TableFile {
            meta: FileMetaData {
                number: file_num,
                file_size: reader.file_size(),
                smallest_key: smallest,
                largest_key: largest,
                has_range_deletions: false,
            },
            reader,
        }
    }

    /// Builds a test SST with bloom filtering disabled.
    fn build_sst(dir: &Path, file_num: u64, start: usize, end: usize) -> TableFile {
        build_sst_with_options(
            dir,
            file_num,
            start,
            end,
            TableBuildOptions {
                bloom_bits_per_key: 0,
                ..Default::default()
            },
        )
    }

    /// Builds a test SST with a prefix bloom filter over the first
    /// `prefix_len` bytes of each key.
    fn build_sst_with_prefix_bloom(
        dir: &Path,
        file_num: u64,
        start: usize,
        end: usize,
        prefix_len: usize,
    ) -> TableFile {
        build_sst_with_options(
            dir,
            file_num,
            start,
            end,
            TableBuildOptions {
                bloom_bits_per_key: 10,
                internal_keys: true,
                prefix_len,
                ..Default::default()
            },
        )
    }

    #[test]
    fn test_empty_level() {
        let mut iter = LevelIterator::new(vec![]);
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_single_file() {
        let dir = tempfile::tempdir().unwrap();
        let tf = build_sst(dir.path(), 1, 0, 10);
        let mut level_iter = LevelIterator::new(vec![tf.clone()]);
        let level_entries: Vec<_> = (&mut level_iter).collect();
        let mut table_iter = TableIterator::new(tf.reader.clone());
        let table_entries: Vec<_> = (&mut table_iter).collect();
        assert_eq!(level_entries, table_entries);
        assert_eq!(level_entries.len(), 10);
    }

    #[test]
    fn test_multiple_files_concatenation() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 10),
            build_sst(dir.path(), 2, 10, 20),
            build_sst(dir.path(), 3, 20, 30),
        ];
        let mut iter = LevelIterator::new(files);
        let entries: Vec<_> = (&mut iter).collect();
        assert_eq!(entries.len(), 30);
        for i in 1..entries.len() {
            assert!(entries[i].0 > entries[i - 1].0, "not sorted at index {}", i);
        }
        let first_uk = user_key_from_internal(&entries[0].0);
        let last_uk = user_key_from_internal(&entries[29].0);
        assert_eq!(first_uk, b"key_000000");
        assert_eq!(last_uk, b"key_000029");
    }

    #[test]
    fn test_seek_to_exact_key_in_middle_file() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 10),
            build_sst(dir.path(), 2, 10, 20),
            build_sst(dir.path(), 3, 20, 30),
        ];
        let target = InternalKey::new(b"key_000015", 100, ValueType::Value);
        let mut iter = LevelIterator::new(files);
        iter.seek_impl(&target.into_bytes());
        let entry = iter.next().unwrap();
        let uk = user_key_from_internal(&entry.0);
        assert_eq!(uk, b"key_000015");
    }

    #[test]
    fn test_seek_between_files() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 10),
            build_sst(dir.path(), 2, 20, 30),
        ];
        let target = InternalKey::new(b"key_000015", 100, ValueType::Value);
        let mut iter = LevelIterator::new(files);
        iter.seek_impl(&target.into_bytes());
        let entry = iter.next().unwrap();
        let uk = user_key_from_internal(&entry.0);
        assert_eq!(uk, b"key_000020");
    }

    #[test]
    fn test_seek_past_all_files() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 10),
            build_sst(dir.path(), 2, 10, 20),
        ];
        let target = InternalKey::new(b"zzz", 100, ValueType::Value);
        let mut iter = LevelIterator::new(files);
        iter.seek_impl(&target.into_bytes());
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_seek_before_all_files() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 10, 20),
            build_sst(dir.path(), 2, 20, 30),
        ];
        let target = InternalKey::new(b"aaa", 100, ValueType::Value);
        let mut iter = LevelIterator::new(files);
        iter.seek_impl(&target.into_bytes());
        let entry = iter.next().unwrap();
        let uk = user_key_from_internal(&entry.0);
        assert_eq!(uk, b"key_000010");
    }

    #[test]
    fn test_file_boundary_crossing() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 3),
            build_sst(dir.path(), 2, 3, 6),
        ];
        let mut iter = LevelIterator::new(files);
        let entries: Vec<_> = (&mut iter).collect();
        assert_eq!(entries.len(), 6);
        let uks: Vec<_> = entries
            .iter()
            .map(|(k, _)| user_key_from_internal(k).to_vec())
            .collect();
        assert_eq!(uks[2], b"key_000002");
        assert_eq!(uks[3], b"key_000003");
    }

    #[test]
    fn test_range_filter_skips_files() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst(dir.path(), 1, 0, 10),
            build_sst(dir.path(), 2, 10, 20),
            build_sst(dir.path(), 3, 20, 30),
        ];
        let mut iter = LevelIterator::new(files)
            .with_range_hints(Some(b"key_000010".to_vec()), Some(b"key_000020".to_vec()));
        let entries: Vec<_> = (&mut iter).collect();
        assert_eq!(entries.len(), 10);
        let first_uk = user_key_from_internal(&entries[0].0);
        let last_uk = user_key_from_internal(&entries[9].0);
        assert_eq!(first_uk, b"key_000010");
        assert_eq!(last_uk, b"key_000019");
    }

    #[test]
    fn test_prefix_bloom_filter_skips_files() {
        let dir = tempfile::tempdir().unwrap();
        let files = vec![
            build_sst_with_prefix_bloom(dir.path(), 1, 0, 10, 5),
            build_sst_with_prefix_bloom(dir.path(), 2, 10, 20, 5),
        ];
        let mut iter = LevelIterator::new(files.clone()).with_prefix(b"key_0".to_vec());
        let entries: Vec<_> = (&mut iter).collect();
        assert_eq!(entries.len(), 20);
        let mut iter = LevelIterator::new(files).with_prefix(b"xxxx_".to_vec());
        let entries: Vec<_> = (&mut iter).collect();
        assert_eq!(entries.len(), 0);
    }
}