use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::ThreadId;
use tracing::info;
use super::table::{MemTable, MemTableError, Result};
use super::types::{MemTableConfig, MemTableEntry, MemTableManagerStats};
const THREAD_LOCAL_BUFFER_SIZE: usize = 64;
static MANAGER_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
type BufferEntry = (Vec<u8>, Vec<u8>);
type RegistryKey = (ThreadId, u64);
type BufferRegistry = Mutex<HashMap<RegistryKey, Arc<Mutex<Vec<BufferEntry>>>>>;
static BUFFER_REGISTRY: std::sync::LazyLock<BufferRegistry> =
std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
/// Coordinates one mutable "active" `MemTable` plus a FIFO list of frozen
/// ("immutable") tables awaiting flush, and owns the per-thread write
/// buffers registered for it in the global `BUFFER_REGISTRY`.
pub struct MemTableManager {
    /// Unique manager id (from `MANAGER_ID_COUNTER`); keys this manager's
    /// entries in `BUFFER_REGISTRY` and drives cleanup in `Drop`.
    id: u64,
    /// Table currently accepting writes; swapped for a fresh one on rotation.
    pub active: Arc<RwLock<Arc<MemTable>>>,
    /// Rotated read-only tables, oldest at index 0 (flushed front-first by
    /// `get_immutable_for_flush`).
    pub immutable: Arc<RwLock<Vec<Arc<MemTable>>>>,
    /// Configuration cloned for every new active table.
    config: MemTableConfig,
}
impl MemTableManager {
/// Creates a manager with a fresh active table, an empty immutable list,
/// and a process-unique id used to key its thread-local buffers.
pub fn new(config: MemTableConfig) -> Self {
    // Relaxed is sufficient: we only need uniqueness, not ordering.
    let manager_id = MANAGER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
    let initial_table = Arc::new(MemTable::new(config.clone()));
    Self {
        id: manager_id,
        active: Arc::new(RwLock::new(initial_table)),
        immutable: Arc::new(RwLock::new(Vec::new())),
        config,
    }
}
fn get_buffer(&self) -> Arc<Mutex<Vec<BufferEntry>>> {
let key = (std::thread::current().id(), self.id);
let mut registry = BUFFER_REGISTRY.lock();
registry
.entry(key)
.or_insert_with(|| Arc::new(Mutex::new(Vec::with_capacity(THREAD_LOCAL_BUFFER_SIZE))))
.clone()
}
/// Inserts a key/value pair into the active table, rotating and retrying
/// (up to 5 times) when the table reports `Full`.
///
/// Returns the sequence number assigned to the write, or
/// `MemTableError::Full` if every attempt found a full table.
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<u64> {
    const MAX_ATTEMPTS: usize = 5;
    for _attempt in 0..MAX_ATTEMPTS {
        // Scope the read guard so it is released before rotating,
        // which needs the write lock.
        let outcome = {
            let active = self.active.read();
            active.insert(key, value)
        };
        match outcome {
            Ok(seq) => return Ok(seq),
            Err(MemTableError::Full) => self.rotate_memtable()?,
            Err(e) => return Err(e),
        }
    }
    Err(MemTableError::Full)
}
#[inline]
pub fn insert_buffered(&self, key: &[u8], value: &[u8]) -> Result<u64> {
let buffer = self.get_buffer();
let mut buf = buffer.lock();
buf.push((key.to_vec(), value.to_vec()));
if buf.len() >= THREAD_LOCAL_BUFFER_SIZE {
let entries: Vec<_> = buf.drain(..).collect();
drop(buf); self.flush_buffer(&entries)?;
}
Ok(0) }
/// Writes a drained thread-local batch into the active table, rotating
/// and retrying (up to 5 times) when the table fills mid-batch.
///
/// Resumes from the first entry that failed rather than restarting from
/// index 0: the previous implementation re-inserted every entry after a
/// rotation, duplicating the already-written prefix into the new active
/// table (wasting space in the frozen table and burning sequence numbers).
///
/// Returns `MemTableError::Full` if the batch still cannot fit after all
/// rotation attempts; any other insert error is propagated immediately.
fn flush_buffer(&self, entries: &[(Vec<u8>, Vec<u8>)]) -> Result<()> {
    const MAX_ATTEMPTS: usize = 5;
    let mut start = 0;
    for _ in 0..MAX_ATTEMPTS {
        let active = self.active.read();
        let mut failed_at = None;
        for (offset, (key, value)) in entries[start..].iter().enumerate() {
            match active.insert(key, value) {
                Ok(_) => {}
                Err(MemTableError::Full) => {
                    failed_at = Some(start + offset);
                    break;
                }
                Err(e) => return Err(e),
            }
        }
        match failed_at {
            None => return Ok(()),
            Some(index) => {
                // Release the read guard before taking the write lock.
                drop(active);
                self.rotate_memtable()?;
                start = index;
            }
        }
    }
    Err(MemTableError::Full)
}
pub fn flush_thread_local(&self) -> Result<()> {
let registry = BUFFER_REGISTRY.lock();
for ((_, manager_id), buffer) in registry.iter() {
if *manager_id != self.id {
continue;
}
let mut buf = buffer.lock();
if !buf.is_empty() {
let entries: Vec<_> = buf.drain(..).collect();
drop(buf);
self.flush_buffer(&entries)?;
}
}
Ok(())
}
/// Records a deletion (tombstone) for `key` in the active table, rotating
/// and retrying (up to 5 times) when the table reports `Full`.
///
/// Returns the sequence number assigned to the tombstone, or
/// `MemTableError::Full` if every attempt found a full table.
pub fn delete(&self, key: &[u8]) -> Result<u64> {
    const MAX_ATTEMPTS: usize = 5;
    for _attempt in 0..MAX_ATTEMPTS {
        // Scope the read guard so rotation can take the write lock.
        let outcome = {
            let active = self.active.read();
            active.delete(key)
        };
        match outcome {
            Ok(seq) => return Ok(seq),
            Err(MemTableError::Full) => self.rotate_memtable()?,
            Err(e) => return Err(e),
        }
    }
    Err(MemTableError::Full)
}
/// Looks up `key`, newest table first: active, then immutable tables from
/// newest to oldest. A tombstone in any table shadows older values.
///
/// Fix: the previous implementation only checked tombstones in the
/// *immutable* tables, so a key deleted in the active table could be
/// resurrected from an older immutable table. The active table now gets
/// the same tombstone check the immutable tables already had.
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
    {
        let active = self.active.read();
        if let Some(value) = active.get(key) {
            return Some(value);
        }
        // `get` returned nothing — if that is because of a tombstone,
        // the deletion shadows any older value.
        if let Some(entry) = active.get_entry(key) {
            if entry.is_tombstone() {
                return None;
            }
        }
    }
    for table in self.immutable.read().iter().rev() {
        if let Some(value) = table.get(key) {
            return Some(value);
        }
        if let Some(entry) = table.get_entry(key) {
            if entry.is_tombstone() {
                return None;
            }
        }
    }
    None
}
/// Returns whether `key` is currently visible, honoring tombstones:
/// active table first, then immutable tables newest to oldest.
///
/// Fix: like `get`, the previous implementation skipped the tombstone
/// check on the *active* table, so a key deleted in the active table but
/// present in an older immutable table reported `true`.
pub fn contains_key(&self, key: &[u8]) -> bool {
    {
        let active = self.active.read();
        if active.contains_key(key) {
            return true;
        }
        // A tombstone in the newest table shadows older entries.
        if let Some(entry) = active.get_entry(key) {
            if entry.is_tombstone() {
                return false;
            }
        }
    }
    for table in self.immutable.read().iter().rev() {
        if table.contains_key(key) {
            return true;
        }
        if let Some(entry) = table.get_entry(key) {
            if entry.is_tombstone() {
                return false;
            }
        }
    }
    false
}
/// Returns the newest raw entry for `key` (tombstones included), searching
/// the active table first and then the immutable tables newest to oldest.
pub fn get_entry(&self, key: &[u8]) -> Option<MemTableEntry> {
    let active = self.active.read();
    if let Some(entry) = active.get_entry(key) {
        return Some(entry);
    }
    drop(active);
    let tables = self.immutable.read();
    tables.iter().rev().find_map(|table| table.get_entry(key))
}
/// Returns all visible key/value pairs with `start <= key < end`.
///
/// Merges oldest table first so newer writes overwrite older ones, then
/// drops tombstones (entries whose value is `None`).
///
/// Fix: compares keys as `&[u8]` slices instead of calling
/// `start.to_vec()` / `end.to_vec()`, which allocated two fresh `Vec`s for
/// every entry in every table.
pub fn range(&self, start: &[u8], end: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
    use std::collections::BTreeMap;
    let mut merged: BTreeMap<Vec<u8>, Option<Vec<u8>>> = BTreeMap::new();
    // Immutable tables, oldest (index 0) to newest.
    for table in self.immutable.read().iter() {
        for (key, entry) in table.get_all_entries() {
            if key.as_slice() >= start && key.as_slice() < end {
                merged.insert(key, entry.value);
            }
        }
    }
    // Active table last: its entries win over everything older.
    for (key, entry) in self.active.read().get_all_entries() {
        if key.as_slice() >= start && key.as_slice() < end {
            merged.insert(key, entry.value);
        }
    }
    merged
        .into_iter()
        .filter_map(|(k, v)| v.map(|val| (k, val)))
        .collect()
}
/// Returns all visible key/value pairs whose key starts with `prefix`.
///
/// Merges oldest table first so newer writes overwrite older ones, then
/// drops tombstones (entries whose value is `None`).
pub fn scan_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
    use std::collections::BTreeMap;
    let mut merged = BTreeMap::<Vec<u8>, Option<Vec<u8>>>::new();
    // Immutable tables, oldest (index 0) to newest.
    for table in self.immutable.read().iter() {
        for (key, entry) in table.get_all_entries() {
            if !key.starts_with(prefix) {
                continue;
            }
            merged.insert(key, entry.value);
        }
    }
    // Active table last: its entries win over everything older.
    for (key, entry) in self.active.read().get_all_entries() {
        if !key.starts_with(prefix) {
            continue;
        }
        merged.insert(key, entry.value);
    }
    let mut results = Vec::new();
    for (key, value) in merged {
        if let Some(value) = value {
            results.push((key, value));
        }
    }
    results
}
/// Freezes the active table and installs a fresh one in its place.
///
/// Rechecks `should_flush` under the write lock so that concurrent callers
/// who both saw a full table don't each rotate (a racing thread may have
/// already swapped in a fresh table while we waited for the lock).
fn rotate_memtable(&self) -> Result<()> {
    let mut active = self.active.write();
    if !active.should_flush() {
        return Ok(());
    }
    info!("Rotating MemTable");
    active.set_read_only();
    let frozen = Arc::clone(&*active);
    self.immutable.write().push(frozen);
    *active = Arc::new(MemTable::new(self.config.clone()));
    Ok(())
}
/// Removes and returns the oldest immutable table (FIFO flush order),
/// or `None` when nothing is waiting to be flushed.
pub fn get_immutable_for_flush(&self) -> Option<Arc<MemTable>> {
    let mut tables = self.immutable.write();
    if tables.is_empty() {
        return None;
    }
    // Index 0 is the oldest table.
    Some(tables.remove(0))
}
/// Returns whether any frozen tables are waiting to be flushed.
pub fn has_immutable(&self) -> bool {
    let tables = self.immutable.read();
    !tables.is_empty()
}
/// Returns how many frozen tables are waiting to be flushed.
pub fn immutable_count(&self) -> usize {
    let tables = self.immutable.read();
    tables.len()
}
/// Returns the active table's current sequence number.
pub fn current_sequence(&self) -> u64 {
    let active = self.active.read();
    active.current_sequence()
}
/// Collects statistics for the active table and every immutable table.
pub fn stats(&self) -> MemTableManagerStats {
    let active = self.active.read().stats();
    let immutable = self
        .immutable
        .read()
        .iter()
        .map(|table| table.stats())
        .collect();
    MemTableManagerStats { active, immutable }
}
/// Unconditionally freezes the active table (unless it is empty, in which
/// case there is nothing to rotate) and installs a fresh one.
pub fn force_rotate(&self) -> Result<()> {
    let mut active = self.active.write();
    // An empty table has nothing worth flushing; skip the rotation.
    if active.is_empty() {
        return Ok(());
    }
    info!("Force rotating MemTable");
    active.set_read_only();
    let frozen = Arc::clone(&*active);
    self.immutable.write().push(frozen);
    *active = Arc::new(MemTable::new(self.config.clone()));
    Ok(())
}
}
impl Drop for MemTableManager {
    /// Purges this manager's per-thread buffers from the global registry so
    /// the map does not grow without bound across manager lifetimes.
    ///
    /// NOTE(review): any entries still sitting in those buffers are dropped
    /// with them — callers should run `flush_thread_local` before dropping
    /// the manager if buffered writes must not be lost.
    fn drop(&mut self) {
        let manager_id = self.id;
        BUFFER_REGISTRY
            .lock()
            .retain(|&(_, id), _| id != manager_id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Roomy default config shared by the tests; individual tests shrink
    /// `max_entries` to force rotation.
    fn test_config() -> MemTableConfig {
        MemTableConfig {
            max_size: 1024 * 1024,
            max_entries: 100,
            max_age: Duration::from_secs(3600),
        }
    }

    #[test]
    fn test_insert_and_get() {
        let mgr = MemTableManager::new(test_config());
        mgr.insert(b"key1", b"value1").unwrap();
        mgr.insert(b"key2", b"value2").unwrap();
        // Present keys come back; an absent key yields None.
        assert_eq!(mgr.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(mgr.get(b"key2"), Some(b"value2".to_vec()));
        assert_eq!(mgr.get(b"key3"), None);
    }

    #[test]
    fn test_delete() {
        let mgr = MemTableManager::new(test_config());
        mgr.insert(b"key1", b"value1").unwrap();
        assert!(mgr.contains_key(b"key1"));
        // Deleting writes a tombstone that hides the key.
        mgr.delete(b"key1").unwrap();
        assert!(!mgr.contains_key(b"key1"));
    }

    #[test]
    fn test_rotation() {
        // Tiny capacity so inserts overflow the first table.
        let config = MemTableConfig {
            max_entries: 10,
            ..test_config()
        };
        let mgr = MemTableManager::new(config);
        for i in 0..15 {
            let key = format!("key{}", i);
            mgr.insert(key.as_bytes(), b"value").unwrap();
        }
        assert!(mgr.has_immutable());
    }

    #[test]
    fn test_get_across_memtables() {
        // Force several rotations, then confirm reads span all tables.
        let config = MemTableConfig {
            max_entries: 5,
            ..test_config()
        };
        let mgr = MemTableManager::new(config);
        for i in 0..12 {
            let key = format!("key{}", i);
            let value = format!("value{}", i);
            mgr.insert(key.as_bytes(), value.as_bytes()).unwrap();
        }
        for i in 0..12 {
            let key = format!("key{}", i);
            assert!(mgr.get(key.as_bytes()).is_some());
        }
    }
}