use crate::value::{ParsedInternalKey, SeqNo, UserValue, ValueType};
use crate::{UserKey, Value};
use crossbeam_skiplist::SkipMap;
use std::sync::atomic::AtomicU32;
#[derive(Default)]
pub struct MemTable {
#[doc(hidden)]
pub items: SkipMap<ParsedInternalKey, UserValue>,
pub(crate) approximate_size: AtomicU32,
}
impl MemTable {
pub fn prefix(&self, prefix: UserKey) -> impl DoubleEndedIterator<Item = Value> + '_ {
self.items
.range(ParsedInternalKey::new(prefix.clone(), SeqNo::MAX, ValueType::Tombstone)..)
.filter(move |entry| entry.key().user_key.starts_with(&prefix))
.map(|entry| Value::from((entry.key().clone(), entry.value().clone())))
}
pub fn get<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> Option<Value> {
let prefix = key.as_ref();
let range = ParsedInternalKey::new(prefix, SeqNo::MAX, ValueType::Tombstone)..;
for entry in self.items.range(range) {
let key = entry.key();
if &*key.user_key > prefix {
return None;
}
if let Some(seqno) = seqno {
if key.seqno < seqno {
return Some(Value::from((entry.key().clone(), entry.value().clone())));
}
} else {
return Some(Value::from((entry.key().clone(), entry.value().clone())));
}
}
None
}
pub fn size(&self) -> u32 {
self.approximate_size
.load(std::sync::atomic::Ordering::Acquire)
}
pub fn len(&self) -> usize {
self.items.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn insert(&self, item: Value) -> (u32, u32) {
#[allow(clippy::cast_possible_truncation)]
let item_size = item.size() as u32;
let size_before = self
.approximate_size
.fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
let key = ParsedInternalKey::new(item.key, item.seqno, item.value_type);
self.items.insert(key, item.value);
(item_size, size_before + item_size)
}
pub fn get_lsn(&self) -> Option<SeqNo> {
self.items
.iter()
.map(|x| {
let key = x.key();
key.seqno
})
.max()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::value::ValueType;
use test_log::test;
#[test]
#[allow(clippy::unwrap_used)]
fn memtable_mvcc_point_read() {
let memtable = MemTable::default();
memtable.insert(Value::new(
*b"hello-key-999991",
*b"hello-value-999991",
0,
ValueType::Value,
));
let item = memtable.get("hello-key-99999", None);
assert_eq!(None, item);
let item = memtable.get("hello-key-999991", None);
assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
memtable.insert(Value::new(
*b"hello-key-999991",
*b"hello-value-999991-2",
1,
ValueType::Value,
));
let item = memtable.get("hello-key-99999", None);
assert_eq!(None, item);
let item = memtable.get("hello-key-999991", None);
assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
let item = memtable.get("hello-key-99999", Some(1));
assert_eq!(None, item);
let item = memtable.get("hello-key-999991", Some(1));
assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
let item = memtable.get("hello-key-99999", Some(2));
assert_eq!(None, item);
let item = memtable.get("hello-key-999991", Some(2));
assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
}
#[test]
fn memtable_get() {
let memtable = MemTable::default();
let value = Value::new(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
memtable.insert(value.clone());
assert_eq!(Some(value), memtable.get("abc", None));
}
#[test]
fn memtable_get_highest_seqno() {
let memtable = MemTable::default();
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
1,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
2,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
3,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
4,
ValueType::Value,
));
assert_eq!(
Some(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
4,
ValueType::Value,
)),
memtable.get("abc", None)
);
}
#[test]
fn memtable_get_prefix() {
let memtable = MemTable::default();
memtable.insert(Value::new(
b"abc0".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
));
assert_eq!(
Some(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
)),
memtable.get("abc", None)
);
assert_eq!(
Some(Value::new(
b"abc0".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
)),
memtable.get("abc0", None)
);
}
#[test]
fn memtable_get_old_version() {
let memtable = MemTable::default();
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
99,
ValueType::Value,
));
memtable.insert(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
));
assert_eq!(
Some(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
)),
memtable.get("abc", None)
);
assert_eq!(
Some(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
99,
ValueType::Value,
)),
memtable.get("abc", Some(100))
);
assert_eq!(
Some(Value::new(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
)),
memtable.get("abc", Some(50))
);
}
}