use crate::data_file::DataFile;
use crate::error::Result;
use crate::iterator::KvIterator;
use crate::r#type::KvValue;
use bytes::Bytes;
use std::sync::Arc;
pub struct SortedRun {
files: Vec<Arc<DataFile>>,
level: u8,
}
impl SortedRun {
pub fn new(level: u8, files: Vec<Arc<DataFile>>) -> Self {
Self { files, level }
}
pub fn len(&self) -> usize {
self.files.len()
}
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
pub fn files(&self) -> &[Arc<DataFile>] {
&self.files
}
pub fn level(&self) -> u8 {
self.level
}
pub fn start_key(&self) -> Option<&[u8]> {
self.files.first().map(|f| f.start_key.as_slice())
}
pub fn end_key(&self) -> Option<&[u8]> {
self.files.last().map(|f| f.end_key.as_slice())
}
pub fn find_file(&self, target: &[u8]) -> Option<usize> {
if self.files.is_empty() {
return None;
}
let mut left = 0;
let mut right = self.files.len();
while left < right {
let mid = (left + right) / 2;
if self.files[mid].end_key.as_slice() < target {
left = mid + 1;
} else {
right = mid;
}
}
if left < self.files.len() {
Some(left)
} else {
None
}
}
pub fn iter<I, F>(&self, create_iterator: F) -> SortedRunIterator<I, F>
where
I: for<'a> KvIterator<'a>,
F: Fn(&DataFile) -> Result<I>,
{
SortedRunIterator::new(self.files.clone(), create_iterator)
}
}
pub struct SortedRunIterator<I, F>
where
I: for<'a> KvIterator<'a>,
F: Fn(&DataFile) -> Result<I>,
{
files: Vec<Arc<DataFile>>,
current_file_idx: usize,
current_iter: Option<I>,
create_iterator: F,
}
impl<I, F> SortedRunIterator<I, F>
where
I: for<'a> KvIterator<'a>,
F: Fn(&DataFile) -> Result<I>,
{
fn new(files: Vec<Arc<DataFile>>, create_iterator: F) -> Self {
Self {
files,
current_file_idx: 0,
current_iter: None,
create_iterator,
}
}
fn load_file(&mut self, idx: usize) -> Result<bool> {
if idx >= self.files.len() {
self.current_iter = None;
return Ok(false);
}
let mut iter = (self.create_iterator)(&self.files[idx])?;
iter.seek_to_first()?;
self.current_file_idx = idx;
self.current_iter = Some(iter);
Ok(true)
}
}
impl<'a, I, F> KvIterator<'a> for SortedRunIterator<I, F>
where
I: for<'b> KvIterator<'b>,
F: Fn(&DataFile) -> Result<I> + 'a,
{
fn seek(&mut self, target: &[u8]) -> Result<()> {
let mut left = 0;
let mut right = self.files.len();
while left < right {
let mid = (left + right) / 2;
if self.files[mid].end_key.as_slice() < target {
left = mid + 1;
} else {
right = mid;
}
}
let file_idx = left;
if file_idx >= self.files.len() {
self.current_iter = None;
return Ok(());
}
self.load_file(file_idx)?;
if let Some(iter) = &mut self.current_iter {
iter.seek(target)?;
if !iter.valid() && file_idx + 1 < self.files.len() {
self.load_file(file_idx + 1)?;
}
}
Ok(())
}
fn seek_to_first(&mut self) -> Result<()> {
if self.files.is_empty() {
self.current_iter = None;
return Ok(());
}
self.load_file(0)?;
Ok(())
}
fn next(&mut self) -> Result<bool> {
if let Some(iter) = &mut self.current_iter {
if iter.next()? {
return Ok(true);
}
let next_idx = self.current_file_idx + 1;
if next_idx < self.files.len() {
self.load_file(next_idx)?;
return Ok(self.current_iter.as_ref().is_some_and(|i| i.valid()));
} else {
self.current_iter = None;
return Ok(false);
}
}
Ok(false)
}
fn valid(&self) -> bool {
self.current_iter.as_ref().is_some_and(|i| i.valid())
}
fn key(&self) -> Result<Option<&[u8]>> {
if let Some(iter) = &self.current_iter {
iter.key()
} else {
Ok(None)
}
}
fn take_key(&mut self) -> Result<Option<Bytes>> {
if let Some(iter) = &mut self.current_iter {
iter.take_key()
} else {
Ok(None)
}
}
fn take_value(&mut self) -> Result<Option<KvValue>> {
if let Some(iter) = &mut self.current_iter {
iter.take_value()
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data_file::DataFileType;
use crate::iterator::mock_iterator::MockIterator;
fn create_data_file(id: u64, start: &[u8], end: &[u8]) -> Arc<DataFile> {
let bucket_range = DataFile::bucket_range_from_keys(start, end);
Arc::new(DataFile::new_detached(
DataFileType::SSTable,
start.to_vec(),
end.to_vec(),
id,
0,
0,
bucket_range.clone(),
bucket_range,
))
}
#[test]
fn test_sorted_run_basic() {
let files = vec![
create_data_file(1, b"a", b"c"),
create_data_file(2, b"d", b"f"),
create_data_file(3, b"g", b"i"),
];
let run = SortedRun::new(1, files);
assert_eq!(run.len(), 3);
assert!(!run.is_empty());
assert_eq!(run.start_key(), Some(b"a".as_slice()));
assert_eq!(run.end_key(), Some(b"i".as_slice()));
}
#[test]
fn test_sorted_run_empty() {
let run = SortedRun::new(1, vec![]);
assert_eq!(run.len(), 0);
assert!(run.is_empty());
assert_eq!(run.start_key(), None);
assert_eq!(run.end_key(), None);
}
#[test]
fn test_find_file() {
let files = vec![
create_data_file(1, b"a", b"c"),
create_data_file(2, b"d", b"f"),
create_data_file(3, b"g", b"i"),
];
let run = SortedRun::new(1, files);
assert_eq!(run.find_file(b"b"), Some(0));
assert_eq!(run.find_file(b"e"), Some(1));
assert_eq!(run.find_file(b"h"), Some(2));
assert_eq!(run.find_file(b"0"), Some(0));
assert_eq!(run.find_file(b"c"), Some(0));
assert_eq!(run.find_file(b"d"), Some(1));
assert_eq!(run.find_file(b"z"), None);
}
#[test]
fn test_sorted_run_iterator() {
let files = vec![
create_data_file(1, b"a", b"c"),
create_data_file(2, b"d", b"f"),
];
let run = SortedRun::new(1, files);
let create_iter = |file: &DataFile| -> Result<MockIterator> {
let entries = match file.file_id {
1 => vec![(b"a".as_slice(), b"v1"), (b"b", b"v2"), (b"c", b"v3")],
2 => vec![(b"d".as_slice(), b"v4"), (b"e", b"v5"), (b"f", b"v6")],
_ => vec![],
};
Ok(MockIterator::new(entries))
};
let mut iter = run.iter(create_iter);
iter.seek_to_first().unwrap();
let mut results = vec![];
while iter.valid() {
let (k, kv) = iter.take_current().unwrap().unwrap();
let v = kv.unwrap_encoded();
results.push((k, v));
iter.next().unwrap();
}
assert_eq!(results.len(), 6);
assert_eq!(results[0].0.as_ref(), b"a");
assert_eq!(results[1].0.as_ref(), b"b");
assert_eq!(results[2].0.as_ref(), b"c");
assert_eq!(results[3].0.as_ref(), b"d");
assert_eq!(results[4].0.as_ref(), b"e");
assert_eq!(results[5].0.as_ref(), b"f");
}
#[test]
fn test_sorted_run_iterator_seek() {
let files = vec![
create_data_file(1, b"a", b"c"),
create_data_file(2, b"d", b"f"),
];
let run = SortedRun::new(1, files);
let create_iter = |file: &DataFile| -> Result<MockIterator> {
let entries = match file.file_id {
1 => vec![(b"a".as_slice(), b"v1"), (b"b", b"v2"), (b"c", b"v3")],
2 => vec![(b"d".as_slice(), b"v4"), (b"e", b"v5"), (b"f", b"v6")],
_ => vec![],
};
Ok(MockIterator::new(entries))
};
let mut iter = run.iter(create_iter);
iter.seek(b"b").unwrap();
assert!(iter.valid());
assert_eq!(iter.key().unwrap().unwrap(), b"b");
iter.seek(b"e").unwrap();
assert!(iter.valid());
assert_eq!(iter.key().unwrap().unwrap(), b"e");
iter.seek(b"d").unwrap();
assert!(iter.valid());
assert_eq!(iter.key().unwrap().unwrap(), b"d");
}
}