use std::sync::Arc;
use rand::distributions::{Distribution, Uniform};
use rand::rngs::StdRng;
use rand::SeedableRng;
use crate::compaction::{CompactionWorker, TaskKind};
use crate::config::ITERATION_READ_BYTES_PERIOD;
use crate::db::PortableDatabaseState;
use crate::errors::RainDBError;
use crate::key::{InternalKey, RainDbKeyType};
use crate::versioning::file_iterators::MergingIterator;
use crate::{Operation, DB};
/// Cursor-style iterator interface shared by RainDB iterators.
///
/// An iterator is either positioned on an entry (see [`RainDbIterator::is_valid`]) or it is not.
/// The `seek*` methods reposition the cursor and may fail; `next`/`prev` move it by one step and
/// hand back the entry at the new position.
pub trait RainDbIterator
where
    Self::Key: RainDbKeyType,
{
    /// The key type yielded by this iterator.
    type Key;

    /// The error type returned by the fallible positioning methods.
    type Error;

    /// Returns `true` if the iterator is currently positioned on an entry.
    fn is_valid(&self) -> bool;

    /// Reposition the iterator using `target` as the seek key.
    ///
    /// NOTE(review): presumably positions on the first entry at or after `target`; confirm the
    /// contract against each implementor.
    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error>;

    /// Reposition the iterator on the first entry, if any.
    fn seek_to_first(&mut self) -> Result<(), Self::Error>;

    /// Reposition the iterator on the last entry, if any.
    fn seek_to_last(&mut self) -> Result<(), Self::Error>;

    /// Move to the next entry and return it, or `None` if there is no next entry.
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)>;

    /// Move to the previous entry and return it, or `None` if there is no previous entry.
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)>;

    /// Return the entry the iterator is currently positioned on.
    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)>;
}
/// Iterator adapter that keeps a cached validity flag and a cloned copy of the wrapped
/// iterator's current entry.
///
/// `current()` is answered from the cache rather than by calling into the boxed iterator.
/// NOTE(review): presumably this exists to avoid repeated dynamic dispatch through the boxed
/// iterator (as LevelDB's `IteratorWrapper` does) — confirm.
pub(crate) struct CachingIterator {
    /// The wrapped iterator that performs the actual positioning.
    iterator: Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>,

    /// Result of `iterator.is_valid()` as of the last cache refresh.
    is_valid: bool,

    /// Cloned key and value of the wrapped iterator's current entry as of the last refresh.
    cached_entry: Option<(InternalKey, Vec<u8>)>,
}
impl CachingIterator {
    /// Wrap `iterator`, snapshotting its validity and its current entry (if it has one).
    pub(crate) fn new(
        iterator: Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>,
    ) -> Self {
        // Take owned copies so the cache does not borrow from the wrapped iterator.
        let snapshot = iterator
            .current()
            .map(|(k, v)| (k.to_owned(), v.to_owned()));

        Self {
            is_valid: iterator.is_valid(),
            cached_entry: snapshot,
            iterator,
        }
    }
}
impl CachingIterator {
    /// Refresh the cached validity flag and entry from the wrapped iterator.
    ///
    /// Must be called after every operation that moves the wrapped iterator. When the wrapped
    /// iterator is no longer positioned on an entry, the cached entry is cleared. This makes
    /// `current()` (and therefore `next()`/`prev()`) report `None` instead of returning the entry
    /// from the previous position.
    fn update_cached_values(&mut self) {
        self.is_valid = self.iterator.is_valid();
        self.cached_entry = if self.is_valid {
            self.iterator
                .current()
                .map(|(key, value)| (key.clone(), value.clone()))
        } else {
            // Without this, running off either end would leave a stale entry visible.
            None
        };
    }
}
impl RainDbIterator for CachingIterator {
    type Key = InternalKey;
    type Error = RainDBError;

    /// Reports the validity flag captured at the last cache refresh.
    fn is_valid(&self) -> bool {
        self.is_valid
    }

    /// Seek the wrapped iterator to `target`, refreshing the cache only if the seek succeeds.
    ///
    /// # Errors
    ///
    /// Returns the wrapped iterator's error unchanged; the cache is left untouched in that case.
    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        self.iterator
            .seek(target)
            .map(|()| self.update_cached_values())
    }

    /// Move the wrapped iterator to its first entry, refreshing the cache on success.
    ///
    /// # Errors
    ///
    /// Returns the wrapped iterator's error unchanged; the cache is left untouched in that case.
    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        self.iterator
            .seek_to_first()
            .map(|()| self.update_cached_values())
    }

    /// Move the wrapped iterator to its last entry, refreshing the cache on success.
    ///
    /// # Errors
    ///
    /// Returns the wrapped iterator's error unchanged; the cache is left untouched in that case.
    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        self.iterator
            .seek_to_last()
            .map(|()| self.update_cached_values())
    }

    /// Step the wrapped iterator forward and answer from the refreshed cache.
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        // Only the movement matters here; the returned entry comes from the cache.
        let _ = self.iterator.next();
        self.update_cached_values();
        self.current()
    }

    /// Step the wrapped iterator backward and answer from the refreshed cache.
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        // Only the movement matters here; the returned entry comes from the cache.
        let _ = self.iterator.prev();
        self.update_cached_values();
        self.current()
    }

    /// Borrow the cached entry, if any.
    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        let (cached_key, cached_value) = self.cached_entry.as_ref()?;
        Some((cached_key, cached_value))
    }
}
/// Direction in which the database iterator is currently moving.
///
/// The direction determines where the current entry lives. When moving forward, the inner
/// iterator is positioned on it. When moving backward, it is held in the iterator's cached
/// user key/value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DbIterationDirection {
    /// Moving towards larger keys (`next`).
    Forward,
    /// Moving towards smaller keys (`prev`).
    Backward,
}
/// Client-facing iterator over the database as of a fixed sequence-number snapshot.
///
/// Wraps a merging iterator over internal keys and exposes user keys only:
/// - entries with a sequence number above the snapshot are ignored;
/// - deleted keys are hidden;
/// - at most one entry is surfaced per user key.
///
/// While iterating it also samples reads, which may lead to a compaction being scheduled.
pub struct DatabaseIterator {
    /// Shared database state; locked to record read samples and to decide whether a
    /// compaction should be scheduled.
    db_state: PortableDatabaseState,

    /// Worker that compaction tasks are scheduled on when read sampling calls for it.
    compaction_worker: Arc<CompactionWorker>,

    /// Current iteration direction. In `Forward`, `inner_iter` is positioned on the current
    /// entry. In `Backward`, the current entry is held in `cached_user_key`/`cached_value`.
    direction: DbIterationDirection,

    /// Iterator over internal keys from all underlying sources.
    inner_iter: MergingIterator,

    /// Entries with a sequence number greater than this are invisible to this iterator.
    sequence_snapshot: u64,

    /// Whether the iterator is positioned on a client-visible entry.
    is_valid: bool,

    /// Seeded RNG used to draw read-sampling periods.
    rng: StdRng,

    /// Distribution the read-sampling periods (in bytes) are drawn from.
    distribution: Uniform<u64>,

    /// Bytes that may still be read before the next read sample is taken.
    bytes_until_read_sampling: usize,

    /// Forward: the user key whose older versions should be skipped.
    /// Backward: the user key of the current entry.
    cached_user_key: Option<Vec<u8>>,

    /// Backward only: the value of the current entry.
    cached_value: Option<Vec<u8>>,
}
impl DatabaseIterator {
    /// Build an iterator over `inner_iter` that only exposes entries visible at
    /// `sequence_snapshot`.
    ///
    /// The iterator starts out invalid; callers must position it with one of the `seek*`
    /// methods. `sampling_seed` seeds the RNG that decides when read samples are taken.
    pub(crate) fn new(
        db_state: PortableDatabaseState,
        inner_iter: MergingIterator,
        sequence_snapshot: u64,
        sampling_seed: u64,
        compaction_worker: Arc<CompactionWorker>,
    ) -> Self {
        let sampling_range = 0..(2 * ITERATION_READ_BYTES_PERIOD);
        let rng = StdRng::seed_from_u64(sampling_seed);

        Self {
            db_state,
            compaction_worker,
            inner_iter,
            sequence_snapshot,
            direction: DbIterationDirection::Forward,
            is_valid: false,
            rng,
            distribution: Uniform::from(sampling_range),
            bytes_until_read_sampling: 0,
            cached_user_key: None,
            cached_value: None,
        }
    }
}
impl DatabaseIterator {
    /// Charge the inner iterator's current entry against the read-sampling budget.
    ///
    /// Each time the remaining budget cannot cover the entry, a read sample is recorded
    /// against the current version. For each sample, `record_read_sample` is called on the
    /// current version with the current internal key. If it returns `true` and
    /// `DB::should_schedule_compaction` also returns `true`, a `TaskKind::Compaction` task is
    /// scheduled on the compaction worker.
    ///
    /// Does nothing if the inner iterator is not positioned on an entry.
    fn sample_read_stats_for_current_key(&mut self) {
        if !self.inner_iter.is_valid() {
            return;
        }
        // Cost of reading this entry: approximate key size plus value length.
        let bytes_read = {
            let (current_key, current_val) = self.inner_iter.current().unwrap();
            current_key.get_approximate_size() + current_val.len()
        };
        // Take one sample per top-up of the budget. A large entry (or a small random period)
        // can therefore trigger several samples for a single read.
        while self.bytes_until_read_sampling < bytes_read {
            self.bytes_until_read_sampling += self.random_compaction_period();
            // The database fields lock is acquired anew for every sample.
            let mut db_fields_guard = self.db_state.guarded_db_fields.lock();
            let current_version = &mut db_fields_guard.version_set.get_current_version();
            let (current_key, _current_val) = self.inner_iter.current().unwrap();
            // NOTE(review): presumably `true` means the version wants a seek-triggered
            // compaction — confirm against `record_read_sample`.
            let needs_seek_compaction = current_version
                .write()
                .element
                .record_read_sample(current_key);
            if needs_seek_compaction
                && DB::should_schedule_compaction(&self.db_state, &mut db_fields_guard)
            {
                self.compaction_worker.schedule_task(TaskKind::Compaction);
            }
        }
        // The loop guarantees the budget is at least `bytes_read`, so this cannot underflow.
        self.bytes_until_read_sampling -= bytes_read;
    }

    /// Draw the number of bytes to read before the next read sample from `self.distribution`,
    /// using the iterator's seeded RNG.
    fn random_compaction_period(&mut self) -> usize {
        self.distribution.sample(&mut self.rng) as usize
    }

    /// Advance the inner iterator from its current position to the next client-visible entry.
    ///
    /// Entries with a sequence number above the snapshot are ignored. A `Delete` records its
    /// user key in `cached_user_key` and turns on skipping. While skipping, a `Put` whose user
    /// key is `<=` `cached_user_key` is hidden.
    ///
    /// `initial_is_skipping` starts the scan in skipping mode; the caller is then expected to
    /// have stored the key to skip past in `cached_user_key`.
    ///
    /// On success the inner iterator is left on the found entry, `is_valid` is set, and
    /// `cached_user_key` is cleared. If the inner iterator is exhausted, the iterator becomes
    /// invalid and `cached_user_key` is cleared.
    ///
    /// # Panics
    ///
    /// Panics if the inner iterator is not valid or the direction is not `Forward`.
    fn find_next_client_entry(&mut self, initial_is_skipping: bool) {
        assert!(self.inner_iter.is_valid());
        assert!(self.direction == DbIterationDirection::Forward);
        let mut is_skipping: bool = initial_is_skipping;
        loop {
            self.sample_read_stats_for_current_key();
            let (current_key, _) = self.inner_iter.current().unwrap();
            let curr_sequence_num = current_key.get_sequence_number();
            let curr_operation = current_key.get_operation();
            if curr_sequence_num > self.sequence_snapshot {
                // Written after the snapshot was taken: invisible to this iterator.
            } else {
                match curr_operation {
                    Operation::Delete => {
                        // Hide every upcoming entry for this user key.
                        is_skipping = true;
                        let current_user_key =
                            self.inner_iter.current().unwrap().0.get_user_key().to_vec();
                        self.cached_user_key = Some(current_user_key);
                    }
                    Operation::Put => {
                        if is_skipping
                            && self.cached_user_key.is_some()
                            && current_key.get_user_key() <= self.cached_user_key.as_ref().unwrap()
                        {
                            // Hidden by a deletion or by an already-returned entry for this key.
                        } else {
                            self.is_valid = true;
                            self.cached_user_key = None;
                            return;
                        }
                    }
                }
            }
            self.inner_iter.next();
            if !self.inner_iter.is_valid() {
                break;
            }
        }
        // Ran out of entries without finding a visible one.
        self.cached_user_key = None;
        self.is_valid = false;
    }

    /// Walk the inner iterator backwards to find the previous client-visible entry.
    ///
    /// Visible (`sequence <= snapshot`) entries are processed one at a time:
    /// - a `Put` saves its user key and value into `cached_user_key`/`cached_value`;
    /// - a `Delete` clears both.
    ///
    /// The scan stops when the inner iterator is exhausted, or when a Put has been saved and a
    /// visible entry with a smaller user key is found. In the latter case the inner iterator is
    /// left on that smaller-key entry.
    ///
    /// If the last visible entry processed was a Put, the iterator becomes valid and the entry
    /// is served from the cached key/value. Otherwise (a Delete, or no visible entry at all) the
    /// iterator becomes invalid, the caches are cleared, and the direction resets to `Forward`.
    ///
    /// # Panics
    ///
    /// Panics if the direction is not `Backward`.
    fn find_prev_client_entry(&mut self) {
        assert!(self.direction == DbIterationDirection::Backward);
        // `Delete` doubles as the "nothing saved yet" sentinel.
        let mut last_operation_type = Operation::Delete;
        if self.inner_iter.is_valid() {
            loop {
                self.sample_read_stats_for_current_key();
                let (current_key, _) = self.inner_iter.current().unwrap();
                let curr_sequence_num = current_key.get_sequence_number();
                let curr_operation = current_key.get_operation();
                if curr_sequence_num > self.sequence_snapshot {
                    // Written after the snapshot was taken: invisible to this iterator.
                } else {
                    // A Put for this user key is already saved and we have reached an earlier
                    // user key, so the saved entry is the one to return.
                    if last_operation_type != Operation::Delete
                        && current_key.get_user_key() < self.cached_user_key.as_ref().unwrap()
                    {
                        break;
                    }
                    last_operation_type = curr_operation;
                    match curr_operation {
                        Operation::Delete => {
                            self.cached_user_key = None;
                            self.cached_value = None;
                        }
                        Operation::Put => {
                            let (current_key, current_val) = self.inner_iter.current().unwrap();
                            self.cached_user_key = Some(current_key.get_user_key().to_vec());
                            self.cached_value = Some(current_val.clone());
                        }
                    }
                }
                self.inner_iter.prev();
                if !self.inner_iter.is_valid() {
                    break;
                }
            }
        }
        if last_operation_type == Operation::Delete {
            // No visible entry remains before the starting position.
            self.is_valid = false;
            self.cached_user_key = None;
            self.cached_value = None;
            self.direction = DbIterationDirection::Forward;
        } else {
            self.is_valid = true;
        }
    }
}
impl RainDbIterator for DatabaseIterator {
    type Key = Vec<u8>;
    type Error = RainDBError;

    /// Returns `true` if the iterator is positioned on a client-visible entry.
    fn is_valid(&self) -> bool {
        self.is_valid
    }

    /// Seek the inner iterator to `target` at the snapshot sequence number, then move forward
    /// to the first client-visible entry from there.
    ///
    /// NOTE(review): presumably this lands on the first visible user key `>= target` — confirm.
    ///
    /// # Errors
    ///
    /// Propagates errors from seeking the inner iterator. On error the iterator is left invalid.
    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        self.direction = DbIterationDirection::Forward;
        self.cached_value = None;
        // Not consulted by a non-skipping forward scan, so there is no need to store `target`.
        self.cached_user_key = None;
        // Invalidate first so a failed inner seek cannot leave a stale `is_valid == true`.
        self.is_valid = false;

        let lookup_key = InternalKey::new_for_seeking(target.clone(), self.sequence_snapshot);
        self.inner_iter.seek(&lookup_key)?;
        if self.inner_iter.is_valid() {
            self.find_next_client_entry(false);
        }
        Ok(())
    }

    /// Position the iterator on the first client-visible entry.
    ///
    /// # Errors
    ///
    /// Propagates errors from seeking the inner iterator. On error the iterator is left invalid.
    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        self.direction = DbIterationDirection::Forward;
        self.cached_value = None;
        self.cached_user_key = None;
        // Invalidate first so a failed inner seek cannot leave a stale `is_valid == true`.
        self.is_valid = false;

        self.inner_iter.seek_to_first()?;
        if self.inner_iter.is_valid() {
            self.find_next_client_entry(false);
        }
        Ok(())
    }

    /// Position the iterator on the last client-visible entry.
    ///
    /// # Errors
    ///
    /// Propagates errors from seeking the inner iterator. On error the iterator is left invalid.
    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        self.direction = DbIterationDirection::Backward;
        self.cached_value = None;
        self.cached_user_key = None;
        // Invalidate first so a failed inner seek cannot leave a stale `is_valid == true`.
        self.is_valid = false;

        self.inner_iter.seek_to_last()?;
        self.find_prev_client_entry();
        Ok(())
    }

    /// Advance to the next client-visible entry and return it, or `None` at the end.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        assert!(self.is_valid);
        if self.direction == DbIterationDirection::Backward {
            self.direction = DbIterationDirection::Forward;
            // The saved value is only consulted while iterating backwards.
            self.cached_value = None;
            // The inner iterator sits just before the entries for the current user key, which
            // is still held in `cached_user_key`. Step into that range and let the skipping
            // scan below move past it.
            if self.inner_iter.is_valid() {
                self.inner_iter.next();
            } else {
                // This `Option`-returning API cannot surface the error.
                // NOTE(review): assumes a failed seek leaves the inner iterator invalid, which
                // is handled just below — confirm.
                let _seek_result = self.inner_iter.seek_to_first();
            }
        } else {
            // Remember the current user key so all of its older versions are skipped.
            let current_user_key = self
                .inner_iter
                .current()
                .expect("a valid forward iterator is positioned on an inner entry")
                .0
                .get_user_key()
                .to_vec();
            self.cached_user_key = Some(current_user_key);
            self.inner_iter.next();
        }

        if !self.inner_iter.is_valid() {
            self.is_valid = false;
            self.cached_user_key = None;
            return None;
        }

        self.find_next_client_entry(true);
        self.current()
    }

    /// Step back to the previous client-visible entry and return it, or `None` at the start.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is not valid.
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        assert!(self.is_valid);
        if self.direction == DbIterationDirection::Forward {
            // The inner iterator is on the entry currently returned to the client. Scan
            // backwards until the user key changes, so the reverse scan starts before every
            // version of that key.
            self.cached_user_key =
                Some(self.inner_iter.current().unwrap().0.get_user_key().to_vec());
            loop {
                if let Some((current_key, _)) = self.inner_iter.prev() {
                    if current_key.get_user_key() < self.cached_user_key.as_ref().unwrap() {
                        break;
                    }
                } else {
                    // Ran off the front: there is no previous entry.
                    self.is_valid = false;
                    self.cached_user_key = None;
                    self.cached_value = None;
                    return None;
                }
            }
            self.direction = DbIterationDirection::Backward;
        }
        self.find_prev_client_entry();
        self.current()
    }

    /// Return the current client-visible entry, or `None` if the iterator is not valid.
    ///
    /// Forward: the entry is read from the inner iterator.
    /// Backward: it comes from the cached user key/value.
    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid {
            return None;
        }
        match self.direction {
            DbIterationDirection::Forward => {
                let (curr_key, curr_val) = self.inner_iter.current()?;
                Some((curr_key.get_user_key_as_vec(), curr_val))
            }
            DbIterationDirection::Backward => self
                .cached_user_key
                .as_ref()
                .zip(self.cached_value.as_ref()),
        }
    }
}