use super::skiplist::SkipList;
/// Sentinel byte string stored as the value of a deleted key.
///
/// `Memtable::delete` inserts these bytes in place of a real value, and
/// `Memtable::get` / `Memtable::scan` hide every entry whose value equals
/// them. A caller-supplied value made of exactly these bytes is therefore
/// indistinguishable from a deletion marker.
const TOMBSTONE: &[u8] = b"\x00__TOMBSTONE__\x00";
/// Size limits that control when a memtable reports itself as full or as
/// ready to be flushed.
///
/// Derives `PartialEq` so configurations can be compared directly (for
/// example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct MemtableConfig {
    /// Size budget in bytes; the memtable is "full" once its tracked size
    /// reaches this value.
    pub max_bytes: usize,
    /// Fraction of `max_bytes` at which the memtable starts asking to be
    /// flushed.
    pub flush_threshold: f64,
}
impl Default for MemtableConfig {
fn default() -> Self {
Self {
max_bytes: 64 * 1024 * 1024, flush_threshold: 0.75,
}
}
}
/// In-memory write buffer mapping byte-string keys to byte-string values.
///
/// Deletions are recorded as tombstone entries rather than removed, and the
/// counters below are kept in step with every insert so size and occupancy
/// can be queried without walking the data.
pub struct Memtable {
    /// Key → value storage; a deleted key holds the `TOMBSTONE` sentinel.
    data: SkipList<Vec<u8>, Vec<u8>>,
    /// Running estimate of the bytes held (keys + values + per-entry charge).
    size_bytes: usize,
    /// Number of distinct keys stored, tombstones included.
    entry_count: usize,
    /// Number of stored keys whose current value is a tombstone.
    tombstone_count: usize,
    /// Limits consulted by the flush / full checks.
    config: MemtableConfig,
}
impl Memtable {
    /// Flat per-entry charge added on top of `key.len() + value.len()` when
    /// estimating how many bytes an entry occupies.
    const ENTRY_OVERHEAD: usize = 16;

    /// Creates an empty memtable using `MemtableConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(MemtableConfig::default())
    }

    /// Creates an empty memtable governed by `config`.
    pub fn with_config(config: MemtableConfig) -> Self {
        Self {
            data: SkipList::new(),
            size_bytes: 0,
            entry_count: 0,
            tombstone_count: 0,
            config,
        }
    }

    /// Estimated footprint of a single entry with the given key/value lengths.
    fn entry_footprint(key_len: usize, value_len: usize) -> usize {
        key_len + value_len + Self::ENTRY_OVERHEAD
    }

    /// Stores `value` under `key` and updates every counter.
    ///
    /// This is the single place where `size_bytes`, `entry_count` and
    /// `tombstone_count` change, so `put` and `delete` cannot drift apart.
    /// Whether an entry counts as a tombstone is decided purely by comparing
    /// the stored bytes with `TOMBSTONE`, which is the same test the read
    /// paths use.
    fn upsert(&mut self, key: &[u8], value: &[u8]) {
        let new_size = Self::entry_footprint(key.len(), value.len());
        let now_tombstone = value == TOMBSTONE;

        match self.data.insert(key.to_vec(), value.to_vec()) {
            Some(previous) => {
                // Replace the old entry's contribution with the new one.
                let old_size = Self::entry_footprint(key.len(), previous.len());
                self.size_bytes = self.size_bytes.saturating_sub(old_size) + new_size;

                let was_tombstone = previous == TOMBSTONE;
                if was_tombstone && !now_tombstone {
                    // Defensive: never wrap or panic on a counter decrement.
                    self.tombstone_count = self.tombstone_count.saturating_sub(1);
                } else if !was_tombstone && now_tombstone {
                    self.tombstone_count += 1;
                }
            }
            None => {
                self.size_bytes += new_size;
                self.entry_count += 1;
                if now_tombstone {
                    self.tombstone_count += 1;
                }
            }
        }
    }

    /// Inserts or overwrites `key` with `value`.
    ///
    /// A `value` that is byte-for-byte equal to the tombstone sentinel is
    /// stored as given and accounted for as a tombstone, because every read
    /// path would treat it as one anyway.
    pub fn put(&mut self, key: &[u8], value: &[u8]) {
        self.upsert(key, value);
    }

    /// Returns the live value stored for `key`.
    ///
    /// Yields `None` both when the key is absent and when its current value
    /// is a tombstone.
    pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
        match self.data.get(&key.to_vec()) {
            Some(value) if value.as_slice() != TOMBSTONE => Some(value),
            _ => None,
        }
    }

    /// Returns `true` when `key` currently has a live (non-tombstone) value.
    pub fn contains(&self, key: &[u8]) -> bool {
        self.get(key).is_some()
    }

    /// Returns `true` when `key` is present and its value is a tombstone.
    pub fn is_tombstone(&self, key: &[u8]) -> bool {
        self.data
            .get(&key.to_vec())
            .map(|v| v.as_slice() == TOMBSTONE)
            .unwrap_or(false)
    }

    /// Marks `key` as deleted by storing a tombstone for it.
    ///
    /// The key stays in the table (and in `entry_count`) whether or not it
    /// existed before.
    pub fn delete(&mut self, key: &[u8]) {
        self.upsert(key, TOMBSTONE);
    }

    /// Collects cloned `(key, value)` pairs yielded by the skip list's
    /// `range(start, end)`, skipping tombstones.
    ///
    /// Bound inclusivity is whatever `SkipList::range` implements.
    /// NOTE(review): the tests in this file expect the `end` key to be
    /// included; confirm against the skip list.
    pub fn scan(&self, start: &[u8], end: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.data
            .range(&start.to_vec(), &end.to_vec())
            .filter(|(_, v)| v.as_slice() != TOMBSTONE)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    /// Running estimate of the bytes held by the table.
    pub fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// Number of distinct keys stored, tombstones included.
    pub fn entry_count(&self) -> usize {
        self.entry_count
    }

    /// Number of keys whose current value is not a tombstone.
    pub fn live_count(&self) -> usize {
        self.entry_count.saturating_sub(self.tombstone_count)
    }

    /// Returns `true` once the tracked size reaches
    /// `max_bytes * flush_threshold` (truncated to `usize`).
    pub fn should_flush(&self) -> bool {
        self.size_bytes >= (self.config.max_bytes as f64 * self.config.flush_threshold) as usize
    }

    /// Returns `true` once the tracked size reaches `max_bytes`.
    pub fn is_full(&self) -> bool {
        self.size_bytes >= self.config.max_bytes
    }

    /// Consumes the table and returns everything `SkipList::drain_sorted`
    /// yields.
    ///
    /// Tombstone entries are not filtered out here.
    pub fn drain_sorted(self) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.data.drain_sorted().collect()
    }

    /// Iterates over every stored `(key, value)` pair, tombstones included.
    pub fn iter(&self) -> impl Iterator<Item = (&Vec<u8>, &Vec<u8>)> {
        self.data.iter()
    }

    /// Removes every entry and resets all counters; the configuration is kept.
    pub fn clear(&mut self) {
        self.data.clear();
        self.size_bytes = 0;
        self.entry_count = 0;
        self.tombstone_count = 0;
    }

    /// Returns a snapshot of the current counters and fill level.
    pub fn stats(&self) -> MemtableStats {
        MemtableStats {
            size_bytes: self.size_bytes,
            entry_count: self.entry_count,
            live_count: self.live_count(),
            tombstone_count: self.tombstone_count,
            max_bytes: self.config.max_bytes,
            // With `max_bytes == 0` this float division yields NaN or infinity.
            fill_ratio: self.size_bytes as f64 / self.config.max_bytes as f64,
        }
    }
}
impl Default for Memtable {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time snapshot of a memtable's counters.
///
/// Derives `PartialEq` so two snapshots can be compared directly (for
/// example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct MemtableStats {
    /// Estimated bytes held when the snapshot was taken.
    pub size_bytes: usize,
    /// Number of stored keys, tombstones included.
    pub entry_count: usize,
    /// Number of stored keys that were not tombstones.
    pub live_count: usize,
    /// Number of stored keys that were tombstones.
    pub tombstone_count: usize,
    /// Configured size budget in bytes.
    pub max_bytes: usize,
    /// `size_bytes` divided by `max_bytes`.
    pub fill_ratio: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Stored keys are readable, unknown keys are not, and counters follow inserts.
    #[test]
    fn test_memtable_basic() {
        let mut table = Memtable::new();
        table.put(b"key1", b"value1");
        table.put(b"key2", b"value2");

        assert_eq!(table.get(b"key1"), Some(&b"value1"[..]));
        assert_eq!(table.get(b"key2"), Some(&b"value2"[..]));
        assert_eq!(table.get(b"key3"), None);
        assert_eq!(table.entry_count(), 2);
        assert_eq!(table.live_count(), 2);
    }

    /// Writing the same key twice keeps one entry holding the latest value.
    #[test]
    fn test_memtable_overwrite() {
        let mut table = Memtable::new();
        table.put(b"key", b"old");
        table.put(b"key", b"new");

        assert_eq!(table.get(b"key"), Some(&b"new"[..]));
        assert_eq!(table.entry_count(), 1);
    }

    /// Deleting hides the key from reads but leaves a tombstone behind.
    #[test]
    fn test_memtable_delete_tombstone() {
        let mut table = Memtable::new();
        table.put(b"key1", b"value1");
        table.put(b"key2", b"value2");
        table.delete(b"key1");

        assert_eq!(table.get(b"key1"), None);
        assert!(table.is_tombstone(b"key1"));
        assert_eq!(table.get(b"key2"), Some(&b"value2"[..]));
        assert_eq!(table.live_count(), 1);
        assert_eq!(table.tombstone_count, 1);
    }

    /// A range scan skips deleted keys.
    #[test]
    fn test_memtable_scan() {
        let mut table = Memtable::new();
        table.put(b"a", b"1");
        table.put(b"b", b"2");
        table.put(b"c", b"3");
        table.put(b"d", b"4");
        table.delete(b"c");

        let found = table.scan(b"b", b"d");
        let expected = vec![
            (b"b".to_vec(), b"2".to_vec()),
            (b"d".to_vec(), b"4".to_vec()),
        ];
        assert_eq!(found, expected);
    }

    /// Draining returns keys in ascending order regardless of insert order.
    #[test]
    fn test_memtable_drain_sorted() {
        let mut table = Memtable::new();
        table.put(b"c", b"3");
        table.put(b"a", b"1");
        table.put(b"b", b"2");

        let keys: Vec<Vec<u8>> = table
            .drain_sorted()
            .into_iter()
            .map(|(key, _)| key)
            .collect();
        assert_eq!(keys, vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()]);
    }

    /// The flush signal fires once the size crosses the configured fraction.
    #[test]
    fn test_memtable_should_flush() {
        let mut table = Memtable::with_config(MemtableConfig {
            max_bytes: 100,
            flush_threshold: 0.5,
        });
        assert!(!table.should_flush());

        table.put(b"aaaaaaaaaa", b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
        assert!(table.should_flush());
    }

    /// The stats snapshot mirrors the individual counters.
    #[test]
    fn test_memtable_stats() {
        let mut table = Memtable::new();
        table.put(b"k1", b"v1");
        table.put(b"k2", b"v2");
        table.delete(b"k1");

        let snapshot = table.stats();
        assert_eq!(snapshot.entry_count, 2);
        assert_eq!(snapshot.live_count, 1);
        assert_eq!(snapshot.tombstone_count, 1);
        assert!(snapshot.size_bytes > 0);
    }

    /// Clearing drops all entries and zeroes the counters.
    #[test]
    fn test_memtable_clear() {
        let mut table = Memtable::new();
        table.put(b"k1", b"v1");
        table.put(b"k2", b"v2");
        table.clear();

        assert_eq!(table.entry_count(), 0);
        assert_eq!(table.size_bytes(), 0);
        assert_eq!(table.get(b"k1"), None);
    }
}