use crate::key::InternalKey;
use crate::{
value::{InternalValue, SeqNo, UserValue},
ValueType,
};
use crossbeam_skiplist::SkipMap;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicBool, AtomicU64};
pub use crate::tree::inner::MemtableId;
pub struct Memtable {
#[doc(hidden)]
pub id: MemtableId,
#[doc(hidden)]
pub items: SkipMap<InternalKey, UserValue>,
pub(crate) approximate_size: AtomicU64,
pub(crate) highest_seqno: AtomicU64,
pub(crate) requested_rotation: AtomicBool,
}
impl Memtable {
pub fn id(&self) -> MemtableId {
self.id
}
pub fn is_flagged_for_rotation(&self) -> bool {
self.requested_rotation
.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn flag_rotated(&self) {
self.requested_rotation
.store(true, std::sync::atomic::Ordering::Relaxed);
}
#[doc(hidden)]
#[must_use]
pub fn new(id: MemtableId) -> Self {
Self {
id,
items: SkipMap::default(),
approximate_size: AtomicU64::default(),
highest_seqno: AtomicU64::default(),
requested_rotation: AtomicBool::default(),
}
}
pub fn iter(&self) -> impl DoubleEndedIterator<Item = InternalValue> + '_ {
self.items.iter().map(|entry| InternalValue {
key: entry.key().clone(),
value: entry.value().clone(),
})
}
pub(crate) fn range<'a, R: RangeBounds<InternalKey> + 'a>(
&'a self,
range: R,
) -> impl DoubleEndedIterator<Item = InternalValue> + 'a {
self.items.range(range).map(|entry| InternalValue {
key: entry.key().clone(),
value: entry.value().clone(),
})
}
#[doc(hidden)]
pub fn get(&self, key: &[u8], seqno: SeqNo) -> Option<InternalValue> {
if seqno == 0 {
return None;
}
let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);
let mut iter = self
.items
.range(lower_bound..)
.take_while(|entry| &*entry.key().user_key == key);
iter.next().map(|entry| InternalValue {
key: entry.key().clone(),
value: entry.value().clone(),
})
}
pub fn size(&self) -> u64 {
self.approximate_size
.load(std::sync::atomic::Ordering::Acquire)
}
pub fn len(&self) -> usize {
self.items.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
#[doc(hidden)]
pub fn insert(&self, item: InternalValue) -> (u64, u64) {
#[expect(
clippy::expect_used,
reason = "keys are limited to 16-bit length + values are limited to 32-bit length"
)]
let item_size =
(item.key.user_key.len() + item.value.len() + std::mem::size_of::<InternalValue>())
.try_into()
.expect("should fit into u64");
let size_before = self
.approximate_size
.fetch_add(item_size, std::sync::atomic::Ordering::AcqRel);
let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
self.items.insert(key, item.value);
self.highest_seqno
.fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
(item_size, size_before + item_size)
}
pub fn get_highest_seqno(&self) -> Option<SeqNo> {
if self.is_empty() {
None
} else {
Some(
self.highest_seqno
.load(std::sync::atomic::Ordering::Acquire),
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ValueType;
use test_log::test;
#[test]
#[expect(clippy::unwrap_used)]
fn memtable_mvcc_point_read() {
let memtable = Memtable::new(0);
memtable.insert(InternalValue::from_components(
*b"hello-key-999991",
*b"hello-value-999991",
0,
ValueType::Value,
));
let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
assert_eq!(None, item);
let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
assert_eq!(*b"hello-value-999991", &*item.unwrap().value);
memtable.insert(InternalValue::from_components(
*b"hello-key-999991",
*b"hello-value-999991-2",
1,
ValueType::Value,
));
let item = memtable.get(b"hello-key-99999", SeqNo::MAX);
assert_eq!(None, item);
let item = memtable.get(b"hello-key-999991", SeqNo::MAX);
assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
let item = memtable.get(b"hello-key-99999", 1);
assert_eq!(None, item);
let item = memtable.get(b"hello-key-999991", 1);
assert_eq!((*b"hello-value-999991"), &*item.unwrap().value);
let item = memtable.get(b"hello-key-99999", 2);
assert_eq!(None, item);
let item = memtable.get(b"hello-key-999991", 2);
assert_eq!((*b"hello-value-999991-2"), &*item.unwrap().value);
}
#[test]
fn memtable_get() {
let memtable = Memtable::new(0);
let value =
InternalValue::from_components(b"abc".to_vec(), b"abc".to_vec(), 0, ValueType::Value);
memtable.insert(value.clone());
assert_eq!(Some(value), memtable.get(b"abc", SeqNo::MAX));
}
#[test]
fn memtable_get_highest_seqno() {
let memtable = Memtable::new(0);
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
1,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
2,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
3,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
4,
ValueType::Value,
));
assert_eq!(
Some(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
4,
ValueType::Value,
)),
memtable.get(b"abc", SeqNo::MAX)
);
}
#[test]
fn memtable_get_prefix() {
let memtable = Memtable::new(0);
memtable.insert(InternalValue::from_components(
b"abc0".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
));
assert_eq!(
Some(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
)),
memtable.get(b"abc", SeqNo::MAX)
);
assert_eq!(
Some(InternalValue::from_components(
b"abc0".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
)),
memtable.get(b"abc0", SeqNo::MAX)
);
}
#[test]
fn memtable_get_old_version() {
let memtable = Memtable::new(0);
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
99,
ValueType::Value,
));
memtable.insert(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
));
assert_eq!(
Some(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
255,
ValueType::Value,
)),
memtable.get(b"abc", SeqNo::MAX)
);
assert_eq!(
Some(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
99,
ValueType::Value,
)),
memtable.get(b"abc", 100)
);
assert_eq!(
Some(InternalValue::from_components(
b"abc".to_vec(),
b"abc".to_vec(),
0,
ValueType::Value,
)),
memtable.get(b"abc", 50)
);
}
}