#![allow(dead_code)]
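//! In-memory write buffer (memtable) backed by `crossbeam_skiplist::SkipMap`,
//! with atomic byte accounting, soft back-pressure, and a freeze/snapshot path.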
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use crossbeam_skiplist::SkipMap;
use crossbeam_utils::CachePadded;
/// Default hard cap on memtable heap usage: 3 GiB.
const DEFAULT_LIMIT_BYTES: u64 = 3 * 1024 * 1024 * 1024;
/// Rough per-entry bookkeeping overhead, added to key and value lengths.
const ENTRY_BASE_OVERHEAD: usize = 32;
/// Distance below the hard limit at which the soft back-pressure threshold sits (64 MiB).
const BACKPRESSURE_MARGIN: u64 = 64 * 1024 * 1024;
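/// Errors returned by the memtable's write paths.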
#[derive(Debug, thiserror::Error)]
pub enum MemTableError {
#[error("memtable is frozen")]
Frozen,
#[error("memtable back-pressure active")]
Backpressure,
}
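/// A materialized entry handed back to callers; key and value are shared `Arc` slices,
/// and `tombstone` marks deletions.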
#[derive(Clone, Debug)]
pub struct MemTableEntry {
pub key: Arc<[u8]>,
pub value: Arc<[u8]>,
pub sequence: u64,
pub tombstone: bool,
}
impl MemTableEntry {
fn heap_usage(&self) -> usize {
self.key.len() + self.value.len() + ENTRY_BASE_OVERHEAD
}
}
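/// Value-side record stored in the skip map: the payload plus the cached heap size
/// used for byte accounting.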
#[derive(Clone)]
struct EntryValue {
value: Arc<[u8]>,
sequence: u64,
tombstone: bool,
heap_bytes: usize,
}
impl EntryValue {
fn into_entry(self, key: Arc<[u8]>) -> MemTableEntry {
MemTableEntry {
key,
value: self.value,
sequence: self.sequence,
tombstone: self.tombstone,
}
}
}
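/// Per-thread byte counter, cache-padded so counters owned by different threads do not
/// share a cache line (avoids false sharing).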
struct Arena {
used: CachePadded<AtomicU64>,
}
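/// A concurrent, size-bounded memtable. The latest version of each key lives in a
/// lock-free skip map; a global atomic counter enforces `limit_bytes`, per-thread
/// cache-padded arenas track reservations, and `freeze` switches the table to read-only.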
pub struct MemTable {
inner: SkipMap<Arc<[u8]>, EntryValue>,
arenas: Arc<Vec<Arena>>,
bytes_used: AtomicU64,
backpressure: AtomicBool,
frozen: AtomicBool,
sequence: AtomicU64,
limit_bytes: u64,
}
impl MemTable {
pub fn new() -> Self {
Self::with_limit(DEFAULT_LIMIT_BYTES)
}
pub fn with_limit(limit_bytes: u64) -> Self {
let cpu_count = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
.max(1);
let arenas = (0..cpu_count)
.map(|_| Arena {
used: CachePadded::new(AtomicU64::new(0)),
})
.collect();
Self {
inner: SkipMap::new(),
arenas: Arc::new(arenas),
bytes_used: AtomicU64::new(0),
backpressure: AtomicBool::new(false),
frozen: AtomicBool::new(false),
sequence: AtomicU64::new(0),
limit_bytes: limit_bytes.max(1),
}
}
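    /// Soft limit at which back-pressure is signalled: the hard limit minus a
    /// margin of 64 MiB, capped at one eighth of the limit.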
fn threshold(&self) -> u64 {
self.limit_bytes
.saturating_sub(BACKPRESSURE_MARGIN.min(self.limit_bytes / 8))
}
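    /// Picks a per-thread arena by hashing the current thread id. Distinct threads
    /// can hash to the same arena, so arena counters are statistics, not exact shares.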
fn arena_for_thread(&self) -> &Arena {
let mut hasher = DefaultHasher::new();
std::thread::current().id().hash(&mut hasher);
let id = hasher.finish() as usize;
let index = id % self.arenas.len().max(1);
&self.arenas[index]
}
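    /// Reserves `bytes` against the global budget with a CAS loop. Crossing the soft
    /// threshold only raises the back-pressure flag; exceeding `limit_bytes` rejects
    /// the reservation with `MemTableError::Backpressure` without reserving anything.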
fn reserve_bytes(&self, bytes: usize) -> Result<(), MemTableError> {
if bytes == 0 {
return Ok(());
}
let mut current = self.bytes_used.load(AtomicOrdering::Relaxed);
loop {
let proposed = current + bytes as u64;
if proposed > self.limit_bytes {
self.backpressure.store(true, AtomicOrdering::Release);
return Err(MemTableError::Backpressure);
}
match self.bytes_used.compare_exchange_weak(
current,
proposed,
AtomicOrdering::SeqCst,
AtomicOrdering::Relaxed,
) {
Ok(_) => {
self.arena_for_thread()
.used
.fetch_add(bytes as u64, AtomicOrdering::Relaxed);
if proposed >= self.threshold() {
self.backpressure.store(true, AtomicOrdering::Release);
}
return Ok(());
}
Err(actual) => current = actual,
}
}
}
    /// Returns `bytes` to the budget. Reservation and release may happen on different
    /// threads (and, under same-key races, the same entry can be released twice), so
    /// both counters subtract with saturation instead of risking wrap-around.
    fn release_bytes(&self, bytes: usize) {
        if bytes == 0 {
            return;
        }
        let _ = self.bytes_used.fetch_update(
            AtomicOrdering::Relaxed,
            AtomicOrdering::Relaxed,
            |current| Some(current.saturating_sub(bytes as u64)),
        );
        let _ = self.arena_for_thread().used.fetch_update(
            AtomicOrdering::Relaxed,
            AtomicOrdering::Relaxed,
            |current| Some(current.saturating_sub(bytes as u64)),
        );
        // Clear the back-pressure flag once usage drops back below the soft threshold.
        if self.bytes_used.load(AtomicOrdering::Relaxed) < self.threshold() {
            self.backpressure.store(false, AtomicOrdering::Release);
        }
    }
    /// Shared write path for `put*` and `delete`: assigns the next sequence number,
    /// reserves space for the new entry, inserts it, and releases the bytes of any
    /// entry it overwrote.
    ///
    /// The previous entry's size is read before the insert, so two threads
    /// overwriting the same key concurrently can both release the old bytes; the
    /// accounting is therefore approximate under same-key races.
    fn insert_entry(
        &self,
        key_arc: Arc<[u8]>,
        value_arc: Arc<[u8]>,
        tombstone: bool,
    ) -> Result<MemTableEntry, MemTableError> {
        if self.frozen.load(AtomicOrdering::Acquire) {
            return Err(MemTableError::Frozen);
        }
        let entry = EntryValue {
            value: value_arc.clone(),
            sequence: self.sequence.fetch_add(1, AtomicOrdering::Relaxed) + 1,
            tombstone,
            heap_bytes: key_arc.len() + value_arc.len() + ENTRY_BASE_OVERHEAD,
        };
        let previous = self
            .inner
            .get(&key_arc)
            .map(|existing| existing.value().heap_bytes);
        self.reserve_bytes(entry.heap_bytes)?;
        self.inner.insert(key_arc.clone(), entry.clone());
        if let Some(bytes) = previous {
            self.release_bytes(bytes);
        }
        Ok(entry.into_entry(key_arc))
    }
    /// Inserts a key/value pair, copying the borrowed key into a shared allocation.
    pub fn put(&self, key: &[u8], value: Vec<u8>) -> Result<MemTableEntry, MemTableError> {
        self.insert_entry(Arc::from(key), Arc::from(value), false)
    }
    /// Inserts a key/value pair from owned buffers, avoiding an extra copy of the key at the call site.
    pub fn put_owned(&self, key: Vec<u8>, value: Vec<u8>) -> Result<MemTableEntry, MemTableError> {
        self.insert_entry(Arc::from(key), Arc::from(value), false)
    }
    /// Inserts a key/value pair that is already reference-counted, so nothing is copied.
    pub fn put_arc(
        &self,
        key_arc: Arc<[u8]>,
        value_arc: Arc<[u8]>,
    ) -> Result<MemTableEntry, MemTableError> {
        self.insert_entry(key_arc, value_arc, false)
    }
    /// Records a deletion by inserting a tombstone entry with an empty value.
    pub fn delete(&self, key: &[u8]) -> Result<MemTableEntry, MemTableError> {
        self.insert_entry(Arc::from(key), Arc::from(Vec::<u8>::new()), true)
    }
pub fn get(&self, key: &[u8]) -> Option<MemTableEntry> {
self.inner
.get(key)
.map(|entry| entry.value().clone().into_entry(entry.key().clone()))
}
pub fn approximate_size(&self) -> u64 {
self.bytes_used.load(AtomicOrdering::Relaxed)
}
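    /// Bytes currently attributed to the calling thread's arena. This is a per-thread
    /// statistic, not the global total; see `approximate_size` for the latter.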
pub fn arena_usage(&self) -> u64 {
self.arena_for_thread().used.load(AtomicOrdering::Relaxed)
}
pub fn is_backpressured(&self) -> bool {
self.backpressure.load(AtomicOrdering::Acquire)
}
pub fn len(&self) -> usize {
self.inner.len()
}
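    /// True when the memtable holds no entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
    /// Marks the memtable read-only and snapshots its contents in key order.
    /// Writers that passed the frozen check before the flag was set can still land
    /// an entry, so callers should quiesce writers if an exact snapshot is required.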
pub fn freeze(&self) -> FrozenMemTable {
self.frozen.store(true, AtomicOrdering::SeqCst);
        // The skip map iterates in ascending key order and each key holds only its
        // latest version, so the snapshot is already sorted; no extra sort pass is needed.
        let entries: Vec<MemTableEntry> = self
            .inner
            .iter()
            .map(|entry| entry.value().clone().into_entry(entry.key().clone()))
            .collect();
FrozenMemTable {
entries,
total_bytes: self.approximate_size() as usize,
}
}
pub fn is_frozen(&self) -> bool {
self.frozen.load(AtomicOrdering::Acquire)
}
}
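// Default construction mirrors `MemTable::new()`.
impl Default for MemTable {
    fn default() -> Self {
        Self::new()
    }
}
/// Immutable, key-ordered snapshot produced by [`MemTable::freeze`].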
pub struct FrozenMemTable {
entries: Vec<MemTableEntry>,
total_bytes: usize,
}
impl FrozenMemTable {
pub fn entries(&self) -> &[MemTableEntry] {
&self.entries
}
pub fn into_entries(self) -> Vec<MemTableEntry> {
self.entries
}
pub fn total_bytes(&self) -> usize {
self.total_bytes
}
}
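// A minimal usage sketch: two tests exercising put/get/delete, overwrite accounting,
// freezing, and the back-pressure path. Byte expectations assume the
// `ENTRY_BASE_OVERHEAD` constant defined above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn put_get_delete_roundtrip() {
        let table = MemTable::new();
        table.put(b"alpha", b"one".to_vec()).unwrap();
        // Overwriting the same key releases the previous entry's bytes.
        table.put(b"alpha", b"two".to_vec()).unwrap();
        let entry = table.get(b"alpha").unwrap();
        assert_eq!(&entry.value[..], b"two");
        assert!(!entry.tombstone);

        // Deletion is recorded as a tombstone, not a physical removal.
        table.delete(b"alpha").unwrap();
        assert!(table.get(b"alpha").unwrap().tombstone);

        let frozen = table.freeze();
        assert!(table.is_frozen());
        assert!(table.put(b"beta", b"x".to_vec()).is_err());
        assert_eq!(frozen.entries().len(), 1);
    }

    #[test]
    fn small_limit_triggers_backpressure() {
        // With a tiny limit, a write whose reservation exceeds the hard cap is rejected.
        let table = MemTable::with_limit(64);
        assert!(matches!(
            table.put(b"key", vec![0u8; 128]),
            Err(MemTableError::Backpressure)
        ));
        assert!(table.is_backpressured());
    }
}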