use std::{
collections::VecDeque,
sync::{Arc, RwLock},
};
use crate::types::{sequence::SeqNum, Result};
use crate::wal::batch::WriteBatch;
use tracing::debug;
use crate::memtable::{iterator::MemEntry, memtable::Memtable, skiplist::EntryValue};
/// All memtables owned by a `MemtableManager`, kept in one struct so a single
/// `RwLock` guards the active memtable and the immutable queue together.
struct MemtableSet {
    /// The memtable that `apply_batch` writes into.
    active: Arc<Memtable>,

    /// Sealed memtables waiting to be flushed. `rotate` pushes to the back and
    /// `oldest_immutable` reads the front, so the front is the oldest entry.
    immutable: VecDeque<Arc<Memtable>>,
}
/// Owns the active memtable plus the queue of sealed (immutable) memtables and
/// coordinates rotation, flush bookkeeping, and write stalls.
pub struct MemtableManager {
    /// Active memtable and immutable queue, swapped atomically under one lock.
    inner: RwLock<MemtableSet>,

    /// Size threshold handed to every memtable this manager creates.
    flush_threshold: usize,

    /// Immutable-queue length at which `should_stall` reports `true`.
    max_immutable: usize,

    /// Signalled (via `notify_waiters`) each time `drop_flushed` runs.
    pub flush_complete: Arc<tokio::sync::Notify>,

    /// Signalled (via `notify_waiters`) each time `rotate` queues a sealed memtable.
    pub immutable_available: Arc<tokio::sync::Notify>,
}
impl MemtableManager {
pub fn new(first_seq: SeqNum, flush_threshold: usize, max_immutable: usize) -> Self {
let active = Arc::new(Memtable::new(first_seq, flush_threshold));
Self {
inner: RwLock::new(MemtableSet {
active,
immutable: VecDeque::new(),
}),
flush_threshold,
max_immutable,
flush_complete: Arc::new(tokio::sync::Notify::new()),
immutable_available: Arc::new(tokio::sync::Notify::new()),
}
}
pub fn apply_batch(&self, batch: &WriteBatch) -> Result<bool> {
let set = self.inner.read().unwrap();
set.active.apply_batch(batch)
}
pub fn rotate(&self, new_first_seq: SeqNum) -> Arc<Memtable> {
debug!(new_first_seq = new_first_seq.0, "rotating memtable");
let mut set = self.inner.write().unwrap();
set.active.seal();
let sealed = set.active.clone();
set.active = Arc::new(Memtable::new(new_first_seq, self.flush_threshold));
set.immutable.push_back(sealed.clone());
drop(set);
metrics::counter!("merutable.memtable.rotations_total").increment(1);
self.immutable_available.notify_waiters();
sealed
}
pub fn drop_flushed(&self, first_seq: SeqNum) {
debug!(first_seq = first_seq.0, "dropping flushed memtable");
let mut set = self.inner.write().unwrap();
set.immutable.retain(|m| m.first_seq != first_seq);
self.flush_complete.notify_waiters();
}
pub fn oldest_immutable(&self) -> Option<Arc<Memtable>> {
let set = self.inner.read().unwrap();
set.immutable.front().cloned()
}
pub fn immutable_count(&self) -> usize {
self.inner.read().unwrap().immutable.len()
}
pub fn should_stall(&self) -> bool {
self.immutable_count() >= self.max_immutable
}
pub fn active_should_flush(&self) -> bool {
self.inner.read().unwrap().active.should_flush()
}
pub fn active_size_bytes(&self) -> usize {
self.inner.read().unwrap().active.size_bytes()
}
pub fn active_entry_count(&self) -> u64 {
self.inner.read().unwrap().active.entry_count()
}
pub fn flush_threshold(&self) -> usize {
self.flush_threshold
}
pub fn get(&self, user_key_bytes: &[u8], read_seq: SeqNum) -> Option<EntryValue> {
let set = self.inner.read().unwrap();
if let Some(e) = set.active.get(user_key_bytes, read_seq) {
return Some(e);
}
for mem in set.immutable.iter().rev() {
if let Some(e) = mem.get(user_key_bytes, read_seq) {
return Some(e);
}
}
None
}
pub fn snapshot_entries(&self, read_seq: SeqNum) -> Vec<Vec<MemEntry>> {
let set = self.inner.read().unwrap();
let mut snapshots: Vec<Vec<MemEntry>> = Vec::new();
snapshots.push(set.active.iter(read_seq).collect());
for mem in set.immutable.iter().rev() {
snapshots.push(mem.iter(read_seq).collect());
}
snapshots
}
pub fn snapshot_all_versions(&self, read_seq: SeqNum) -> Vec<MemEntry> {
let set = self.inner.read().unwrap();
let mut out: Vec<MemEntry> = set.active.iter_all_versions(read_seq);
for mem in set.immutable.iter().rev() {
out.extend(mem.iter_all_versions(read_seq));
}
out
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::sequence::{OpType, SeqNum};
    use crate::wal::batch::WriteBatch;
    use bytes::Bytes;

    /// Flush threshold far larger than anything these single-put batches produce.
    const THRESHOLD: usize = 64 * 1024 * 1024;

    /// Builds a single-put batch at sequence `seq`.
    fn make_batch(seq: u64, key: &str, val: &str) -> WriteBatch {
        let mut b = WriteBatch::new(SeqNum(seq));
        let pk_bytes = encode_test_pk(key);
        b.put(pk_bytes, Bytes::from(val.to_string()));
        b
    }

    /// Test-only key encoding: the raw UTF-8 bytes of `key`.
    fn encode_test_pk(key: &str) -> Bytes {
        Bytes::from(key.to_string())
    }

    #[test]
    fn apply_and_get() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        let batch = make_batch(1, "hello", "world");
        mgr.apply_batch(&batch).unwrap();
        let pk = encode_test_pk("hello");
        let found = mgr.get(&pk, SeqNum(1));
        assert!(found.is_some());
        let e = found.unwrap();
        assert_eq!(e.op_type, OpType::Put);
        assert_eq!(e.value, Bytes::from("world"));
    }

    #[test]
    fn read_seq_isolation() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        let batch = make_batch(5, "key", "value_at_5");
        mgr.apply_batch(&batch).unwrap();
        let pk = encode_test_pk("key");
        assert!(mgr.get(&pk, SeqNum(4)).is_none());
        assert!(mgr.get(&pk, SeqNum(5)).is_some());
    }

    #[test]
    fn rotate_and_lookup_in_immutable() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        let batch = make_batch(1, "alpha", "val1");
        mgr.apply_batch(&batch).unwrap();
        let _sealed = mgr.rotate(SeqNum(2));
        let batch2 = make_batch(2, "beta", "val2");
        mgr.apply_batch(&batch2).unwrap();
        let pk_alpha = encode_test_pk("alpha");
        assert!(mgr.get(&pk_alpha, SeqNum(1)).is_some());
        let pk_beta = encode_test_pk("beta");
        assert!(mgr.get(&pk_beta, SeqNum(2)).is_some());
    }

    #[test]
    fn newer_memtable_shadows_older_version() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        mgr.apply_batch(&make_batch(1, "key", "old")).unwrap();
        let _sealed = mgr.rotate(SeqNum(2));
        mgr.apply_batch(&make_batch(2, "key", "new")).unwrap();
        let pk = encode_test_pk("key");
        // The active memtable wins when its version is visible...
        assert_eq!(mgr.get(&pk, SeqNum(2)).unwrap().value, Bytes::from("new"));
        // ...and the lookup falls through to the immutable one when it is not.
        assert_eq!(mgr.get(&pk, SeqNum(1)).unwrap().value, Bytes::from("old"));
    }

    #[test]
    fn drop_flushed_removes_immutable() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        let _sealed = mgr.rotate(SeqNum(2));
        assert_eq!(mgr.immutable_count(), 1);
        mgr.drop_flushed(SeqNum(1));
        assert_eq!(mgr.immutable_count(), 0);
    }

    #[test]
    fn oldest_immutable_is_fifo() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        assert!(mgr.oldest_immutable().is_none());
        let _first = mgr.rotate(SeqNum(2));
        let _second = mgr.rotate(SeqNum(3));
        assert_eq!(mgr.oldest_immutable().unwrap().first_seq.0, 1);
        mgr.drop_flushed(SeqNum(1));
        assert_eq!(mgr.oldest_immutable().unwrap().first_seq.0, 2);
    }

    #[test]
    fn should_stall_tracks_immutable_queue() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 2);
        assert!(!mgr.should_stall());
        let _first = mgr.rotate(SeqNum(2));
        assert!(!mgr.should_stall());
        let _second = mgr.rotate(SeqNum(3));
        assert!(mgr.should_stall());
        mgr.drop_flushed(SeqNum(1));
        assert!(!mgr.should_stall());
    }

    #[test]
    fn snapshot_entries_has_one_run_per_memtable() {
        let mgr = MemtableManager::new(SeqNum(1), THRESHOLD, 4);
        assert_eq!(mgr.snapshot_entries(SeqNum(1)).len(), 1);
        let _sealed = mgr.rotate(SeqNum(2));
        assert_eq!(mgr.snapshot_entries(SeqNum(2)).len(), 2);
    }
}