use crate::memtable::Entry;
use crate::range_merge::{KWayMergeIterator, KWayMergeIteratorRev};
use crate::sstable::SSTableRangeIterator;
use crate::MergeOperator;
use bytes::Bytes;
use std::sync::Arc;
/// A `(key, value)` pair: the success payload yielded by [`RangeIterator`] and
/// [`RangeIteratorRev`].
pub type RangeItem = (Bytes, Bytes);
/// Thin wrapper that owns an SSTable range iterator.
///
/// The wrapper adds no state of its own; its `Iterator` implementation is what
/// re-types the wrapped iterator's errors as boxed trait objects.
struct SSTableRangeAdapter<I> {
    /// The wrapped iterator, owned by the adapter.
    inner: I,
}

impl<I> SSTableRangeAdapter<I> {
    /// Wraps `inner` as-is. No bounds are placed on `I` here, and the
    /// constructor is usable in `const` contexts.
    const fn new(inner: I) -> Self {
        SSTableRangeAdapter { inner }
    }
}
/// Iterates the wrapped SSTable iterator, passing successful `(key, entry)`
/// pairs through untouched and converting each error into a boxed
/// `dyn Error + Send + Sync` so it matches the item type used by the other
/// merge sources in this file.
impl<I> Iterator for SSTableRangeAdapter<I>
where
    I: Iterator<Item = crate::sstable::Result<(Bytes, Entry)>>,
{
    type Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // `?` ends iteration as soon as the wrapped iterator is exhausted.
        match self.inner.next()? {
            Ok(pair) => Some(Ok(pair)),
            Err(err) => Some(Err(err.into())),
        }
    }
}
/// Range iterator that yields [`RangeItem`]s by draining a
/// [`KWayMergeIterator`] built over boxed, fallible `(key, Entry)` sources.
///
/// `RangeIterator::new` supplies those sources: one per memtable, followed by
/// one per SSTable iterator.
pub struct RangeIterator {
// The merge over all sources; each call to `next` pulls one item from it.
inner: KWayMergeIterator<
Box<dyn Iterator<Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>>>,
>,
}
impl RangeIterator {
pub fn new(
start_key: &[u8],
end_key: Option<&[u8]>,
memtables: &[&crate::memtable::Memtable],
sstable_iters: Vec<SSTableRangeIterator>,
merge_operator: Option<Arc<dyn MergeOperator>>,
) -> crate::db::Result<Self> {
let mut iterators: Vec<
Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
>,
> = Vec::new();
for memtable in memtables {
let partition_entries: Vec<(Bytes, Entry)> = if let Some(end_key) = end_key {
memtable.range(start_key, end_key).collect()
} else {
memtable.range_from(start_key).collect()
};
let partition_iter: Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
> = Box::new(partition_entries.into_iter().map(Ok));
iterators.push(partition_iter);
}
for sst_iter in sstable_iters {
let adapted: Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
> = Box::new(SSTableRangeAdapter::new(sst_iter));
iterators.push(adapted);
}
let merge =
KWayMergeIterator::new(iterators, merge_operator).map_err(std::io::Error::other)?;
Ok(Self { inner: merge })
}
}
/// Flattens each merged `(key, Entry)` into a plain `(key, value)` pair.
impl Iterator for RangeIterator {
    type Item = Result<RangeItem, Box<dyn std::error::Error>>;

    /// Pulls the next item from the merge and converts its entry to a value:
    ///
    /// * `Entry::Value` yields the stored value.
    /// * `Entry::Merge` yields `base` when present, otherwise the last operand,
    ///   otherwise an empty value.
    /// * `Entry::Tombstone` yields an empty value.
    ///   NOTE(review): the in-file tombstone test expects deleted keys to be
    ///   absent, so presumably the merge iterator drops them before this
    ///   point; confirm.
    ///
    /// Errors are passed through with the `Send + Sync` bounds dropped.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let merged = self.inner.next()?;
        let item = match merged {
            Ok((key, entry)) => {
                let value = match entry {
                    Entry::Value(value) => value,
                    Entry::Merge { base, operands } => {
                        base.unwrap_or_else(|| operands.last().cloned().unwrap_or_default())
                    }
                    Entry::Tombstone => Bytes::new(),
                };
                Ok((key, value))
            }
            Err(err) => Err(err as Box<dyn std::error::Error>),
        };
        Some(item)
    }
}
/// Reverse-direction counterpart of [`RangeIterator`]: yields [`RangeItem`]s by
/// draining a [`KWayMergeIteratorRev`] built over boxed, fallible
/// `(key, Entry)` sources.
///
/// `RangeIteratorRev::new` supplies those sources: one per memtable, followed
/// by one per SSTable iterator.
pub struct RangeIteratorRev {
// The reverse merge over all sources; each call to `next` pulls one item from it.
inner: KWayMergeIteratorRev<
Box<dyn Iterator<Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>>>,
>,
}
impl RangeIteratorRev {
pub fn new(
start_key: &[u8],
end_key: Option<&[u8]>,
memtables: &[&crate::memtable::Memtable],
sstable_iters: Vec<Box<dyn Iterator<Item = crate::sstable::Result<(Bytes, Entry)>>>>,
merge_operator: Option<Arc<dyn MergeOperator>>,
) -> crate::db::Result<Self> {
let mut iterators: Vec<
Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
>,
> = Vec::new();
for memtable in memtables {
let partition_entries: Vec<(Bytes, Entry)> = if let Some(end_key) = end_key {
memtable.range_rev(start_key, end_key).collect()
} else {
memtable
.range_from(start_key)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect()
};
let partition_iter: Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
> = Box::new(partition_entries.into_iter().map(Ok));
iterators.push(partition_iter);
}
for sst_iter in sstable_iters {
let adapted: Box<
dyn Iterator<
Item = Result<(Bytes, Entry), Box<dyn std::error::Error + Send + Sync>>,
>,
> = Box::new(SSTableRangeAdapter::new(sst_iter));
iterators.push(adapted);
}
let merge =
KWayMergeIteratorRev::new(iterators, merge_operator).map_err(std::io::Error::other)?;
Ok(Self { inner: merge })
}
}
/// Flattens each merged `(key, Entry)` into a plain `(key, value)` pair.
impl Iterator for RangeIteratorRev {
    type Item = Result<RangeItem, Box<dyn std::error::Error>>;

    /// Pulls the next item from the reverse merge and converts its entry to a
    /// value:
    ///
    /// * `Entry::Value` yields the stored value.
    /// * `Entry::Merge` yields `base` when present, otherwise the last operand,
    ///   otherwise an empty value.
    /// * `Entry::Tombstone` yields an empty value.
    ///   NOTE(review): presumably the merge iterator filters tombstones
    ///   before this point; confirm.
    ///
    /// Errors are passed through with the `Send + Sync` bounds dropped.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let item: Self::Item = match self.inner.next()? {
            Ok((key, Entry::Value(value))) => Ok((key, value)),
            Ok((key, Entry::Merge { base, operands })) => {
                let value = base.unwrap_or_else(|| operands.last().cloned().unwrap_or_default());
                Ok((key, value))
            }
            Ok((key, Entry::Tombstone)) => Ok((key, Bytes::new())),
            Err(err) => Err(err as Box<dyn std::error::Error>),
        };
        Some(item)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memtable::Memtable;

    /// Builds an owned `(key, value)` pair from string literals.
    fn kv(key: &'static str, value: &'static str) -> RangeItem {
        (Bytes::from(key), Bytes::from(value))
    }

    /// Drains `iter` into a vector, failing the test on the first error.
    ///
    /// A `while let Some(Ok(..))` loop would stop silently at an `Err`, so the
    /// error would either be hidden or show up as a misleading length mismatch.
    fn collect_ok(iter: RangeIterator) -> Vec<RangeItem> {
        iter.collect::<Result<Vec<_>, _>>()
            .expect("range iteration should not yield an error")
    }

    /// An empty memtable and no SSTables produce no items.
    #[test]
    fn test_range_iterator_empty() {
        let memtable = Memtable::new(1024 * 1024);
        let memtables = [&memtable];
        let range_iter = RangeIterator::new(b"start", None, &memtables, vec![], None).unwrap();
        assert_eq!(range_iter.count(), 0);
    }

    /// A bounded scan returns the keys from `start_key` up to, but not
    /// including, `end_key`.
    #[test]
    fn test_range_iterator_memtable_only() {
        let memtable = Memtable::new(1024 * 1024);
        memtable.put(Bytes::from("key1"), Bytes::from("value1"), 1);
        memtable.put(Bytes::from("key2"), Bytes::from("value2"), 2);
        memtable.put(Bytes::from("key3"), Bytes::from("value3"), 3);
        let memtables = [&memtable];
        let range_iter =
            RangeIterator::new(b"key1", Some(b"key3"), &memtables, vec![], None).unwrap();

        let results = collect_ok(range_iter);

        assert_eq!(results, vec![kv("key1", "value1"), kv("key2", "value2")]);
    }

    /// A deleted key does not appear in an unbounded scan.
    #[test]
    fn test_range_iterator_tombstone() {
        let memtable = Memtable::new(1024 * 1024);
        memtable.put(Bytes::from("key1"), Bytes::from("value1"), 1);
        memtable.delete(Bytes::from("key2"), 2);
        memtable.put(Bytes::from("key3"), Bytes::from("value3"), 3);
        let memtables = [&memtable];
        let range_iter = RangeIterator::new(b"key1", None, &memtables, vec![], None).unwrap();

        let results = collect_ok(range_iter);

        assert_eq!(results, vec![kv("key1", "value1"), kv("key3", "value3")]);
    }
}