pub mod skiplist;
pub mod skiplist_impl;
pub use skiplist::SkipListMemTable;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::types::{InternalKey, SequenceNumber, ValueType};
/// An in-memory range-deletion tombstone covering the user-key range
/// `[begin, end)` (start inclusive, end exclusive — see
/// `MemTable::max_covering_tombstone_seq`).
pub struct MemRangeTombstone {
/// Inclusive start of the deleted user-key range.
pub begin: Vec<u8>,
/// Exclusive end of the deleted user-key range.
pub end: Vec<u8>,
/// Sequence number at which the range deletion was written.
pub seq: SequenceNumber,
}
/// In-memory write buffer backed by a concurrent skiplist, keyed by encoded
/// internal keys. Tracks an approximate byte size and keeps range-deletion
/// tombstones in a side list for covering-sequence queries.
pub struct MemTable {
/// Sorted store mapping encoded internal keys to values.
inner: SkipListMemTable,
/// Running estimate (bytes) of memory consumed by inserted entries.
approximate_size: AtomicUsize,
/// Fast-path flag: set once any range deletion has been inserted.
has_range_deletions: AtomicBool,
/// Side list of range tombstones, scanned by `max_covering_tombstone_seq`.
range_tombstones: parking_lot::Mutex<Vec<MemRangeTombstone>>,
}
impl MemTable {
    /// Fixed per-entry overhead charged against `approximate_size`, roughly
    /// accounting for skiplist node allocation and bookkeeping.
    const ENTRY_OVERHEAD: usize = 160;

    /// Creates an empty memtable with zero approximate size and no tombstones.
    pub fn new() -> Self {
        Self {
            inner: SkipListMemTable::new(),
            approximate_size: AtomicUsize::new(0),
            has_range_deletions: AtomicBool::new(false),
            range_tombstones: parking_lot::Mutex::new(Vec::new()),
        }
    }

    /// Inserts an entry tagged with `sequence` and `value_type`.
    ///
    /// * `ValueType::Value` — stores `value` under `key`.
    /// * `ValueType::Deletion` — stores an empty value as a point tombstone.
    /// * `ValueType::RangeDeletion` — `key` is the inclusive range start and
    ///   `value` the exclusive range end; the tombstone is recorded both in
    ///   the skiplist and in the side list of range tombstones.
    pub fn put(&self, key: &[u8], value: &[u8], sequence: SequenceNumber, value_type: ValueType) {
        let ikey = InternalKey::new(key, sequence, value_type);
        let val = match value_type {
            ValueType::Value => value.to_vec(),
            ValueType::Deletion => Vec::new(),
            ValueType::RangeDeletion => {
                // Record the tombstone before the skiplist insert so any reader
                // that observes the entry also observes the tombstone metadata
                // (the flag is published with Release, read with Acquire).
                self.range_tombstones.lock().push(MemRangeTombstone {
                    begin: key.to_vec(),
                    end: value.to_vec(),
                    seq: sequence,
                });
                self.has_range_deletions.store(true, Ordering::Release);
                value.to_vec()
            }
        };
        // Charge encoded key + value plus fixed node overhead; range deletions
        // also pay for their side-list copy (both bounds plus struct header).
        let mut entry_size = ikey.encoded_len() + val.len() + Self::ENTRY_OVERHEAD;
        if value_type == ValueType::RangeDeletion {
            entry_size += key.len() + value.len() + std::mem::size_of::<MemRangeTombstone>();
        }
        self.inner.insert(ikey.into_bytes(), val);
        self.approximate_size
            .fetch_add(entry_size, Ordering::Relaxed);
    }

    /// Looks up `key` at snapshot `sequence`.
    ///
    /// Returns `None` if no visible entry exists, `Some(None)` if the newest
    /// visible entry is a deletion, and `Some(Some(v))` for a live value.
    pub fn get(&self, key: &[u8], sequence: SequenceNumber) -> Option<Option<Vec<u8>>> {
        self.get_with_seq(key, sequence).map(|(result, _)| result)
    }

    /// Like [`Self::get`], but also returns the sequence number of the entry
    /// that answered the lookup.
    pub fn get_with_seq(
        &self,
        key: &[u8],
        sequence: SequenceNumber,
    ) -> Option<(Option<Vec<u8>>, SequenceNumber)> {
        // The search key uses ValueType::Value; the skiplist's internal-key
        // ordering makes this the upper bound for the requested snapshot.
        let search_key = InternalKey::new(key, sequence, ValueType::Value);
        self.inner.get_with_seq(search_key.as_bytes(), key)
    }

    /// Current estimate of the memory consumed by this memtable, in bytes.
    pub fn approximate_size(&self) -> usize {
        self.approximate_size.load(Ordering::Relaxed)
    }

    /// Iterates over (encoded internal key, value) pairs in ascending order.
    pub fn iter(&self) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> {
        self.inner.iter()
    }

    /// Iterates over (encoded internal key, value) pairs in descending order.
    pub fn iter_rev(&self) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> {
        self.inner.iter_rev()
    }

    /// Raw pointer to the underlying skiplist, for zero-copy iteration by
    /// internal consumers. The pointer is valid only while `self` is alive —
    /// callers must not outlive this memtable.
    pub fn skiplist_ref(
        &self,
    ) -> *const skiplist_impl::ConcurrentSkipList<skiplist::OrdInternalKey, Vec<u8>> {
        self.inner.skiplist_ptr()
    }

    /// True once any range deletion has been inserted (cheap flag check that
    /// lets readers skip the tombstone lock entirely).
    pub fn has_range_deletions(&self) -> bool {
        self.has_range_deletions.load(Ordering::Acquire)
    }

    /// Returns the highest tombstone sequence number that covers `user_key`
    /// and is visible at `read_seq`, or 0 if no tombstone covers it.
    /// Coverage is `begin <= user_key < end`.
    pub fn max_covering_tombstone_seq(
        &self,
        user_key: &[u8],
        read_seq: SequenceNumber,
    ) -> SequenceNumber {
        if !self.has_range_deletions() {
            return 0;
        }
        let tombstones = self.range_tombstones.lock();
        tombstones
            .iter()
            .filter(|rt| rt.seq <= read_seq)
            .filter(|rt| user_key >= rt.begin.as_slice() && user_key < rt.end.as_slice())
            .map(|rt| rt.seq)
            .max()
            .unwrap_or(0)
    }

    /// Snapshots all range tombstones as `(begin, end, seq)` triples.
    /// Returns an empty vector without locking when none exist.
    pub fn get_range_tombstones(&self) -> Vec<(Vec<u8>, Vec<u8>, SequenceNumber)> {
        if !self.has_range_deletions() {
            return Vec::new();
        }
        let tombstones = self.range_tombstones.lock();
        tombstones
            .iter()
            .map(|rt| (rt.begin.clone(), rt.end.clone(), rt.seq))
            .collect()
    }

    /// True if nothing has been inserted. Relies on every `put` adding at
    /// least `ENTRY_OVERHEAD` bytes, so a nonzero size implies entries exist.
    pub fn is_empty(&self) -> bool {
        self.approximate_size.load(Ordering::Relaxed) == 0
    }
}
impl Default for MemTable {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memtable_put_get() {
        let mt = MemTable::new();
        mt.put(b"key1", b"value1", 1, ValueType::Value);
        mt.put(b"key2", b"value2", 2, ValueType::Value);
        assert_eq!(mt.get(b"key1", 1), Some(Some(b"value1".to_vec())));
        assert_eq!(mt.get(b"key2", 2), Some(Some(b"value2".to_vec())));
        // Reads below the write's sequence must not see the entry.
        assert_eq!(mt.get(b"key1", 0), None);
    }

    #[test]
    fn test_memtable_delete() {
        let mt = MemTable::new();
        mt.put(b"key1", b"value1", 1, ValueType::Value);
        mt.put(b"key1", b"", 2, ValueType::Deletion);
        // At seq 2 the deletion wins; at seq 1 the original value is visible.
        assert_eq!(mt.get(b"key1", 2), Some(None));
        assert_eq!(mt.get(b"key1", 1), Some(Some(b"value1".to_vec())));
    }

    #[test]
    fn test_memtable_overwrite() {
        let mt = MemTable::new();
        mt.put(b"key1", b"v1", 1, ValueType::Value);
        mt.put(b"key1", b"v2", 2, ValueType::Value);
        assert_eq!(mt.get(b"key1", 2), Some(Some(b"v2".to_vec())));
        assert_eq!(mt.get(b"key1", 1), Some(Some(b"v1".to_vec())));
        // A read far above all writes sees the newest version.
        assert_eq!(mt.get(b"key1", 10), Some(Some(b"v2".to_vec())));
    }

    #[test]
    fn test_memtable_iterator() {
        let mt = MemTable::new();
        mt.put(b"b", b"2", 1, ValueType::Value);
        mt.put(b"a", b"1", 2, ValueType::Value);
        mt.put(b"c", b"3", 3, ValueType::Value);
        let entries: Vec<_> = mt.iter().collect();
        assert_eq!(entries.len(), 3);
        // Iteration yields entries sorted by user key regardless of
        // insertion order.
        let user_keys: Vec<Vec<u8>> = entries
            .iter()
            .map(|(k, _)| {
                let ik = InternalKey::from_encoded(k.clone());
                ik.user_key().to_vec()
            })
            .collect();
        assert_eq!(user_keys[0], b"a");
        assert_eq!(user_keys[1], b"b");
        assert_eq!(user_keys[2], b"c");
    }

    #[test]
    fn test_memtable_approximate_size() {
        let mt = MemTable::new();
        assert_eq!(mt.approximate_size(), 0);
        mt.put(b"key", b"value", 1, ValueType::Value);
        assert!(mt.approximate_size() > 0);
    }

    #[test]
    fn test_memtable_single_writer_concurrent_readers() {
        use std::sync::Arc;
        use std::thread;
        let mt = Arc::new(MemTable::new());
        let num_entries = 800;
        for i in 0..num_entries {
            let t = i / 100;
            let j = i % 100;
            let key = format!("t{}_k{:04}", t, j);
            let val = format!("t{}_v{}", t, j);
            let seq = (i + 1) as u64;
            mt.put(key.as_bytes(), val.as_bytes(), seq, ValueType::Value);
        }
        // Eight reader threads re-check every entry at its write sequence.
        let mut handles = Vec::new();
        for _ in 0..8 {
            let mt = Arc::clone(&mt);
            handles.push(thread::spawn(move || {
                for i in 0..num_entries {
                    let t = i / 100;
                    let j = i % 100;
                    let key = format!("t{}_k{:04}", t, j);
                    let val = format!("t{}_v{}", t, j);
                    let seq = (i + 1) as u64;
                    let result = mt.get(key.as_bytes(), seq);
                    assert_eq!(
                        result,
                        Some(Some(val.into_bytes())),
                        "missing key {} at seq {}",
                        key,
                        seq
                    );
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let total: usize = mt.iter().count();
        assert_eq!(total, num_entries);
    }

    #[test]
    fn test_memtable_prefix_similar_keys() {
        let mt = MemTable::new();
        mt.put(b"a", b"val_a", 1, ValueType::Value);
        mt.put(b"ab", b"val_ab", 2, ValueType::Value);
        mt.put(b"abc", b"val_abc", 3, ValueType::Value);
        // Prefix-similar keys must not shadow each other.
        assert_eq!(mt.get(b"a", 10), Some(Some(b"val_a".to_vec())));
        assert_eq!(mt.get(b"ab", 10), Some(Some(b"val_ab".to_vec())));
        assert_eq!(mt.get(b"abc", 10), Some(Some(b"val_abc".to_vec())));
        assert_eq!(mt.get(b"abcd", 10), None);
        assert_eq!(mt.get(b"b", 10), None);
        // Snapshot visibility still applies per key.
        assert_eq!(mt.get(b"abc", 2), None);
        assert_eq!(mt.get(b"abc", 3), Some(Some(b"val_abc".to_vec())));
        let entries: Vec<_> = mt.iter().collect();
        assert_eq!(entries.len(), 3);
        let user_keys: Vec<Vec<u8>> = entries
            .iter()
            .map(|(k, _)| InternalKey::from_encoded(k.clone()).user_key().to_vec())
            .collect();
        assert_eq!(user_keys[0], b"a");
        assert_eq!(user_keys[1], b"ab");
        assert_eq!(user_keys[2], b"abc");
    }

    #[test]
    fn test_memtable_is_empty_states() {
        let mt = MemTable::new();
        assert!(mt.is_empty());
        assert_eq!(mt.approximate_size(), 0);
        mt.put(b"k", b"v", 1, ValueType::Value);
        assert!(!mt.is_empty());
        assert!(mt.approximate_size() > 0);
        // A point deletion still occupies space, so the table is non-empty.
        let mt2 = MemTable::new();
        assert!(mt2.is_empty());
        mt2.put(b"k", b"", 1, ValueType::Deletion);
        assert!(!mt2.is_empty());
    }
}