use std::sync::Arc;
use nerdondon_hopscotch::concurrent_skiplist::{ConcurrentSkipList, SkipNode};
use crate::errors::{RainDBError, RainDBResult};
use crate::key::InternalKey;
use crate::RainDbIterator;
/// Common interface for the in-memory table that buffers key-value entries.
///
/// Implementors must be shareable across threads (`Send + Sync`); every method takes `&self`.
pub trait MemTable: Send + Sync {
    /// Returns an approximation of how much memory the table currently uses.
    fn approximate_memory_usage(&self) -> usize;

    /// Adds the `key`/`value` pair to the table.
    fn insert(&self, key: InternalKey, value: Vec<u8>);

    /// Looks up the value associated with `key`.
    ///
    /// The skip list implementation in this file returns `Ok(None)` when the matching entry is a
    /// deletion marker and `Err(RainDBError::KeyNotFound)` when no entry matches the user key.
    fn get(&self, key: &InternalKey) -> RainDBResult<Option<&Vec<u8>>>;

    /// Returns a boxed iterator over the entries of the table.
    fn iter(&self) -> Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>;

    /// Returns the number of entries held by the table.
    fn len(&self) -> usize;

    /// Returns `true` when the table holds no entries.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// A memtable that keeps its entries in a concurrent skip list keyed by [`InternalKey`].
pub(crate) struct SkipListMemTable {
/// The backing skip list. It sits behind an `Arc` so that iterators created from the memtable
/// can hold their own handle to the same list.
store: Arc<ConcurrentSkipList<InternalKey, Vec<u8>>>,
}
impl SkipListMemTable {
    /// Creates an empty memtable backed by a freshly constructed skip list.
    pub fn new() -> Self {
        // `None` is forwarded unchanged to the skip list constructor.
        let store = Arc::new(ConcurrentSkipList::new(None));

        Self { store }
    }
}
impl MemTable for SkipListMemTable {
fn approximate_memory_usage(&self) -> usize {
self.store.get_approx_mem_usage()
}
fn insert(&self, key: InternalKey, value: Vec<u8>) {
unsafe { self.store.insert_with_size(key, value) }
}
fn get(&self, key: &InternalKey) -> RainDBResult<Option<&Vec<u8>>> {
let mut iter = self.iter();
iter.seek(key).unwrap();
if iter.is_valid() {
let (current_key, _current_val) = iter.current().unwrap();
if current_key.get_user_key() == key.get_user_key() {
match current_key.get_operation() {
crate::Operation::Put => return Ok(self.store.get(current_key)),
crate::Operation::Delete => return Ok(None),
}
}
}
Err(RainDBError::KeyNotFound)
}
fn iter(&self) -> Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>> {
Box::new(SkipListMemTableIter {
store: Arc::clone(&self.store),
current_entry: self.store.first_node().map(|node| {
let (key, value) = node.get_entry();
(key.clone(), value.clone())
}),
})
}
fn len(&self) -> usize {
self.store.len()
}
}
/// Iterator over the entries of a skip list memtable.
///
/// The iterator stores an owned copy of the entry it is positioned at rather than a reference
/// into the skip list.
struct SkipListMemTableIter {
/// Shared handle to the skip list that is being iterated.
store: Arc<ConcurrentSkipList<InternalKey, Vec<u8>>>,
/// Copy of the entry at the current position; `None` means the iterator is not valid.
current_entry: Option<(InternalKey, Vec<u8>)>,
}
impl SkipListMemTableIter {
    /// Clones the key and the value out of `node` and returns them as an owned pair.
    fn owned_entry_from_node(node: &SkipNode<InternalKey, Vec<u8>>) -> (InternalKey, Vec<u8>) {
        let (node_key, node_value) = node.get_entry();
        let owned_key = node_key.clone();
        let owned_value = node_value.clone();

        (owned_key, owned_value)
    }
}
impl RainDbIterator for SkipListMemTableIter {
    type Key = InternalKey;
    type Error = RainDBError;

    /// Returns `true` while the iterator is positioned at an entry.
    fn is_valid(&self) -> bool {
        self.current_entry.is_some()
    }

    /// Positions the iterator at the first entry whose key is greater than or equal to `target`.
    ///
    /// The iterator becomes invalid if the skip list has no such entry. Always returns `Ok`.
    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        let landing_node = self.store.find_greater_or_equal_node(target);
        self.current_entry = landing_node.map(Self::owned_entry_from_node);

        Ok(())
    }

    /// Positions the iterator at the first node of the skip list, if there is one.
    ///
    /// Always returns `Ok`.
    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        let head = self.store.first_node();
        self.current_entry = head.map(Self::owned_entry_from_node);

        Ok(())
    }

    /// Positions the iterator at the last node of the skip list, if there is one.
    ///
    /// Always returns `Ok`.
    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        let tail = self.store.last_node();
        self.current_entry = tail.map(Self::owned_entry_from_node);

        Ok(())
    }

    /// Advances to the entry after the current one and returns it.
    ///
    /// Returns `None` if the iterator was already invalid or if there is no following entry, in
    /// which case the iterator is left invalid.
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        // Taking the entry leaves `None` behind, so an invalid iterator stays invalid.
        let (cursor_key, _) = self.current_entry.take()?;

        // Only a copy of the entry is kept, so the node has to be located again by key before
        // stepping to its successor.
        let successor = self
            .store
            .find_greater_or_equal_node(&cursor_key)
            .and_then(|node| node.next())
            .map(Self::owned_entry_from_node);
        self.current_entry = successor;

        self.current()
    }

    /// Moves to the entry before the current one and returns it.
    ///
    /// Returns `None` if the iterator was already invalid or if there is no preceding entry, in
    /// which case the iterator is left invalid.
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        let (cursor_key, _) = self.current_entry.take()?;

        let predecessor = self.store.find_less_than_node(&cursor_key);
        self.current_entry = predecessor.map(Self::owned_entry_from_node);

        self.current()
    }

    /// Returns references to the key and value at the current position, or `None` when the
    /// iterator is not valid.
    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        let (key, value) = self.current_entry.as_ref()?;

        Some((key, value))
    }
}
#[cfg(test)]
mod skiplist_memtable_tests {
    use integer_encoding::FixedInt;
    use pretty_assertions::assert_eq;

    use crate::Operation;

    use super::*;

    /// Returns the user key used for `num`: the bytes of its decimal string form.
    fn user_key(num: u64) -> Vec<u8> {
        num.to_string().into_bytes()
    }

    /// Builds a memtable holding 29 `Put` entries.
    ///
    /// Entry `idx` (1 through 29) has the user key `100_000 + idx`, the sequence number `idx`
    /// and the fixed-width encoding of `100_000 + idx` as its value.
    fn populated_memtable() -> SkipListMemTable {
        let memtable = SkipListMemTable::new();
        for idx in 1..30_u64 {
            let num = idx + 100_000;
            let key = InternalKey::new(user_key(num), idx, Operation::Put);
            memtable.insert(key, u64::encode_fixed_vec(num));
        }

        memtable
    }

    #[test]
    fn can_insert_and_retrieve_elements() {
        let memtable = populated_memtable();
        assert_eq!(memtable.len(), 29);

        for idx in 1..30_u64 {
            let num = idx + 100_000;
            let key = InternalKey::new(user_key(num), idx, Operation::Put);
            let actual = memtable.get(&key).unwrap().unwrap();
            assert_eq!(actual, &u64::encode_fixed_vec(num));
        }

        // A newer write for an existing user key must shadow the older value.
        memtable.insert(
            InternalKey::new(user_key(100_020), 31, Operation::Put),
            "something new".as_bytes().to_vec(),
        );
        let actual = memtable
            .get(&InternalKey::new_for_seeking(user_key(100_020), 100))
            .unwrap()
            .unwrap();
        assert_eq!(actual, &"something new".as_bytes().to_vec());

        // The tombstone gets a newer sequence number than the preceding `Put` so that it is
        // unambiguously the latest write for the key, independent of how entries with equal
        // sequence numbers are ordered.
        memtable.insert(
            InternalKey::new(user_key(100_020), 32, Operation::Delete),
            vec![],
        );
        let actual = memtable
            .get(&InternalKey::new_for_seeking(user_key(100_020), 100))
            .unwrap();
        assert!(actual.is_none());

        // A user key that was never inserted is reported as an error.
        let actual = memtable
            .get(&InternalKey::new_for_seeking(user_key(200_000), 100))
            .err()
            .unwrap();
        assert_eq!(
            actual.to_string(),
            "The specified key could not be found in the database."
        );
    }

    #[test]
    fn can_iterate_all_elements_forward() {
        let memtable = populated_memtable();

        let mut iter = memtable.iter();
        while iter.is_valid() {
            iter.next();
        }

        assert!(
            iter.next().is_none(),
            "Calling `next` after consuming all the values should not return a value"
        );
    }

    #[test]
    fn can_iterate_all_elements_backward() {
        let memtable = populated_memtable();

        let mut iter = memtable.iter();
        assert!(iter.seek_to_last().is_ok());

        let (actual_key, actual_val) = iter.current().unwrap();
        assert_eq!(
            actual_key,
            &InternalKey::new(user_key(100_029), 29, Operation::Put)
        );
        assert_eq!(actual_val, &u64::encode_fixed_vec(100_029));

        while iter.is_valid() {
            iter.prev();
        }

        assert!(
            iter.prev().is_none(),
            "Calling `prev` after consuming all the values should not return a value"
        );
    }
}