use crate::error::{Error, Result};
use crate::storage::{Entry, Key, Value, ValuePointer};
use crossbeam::epoch::{self, Atomic, Owned, Shared};
use parking_lot::RwLock;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use tracing::{debug, trace};
pub trait MemtableImpl: Send + Sync {
fn insert(&mut self, entry: Entry) -> Result<()>;
fn get(&self, key: &Key) -> Result<Option<Entry>>;
fn delete(&mut self, key: &Key, sequence: u64) -> Result<()>;
fn iter(&self) -> Box<dyn Iterator<Item = Entry> + '_>;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn memory_usage(&self) -> usize;
fn clear(&mut self);
}
#[derive(Debug, Clone)]
struct SkipListNode {
entry: Entry,
next: Vec<Atomic<SkipListNode>>,
level: usize,
}
impl SkipListNode {
fn new(entry: Entry, level: usize) -> Self {
let mut next = Vec::with_capacity(level + 1);
for _ in 0..=level {
next.push(Atomic::null());
}
Self {
entry,
next,
level,
}
}
fn next_at(&self, level: usize) -> &Atomic<SkipListNode> {
&self.next[level]
}
}
pub struct SkipListMemtable {
head: Atomic<SkipListNode>,
max_level: usize,
entry_count: AtomicU64,
memory_usage: AtomicU64,
}
impl SkipListMemtable {
pub fn new() -> Self {
let head_entry = Entry::new(
Key::new(vec![]), Value::new(vec![]),
0,
);
let head = Owned::new(SkipListNode::new(head_entry, 32));
Self {
head: Atomic::from(head),
max_level: 32,
entry_count: AtomicU64::new(0),
memory_usage: AtomicU64::new(0),
}
}
fn random_level(&self) -> usize {
let mut level = 0;
let mut rng = fastrand::Rng::new();
while level < self.max_level && rng.u64(..100) < 25 {
level += 1;
}
level
}
fn find_node(&self, key: &Key) -> (Vec<Shared<SkipListNode>>, Vec<Shared<SkipListNode>>) {
let mut preds = Vec::with_capacity(self.max_level + 1);
let mut currs = Vec::with_capacity(self.max_level + 1);
for _ in 0..=self.max_level {
preds.push(Shared::null());
currs.push(Shared::null());
}
let guard = epoch::pin();
let mut pred = self.head.load(AtomicOrdering::Acquire, &guard);
for level in (0..=self.max_level).rev() {
let mut curr = pred;
while let Some(curr_ref) = curr.as_ref() {
let next = curr_ref.next_at(level).load(AtomicOrdering::Acquire, &guard);
if let Some(next_ref) = next.as_ref() {
match next_ref.entry.key.cmp(key) {
Ordering::Less => {
pred = next;
curr = next;
}
Ordering::Equal => {
preds[level] = pred;
currs[level] = curr;
break;
}
Ordering::Greater => {
break;
}
}
} else {
break;
}
}
preds[level] = pred;
currs[level] = curr;
}
(preds, currs)
}
}
impl MemtableImpl for SkipListMemtable {
fn insert(&mut self, entry: Entry) -> Result<()> {
let level = self.random_level();
let new_node = Owned::new(SkipListNode::new(entry.clone(), level));
let guard = epoch::pin();
let (preds, currs) = self.find_node(&entry.key);
if let Some(curr) = currs[0].as_ref() {
if curr.entry.key == entry.key {
return Ok(());
}
}
for i in 0..=level {
if let Some(pred) = preds[i].as_ref() {
new_node.next[i].store(
pred.next_at(i).load(AtomicOrdering::Acquire, &guard),
AtomicOrdering::Release,
);
pred.next_at(i).store(new_node.clone(), AtomicOrdering::Release);
}
}
self.entry_count.fetch_add(1, AtomicOrdering::Relaxed);
self.memory_usage.fetch_add(
entry.key.len() as u64 + entry.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
Ok(())
}
fn get(&self, key: &Key) -> Result<Option<Entry>> {
let guard = epoch::pin();
let (_, currs) = self.find_node(key);
if let Some(curr) = currs[0].as_ref() {
if curr.entry.key == *key {
return Ok(Some(curr.entry.clone()));
}
}
Ok(None)
}
fn delete(&mut self, key: &Key, sequence: u64) -> Result<()> {
let guard = epoch::pin();
let (preds, currs) = self.find_node(key);
if let Some(curr) = currs[0].as_ref() {
if curr.entry.key == *key {
let delete_entry = Entry::delete(key.clone(), sequence);
let _ = self.insert(delete_entry);
return Ok(());
}
}
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Entry> + '_> {
Box::new(std::iter::empty())
}
fn len(&self) -> usize {
self.entry_count.load(AtomicOrdering::Relaxed) as usize
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn memory_usage(&self) -> usize {
self.memory_usage.load(AtomicOrdering::Relaxed) as usize
}
fn clear(&mut self) {
self.entry_count.store(0, AtomicOrdering::Relaxed);
self.memory_usage.store(0, AtomicOrdering::Relaxed);
}
}
pub struct BTreeMemtable {
map: RwLock<BTreeMap<Key, Entry>>,
memory_usage: AtomicU64,
}
impl BTreeMemtable {
pub fn new() -> Self {
Self {
map: RwLock::new(BTreeMap::new()),
memory_usage: AtomicU64::new(0),
}
}
}
impl MemtableImpl for BTreeMemtable {
fn insert(&mut self, entry: Entry) -> Result<()> {
let mut map = self.map.write();
let old_entry = map.insert(entry.key.clone(), entry.clone());
if let Some(old) = old_entry {
self.memory_usage.fetch_sub(
old.key.len() as u64 + old.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
}
self.memory_usage.fetch_add(
entry.key.len() as u64 + entry.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
Ok(())
}
fn get(&self, key: &Key) -> Result<Option<Entry>> {
let map = self.map.read();
Ok(map.get(key).cloned())
}
fn delete(&mut self, key: &Key, sequence: u64) -> Result<()> {
let mut map = self.map.write();
if let Some(old_entry) = map.remove(key) {
self.memory_usage.fetch_sub(
old_entry.key.len() as u64 + old_entry.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
let delete_entry = Entry::delete(key.clone(), sequence);
let _ = map.insert(key.clone(), delete_entry);
}
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Entry> + '_> {
let map = self.map.read();
Box::new(map.values().cloned().collect::<Vec<_>>().into_iter())
}
fn len(&self) -> usize {
self.map.read().len()
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn memory_usage(&self) -> usize {
self.memory_usage.load(AtomicOrdering::Relaxed) as usize
}
fn clear(&mut self) {
let mut map = self.map.write();
map.clear();
self.memory_usage.store(0, AtomicOrdering::Relaxed);
}
}
pub struct ArtMemtable {
map: RwLock<BTreeMap<Key, Entry>>,
memory_usage: AtomicU64,
}
impl ArtMemtable {
pub fn new() -> Self {
Self {
map: RwLock::new(BTreeMap::new()),
memory_usage: AtomicU64::new(0),
}
}
}
impl MemtableImpl for ArtMemtable {
fn insert(&mut self, entry: Entry) -> Result<()> {
let mut map = self.map.write();
let old_entry = map.insert(entry.key.clone(), entry.clone());
if let Some(old) = old_entry {
self.memory_usage.fetch_sub(
old.key.len() as u64 + old.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
}
self.memory_usage.fetch_add(
entry.key.len() as u64 + entry.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
Ok(())
}
fn get(&self, key: &Key) -> Result<Option<Entry>> {
let map = self.map.read();
Ok(map.get(key).cloned())
}
fn delete(&mut self, key: &Key, sequence: u64) -> Result<()> {
let mut map = self.map.write();
if let Some(old_entry) = map.remove(key) {
self.memory_usage.fetch_sub(
old_entry.key.len() as u64 + old_entry.value.as_ref().map_or(0, |v| v.len() as u64),
AtomicOrdering::Relaxed,
);
let delete_entry = Entry::delete(key.clone(), sequence);
let _ = map.insert(key.clone(), delete_entry);
}
Ok(())
}
fn iter(&self) -> Box<dyn Iterator<Item = Entry> + '_> {
let map = self.map.read();
Box::new(map.values().cloned().collect::<Vec<_>>().into_iter())
}
fn len(&self) -> usize {
self.map.read().len()
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn memory_usage(&self) -> usize {
self.memory_usage.load(AtomicOrdering::Relaxed) as usize
}
fn clear(&mut self) {
let mut map = self.map.write();
map.clear();
self.memory_usage.store(0, AtomicOrdering::Relaxed);
}
}
pub struct Memtable {
implementation: Box<dyn MemtableImpl>,
max_size: usize,
flush_threshold: f64,
}
impl Memtable {
pub fn new(implementation: Box<dyn MemtableImpl>, max_size: usize, flush_threshold: f64) -> Self {
Self {
implementation,
max_size,
flush_threshold,
}
}
pub fn insert(&mut self, entry: Entry) -> Result<()> {
self.implementation.insert(entry)
}
pub fn get(&self, key: &Key) -> Result<Option<Entry>> {
self.implementation.get(key)
}
pub fn delete(&mut self, key: &Key, sequence: u64) -> Result<()> {
self.implementation.delete(key, sequence)
}
pub fn iter(&self) -> Box<dyn Iterator<Item = Entry> + '_> {
self.implementation.iter()
}
pub fn len(&self) -> usize {
self.implementation.len()
}
pub fn is_empty(&self) -> bool {
self.implementation.is_empty()
}
pub fn memory_usage(&self) -> usize {
self.implementation.memory_usage()
}
pub fn should_flush(&self) -> bool {
self.memory_usage() >= (self.max_size as f64 * self.flush_threshold) as usize
}
pub fn is_full(&self) -> bool {
self.memory_usage() >= self.max_size
}
pub fn clear(&mut self) {
self.implementation.clear();
}
pub fn take_entries(&mut self) -> Vec<Entry> {
let entries: Vec<Entry> = self.iter().collect();
self.clear();
entries
}
}
pub fn create_memtable(
implementation: crate::config::MemtableImpl,
max_size: usize,
flush_threshold: f64,
) -> Memtable {
let impl_box: Box<dyn MemtableImpl> = match implementation {
crate::config::MemtableImpl::SkipList => Box::new(SkipListMemtable::new()),
crate::config::MemtableImpl::Art => Box::new(ArtMemtable::new()),
crate::config::MemtableImpl::BTree => Box::new(BTreeMemtable::new()),
};
Memtable::new(impl_box, max_size, flush_threshold)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::{Entry, Key, Value, OpType};
#[test]
fn test_btree_memtable_insert_get() {
let mut memtable = BTreeMemtable::new();
let key = Key::new(b"test_key".to_vec());
let value = Value::new(b"test_value".to_vec());
let entry = Entry::new(key.clone(), value, 1);
memtable.insert(entry.clone()).unwrap();
let retrieved = memtable.get(&key).unwrap().unwrap();
assert_eq!(retrieved.key, entry.key);
assert_eq!(retrieved.value, entry.value);
}
#[test]
fn test_btree_memtable_delete() {
let mut memtable = BTreeMemtable::new();
let key = Key::new(b"test_key".to_vec());
let value = Value::new(b"test_value".to_vec());
let entry = Entry::new(key.clone(), value, 1);
memtable.insert(entry).unwrap();
assert!(memtable.get(&key).unwrap().is_some());
memtable.delete(&key, 2).unwrap();
let retrieved = memtable.get(&key).unwrap().unwrap();
assert!(retrieved.is_delete());
}
#[test]
fn test_memtable_factory() {
let memtable = create_memtable(
crate::config::MemtableImpl::BTree,
1024,
0.8,
);
assert!(memtable.is_empty());
assert_eq!(memtable.max_size, 1024);
assert_eq!(memtable.flush_threshold, 0.8);
}
}