use crate::direction::Direction;
use crate::inner::Inner;
use crate::iter::{MergeIterator, MergeQueueIter, TreeIterState};
use crate::queue::Merge;
use bytes::Bytes;
use std::collections::BTreeMap;
use std::sync::Arc;
pub struct Cursor<'a> {
database: &'a Arc<Inner>,
writeset: &'a BTreeMap<Bytes, Option<Bytes>>,
beg: Bytes,
end: Bytes,
version: u64,
merge_sources: Vec<Arc<Merge>>,
direction: Direction,
current: Option<(Bytes, Option<Bytes>)>,
positioned: bool,
iter: Option<MergeIterator<'a>>,
}
impl<'a> Cursor<'a> {
pub(crate) fn new(
database: &'a Arc<Inner>,
writeset: &'a BTreeMap<Bytes, Option<Bytes>>,
beg: Bytes,
end: Bytes,
version: u64,
) -> Self {
let merge_sources: Vec<Arc<Merge>> = database
.transaction_merge_queue
.range(..=version)
.rev()
.map(|e| e.value().clone())
.collect();
Cursor {
database,
writeset,
beg,
end,
version,
merge_sources,
direction: Direction::Forward,
current: None,
positioned: false,
iter: None,
}
}
#[inline]
pub fn valid(&self) -> bool {
self.current.is_some()
}
#[inline]
pub fn key(&self) -> Option<&Bytes> {
self.current.as_ref().map(|(k, _)| k)
}
#[inline]
pub fn value(&self) -> Option<&Bytes> {
self.current.as_ref().and_then(|(_, v)| v.as_ref())
}
#[inline]
pub fn exists(&self) -> bool {
self.current.as_ref().map(|(_, v)| v.is_some()).unwrap_or(false)
}
pub fn seek_to_first(&mut self) {
self.direction = Direction::Forward;
self.positioned = true;
let beg = self.beg.clone();
let end = self.end.clone();
self.create_iterator(&beg, &end, Direction::Forward);
self.advance_current();
}
pub fn seek_to_last(&mut self) {
self.direction = Direction::Reverse;
self.positioned = true;
let beg = self.beg.clone();
let end = self.end.clone();
self.create_iterator(&beg, &end, Direction::Reverse);
self.advance_current();
}
pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
self.direction = Direction::Forward;
self.positioned = true;
let key_bytes = Bytes::copy_from_slice(key.as_ref());
let seek_key = if key_bytes < self.beg {
self.beg.clone()
} else if key_bytes >= self.end {
self.current = None;
self.iter = None;
return;
} else {
key_bytes
};
let end = self.end.clone();
self.create_iterator(&seek_key, &end, Direction::Forward);
self.advance_current();
}
pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
self.direction = Direction::Reverse;
self.positioned = true;
let key_bytes = Bytes::copy_from_slice(key.as_ref());
let seek_key = if key_bytes >= self.end {
self.end.clone()
} else if key_bytes < self.beg {
self.current = None;
self.iter = None;
return;
} else {
key_bytes
};
let beg = self.beg.clone();
self.create_iterator(&beg, &seek_key, Direction::Reverse);
self.advance_current();
}
pub fn next(&mut self) {
if !self.positioned {
self.seek_to_first();
return;
}
if matches!(self.direction, Direction::Reverse) {
self.direction = Direction::Forward;
if let Some((ref key, _)) = self.current {
let next_start = next_key(key);
if next_start >= self.end {
self.current = None;
self.iter = None;
return;
}
let end = self.end.clone();
self.create_iterator(&next_start, &end, Direction::Forward);
} else {
self.iter = None;
}
self.advance_current();
return;
}
self.advance_current();
}
pub fn prev(&mut self) {
if !self.positioned {
self.seek_to_last();
return;
}
if matches!(self.direction, Direction::Forward) {
self.direction = Direction::Reverse;
if let Some((ref key, _)) = self.current {
if key <= &self.beg {
self.current = None;
self.iter = None;
return;
}
let beg = self.beg.clone();
let key_clone = key.clone();
self.create_iterator(&beg, &key_clone, Direction::Reverse);
} else {
self.iter = None;
}
self.advance_current();
return;
}
self.advance_current();
}
fn create_iterator(&mut self, start: &Bytes, end: &Bytes, direction: Direction) {
let join_iter = Box::new(MergeQueueIter::new(
self.merge_sources.clone(),
start.clone(),
end.clone(),
direction,
));
self.iter = Some(MergeIterator::new(
TreeIterState::build(&self.database.datastore, start, end, direction),
join_iter,
self.writeset.range::<Bytes, _>(start..end),
direction,
self.version,
0,
));
}
#[inline]
fn advance_current(&mut self) {
self.current = match &mut self.iter {
Some(iter) => iter.next(),
None => None,
};
}
}
pub struct KeyIterator<'a> {
cursor: Cursor<'a>,
reverse: bool,
}
impl<'a> KeyIterator<'a> {
pub(crate) fn new(cursor: Cursor<'a>) -> Self {
let mut iter = KeyIterator {
cursor,
reverse: false,
};
iter.cursor.seek_to_first();
iter
}
pub(crate) fn new_reverse(cursor: Cursor<'a>) -> Self {
let mut iter = KeyIterator {
cursor,
reverse: true,
};
iter.cursor.seek_to_last();
iter
}
}
impl<'a> Iterator for KeyIterator<'a> {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
while self.cursor.valid() {
if self.cursor.exists() {
let key = self.cursor.key()?.clone();
if self.reverse {
self.cursor.prev();
} else {
self.cursor.next();
}
return Some(key);
}
if self.reverse {
self.cursor.prev();
} else {
self.cursor.next();
}
}
None
}
}
impl<'a> DoubleEndedIterator for KeyIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let was_reverse = self.reverse;
self.reverse = !was_reverse;
if was_reverse {
if !self.cursor.positioned {
self.cursor.seek_to_first();
}
} else {
if !self.cursor.positioned {
self.cursor.seek_to_last();
}
}
let result = self.next();
self.reverse = was_reverse;
result
}
}
pub struct ScanIterator<'a> {
cursor: Cursor<'a>,
reverse: bool,
}
impl<'a> ScanIterator<'a> {
pub(crate) fn new(cursor: Cursor<'a>) -> Self {
let mut iter = ScanIterator {
cursor,
reverse: false,
};
iter.cursor.seek_to_first();
iter
}
pub(crate) fn new_reverse(cursor: Cursor<'a>) -> Self {
let mut iter = ScanIterator {
cursor,
reverse: true,
};
iter.cursor.seek_to_last();
iter
}
}
impl<'a> Iterator for ScanIterator<'a> {
type Item = (Bytes, Bytes);
fn next(&mut self) -> Option<Self::Item> {
while self.cursor.valid() {
if let Some(value) = self.cursor.value() {
let key = self.cursor.key()?.clone();
let value = value.clone();
if self.reverse {
self.cursor.prev();
} else {
self.cursor.next();
}
return Some((key, value));
}
if self.reverse {
self.cursor.prev();
} else {
self.cursor.next();
}
}
None
}
}
impl<'a> DoubleEndedIterator for ScanIterator<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
let was_reverse = self.reverse;
self.reverse = !was_reverse;
if was_reverse {
if !self.cursor.positioned {
self.cursor.seek_to_first();
}
} else {
if !self.cursor.positioned {
self.cursor.seek_to_last();
}
}
let result = self.next();
self.reverse = was_reverse;
result
}
}
fn next_key(key: &Bytes) -> Bytes {
let mut next = key.to_vec();
next.push(0);
Bytes::from(next)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_next_key() {
let key = Bytes::from_static(b"hello");
let next = next_key(&key);
assert_eq!(next.as_ref(), b"hello\0");
assert!(next > key);
}
}