use std::collections::HashSet;
use std::fs;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread::JoinHandle;
use crate::batch::{Batch, Op};
use crate::config::LsmConfig;
use crate::error::{Error, Result};
use crate::manifest::{self, Manifest};
use crate::memtable::MemTable;
use crate::merge::Merge;
use crate::record::Record;
use crate::scan::Scan;
use crate::sstable::{SsTable, SsTableWriter};
/// Mutable storage state; `Engine` keeps it behind an `RwLock`.
#[derive(Debug)]
struct Inner {
/// In-memory table that receives every put and delete before it is flushed.
memtable: MemTable,
/// Open on-disk runs. A flush inserts its run at the front and a compaction
/// appends its output at the back; point lookups walk the list front to back.
runs: Vec<Arc<SsTable>>,
}
/// Hand-off state between writers and the background compaction thread,
/// guarded by `Engine::compaction` and signalled through `Engine::cond`.
#[derive(Debug, Default)]
struct CompactionState {
/// Set by `signal_compaction`; cleared by the compactor when it picks the request up.
pending: bool,
/// True while the compactor thread is executing a compaction pass.
running: bool,
/// Set when the owning `Lsm` is dropped; makes the compactor thread return.
shutdown: bool,
/// Incremented by the compactor after every pass, successful or not.
generation: u64,
}
/// Shared engine state, owned through an `Arc` by both the `Lsm` handle and
/// the background compaction thread.
#[derive(Debug)]
struct Engine {
/// Directory holding the run files, their temporary `.tmp` siblings and the manifest.
dir: PathBuf,
/// Tuning knobs; this file reads the memtable capacity and the compaction trigger.
config: LsmConfig,
/// Memtable and run list; readers take the read lock, writers and installs the write lock.
inner: RwLock<Inner>,
/// Sequence number for the next run file; advanced with `fetch_add` by every
/// flush and compaction and persisted through `Manifest::store`.
next_seq: AtomicU64,
/// Set while `compact_once` is executing so that overlapping calls return immediately.
compacting: AtomicBool,
/// Request/shutdown flags shared with the compactor thread.
compaction: Mutex<CompactionState>,
/// Condition variable paired with `compaction`.
cond: Condvar,
/// Error returned by the most recent failed background compaction, if any.
last_error: Mutex<Option<Error>>,
}
/// Public handle to an LSM database; dropping it stops and joins the
/// background compaction thread.
#[derive(Debug)]
pub struct Lsm {
/// Engine state shared with the compaction thread.
engine: Arc<Engine>,
/// Join handle of the compaction thread; an `Option` so `Drop` can take and join it.
compactor: Option<JoinHandle<()>>,
}
impl Lsm {
pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
Self::open_with(dir, LsmConfig::default())
}
pub fn open_with(dir: impl AsRef<Path>, config: LsmConfig) -> Result<Self> {
let dir = dir.as_ref().to_path_buf();
fs::create_dir_all(&dir).map_err(|e| Error::io("create database directory", e))?;
let manifest = Manifest::load(&dir)?;
let (run_names, manifest_seq) = match manifest {
Some(m) => (m.runs, m.next_seq),
None => (Vec::new(), 0),
};
let live: HashSet<&str> = run_names.iter().map(String::as_str).collect();
let mut runs = Vec::with_capacity(run_names.len());
for name in &run_names {
let path = dir.join(name);
if !path.exists() {
return Err(Error::corruption("manifest references a missing run"));
}
runs.push(Arc::new(SsTable::open(&path)?));
}
let mut next_seq = manifest_seq;
for entry in fs::read_dir(&dir).map_err(|e| Error::io("scan database directory", e))? {
let entry = entry.map_err(|e| Error::io("read directory entry", e))?;
let name = entry.file_name().to_string_lossy().into_owned();
if name.ends_with(".tmp") {
fs::remove_file(entry.path()).map_err(|e| Error::io("remove temporary file", e))?;
} else if let Some(seq) = manifest::seq_of(&name) {
next_seq = next_seq.max(seq + 1);
if !live.contains(name.as_str()) {
fs::remove_file(entry.path()).map_err(|e| Error::io("remove orphan run", e))?;
}
}
}
let engine = Arc::new(Engine {
dir,
config,
inner: RwLock::new(Inner {
memtable: MemTable::new(),
runs,
}),
next_seq: AtomicU64::new(next_seq),
compacting: AtomicBool::new(false),
compaction: Mutex::new(CompactionState::default()),
cond: Condvar::new(),
last_error: Mutex::new(None),
});
let compactor = {
let engine = Arc::clone(&engine);
std::thread::Builder::new()
.name("lsm-compactor".to_owned())
.spawn(move || compactor_loop(&engine))
.map_err(|e| Error::io("spawn compaction thread", e))?
};
Ok(Lsm {
engine,
compactor: Some(compactor),
})
}
pub fn put(&self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Result<()> {
self.engine.put(key.as_ref(), value.as_ref())
}
pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<()> {
self.engine.delete(key.as_ref())
}
pub fn write(&self, batch: Batch) -> Result<()> {
self.engine.write(batch)
}
pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
self.engine.get(key.as_ref())
}
pub fn scan<R>(&self, range: R) -> Result<Scan>
where
R: RangeBounds<Vec<u8>>,
{
self.engine.scan(range)
}
pub fn flush(&self) -> Result<()> {
self.engine.flush()
}
#[cfg(test)]
pub(crate) fn compact_now(&self) -> Result<()> {
self.engine.compact_once()
}
#[cfg(test)]
pub(crate) fn run_count(&self) -> usize {
self.engine.read_guard().runs.len()
}
#[cfg(test)]
pub(crate) fn wait_for_idle(&self) {
let mut state = self
.engine
.compaction
.lock()
.unwrap_or_else(|p| p.into_inner());
while state.pending || state.running {
state = self
.engine
.cond
.wait(state)
.unwrap_or_else(|p| p.into_inner());
}
}
}
impl Drop for Lsm {
    /// Asks the compaction thread to stop and waits until it has exited.
    fn drop(&mut self) {
        // The temporary guard is released at the end of this statement, so the
        // notification below is sent without holding the mutex.
        self.engine
            .compaction
            .lock()
            .unwrap_or_else(|p| p.into_inner())
            .shutdown = true;
        self.engine.cond.notify_all();

        if let Some(worker) = self.compactor.take() {
            // A panic inside the compactor is deliberately not propagated from drop.
            let _ = worker.join();
        }
    }
}
impl Engine {
    /// Buffers `key -> value` in the memtable and flushes it if it is full.
    ///
    /// # Errors
    /// Returns the error of a triggered flush. The write itself stays buffered
    /// in the memtable in that case.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let mut inner = self.write_guard();
        inner.memtable.put(key.to_vec(), value.to_vec());
        self.maybe_flush(&mut inner)
    }

    /// Buffers a deletion of `key` in the memtable and flushes it if it is full.
    ///
    /// # Errors
    /// Returns the error of a triggered flush.
    fn delete(&self, key: &[u8]) -> Result<()> {
        let mut inner = self.write_guard();
        inner.memtable.delete(key.to_vec());
        self.maybe_flush(&mut inner)
    }

    /// Applies all operations of `batch` under a single write-lock acquisition,
    /// then flushes the memtable if it is full.
    ///
    /// # Errors
    /// Returns the error of a triggered flush.
    fn write(&self, batch: Batch) -> Result<()> {
        let mut inner = self.write_guard();
        for (key, op) in batch.into_ops() {
            match op {
                Op::Put(value) => inner.memtable.put(key, value),
                Op::Delete => inner.memtable.delete(key),
            }
        }
        self.maybe_flush(&mut inner)
    }

    /// Point lookup: memtable first, then every run in list order.
    ///
    /// The first record found decides the answer; a tombstone yields `None`.
    /// The run list is cloned so the read lock is released before any run is
    /// consulted.
    ///
    /// # Errors
    /// Returns the error of a failed run lookup.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let runs = {
            let inner = self.read_guard();
            match inner.memtable.get(key) {
                Some(Record::Value(value)) => return Ok(Some(value.clone())),
                Some(Record::Tombstone) => return Ok(None),
                None => inner.runs.clone(),
            }
        };
        for run in &runs {
            match run.lookup(key)? {
                Some(Record::Value(value)) => return Ok(Some(value)),
                Some(Record::Tombstone) => return Ok(None),
                None => {}
            }
        }
        Ok(None)
    }

    /// Collects every key/value pair inside `range` into a [`Scan`].
    ///
    /// Memtable entries inside the range and the run list are snapshotted under
    /// the read lock; merging happens after the lock is released.
    ///
    /// # Errors
    /// Returns the first error yielded by the merge.
    fn scan<R>(&self, range: R) -> Result<Scan>
    where
        R: RangeBounds<Vec<u8>>,
    {
        let (mem, runs) = {
            let inner = self.read_guard();
            let mem: Vec<(Vec<u8>, Record)> = inner
                .memtable
                .iter()
                .filter(|(k, _)| matches!(position(&range, k), Pos::In))
                .map(|(k, r)| (k.clone(), r.clone()))
                .collect();
            (mem, inner.runs.clone())
        };
        let cursors = runs.iter().map(|r| r.cursor()).collect();
        let mut out = Vec::new();
        for item in Merge::new(mem, cursors) {
            let (key, value) = item?;
            match position(&range, &key) {
                Pos::Below => {}
                Pos::In => out.push((key, value)),
                // Stops at the first key past the end bound; assumes the merge
                // yields keys in ascending order.
                Pos::Above => break,
            }
        }
        Ok(Scan::new(out))
    }

    /// Flushes the memtable to a new run; a no-op when the memtable is empty.
    ///
    /// # Errors
    /// Returns the error of the failed flush.
    fn flush(&self) -> Result<()> {
        let mut inner = self.write_guard();
        if inner.memtable.is_empty() {
            return Ok(());
        }
        self.flush_locked(&mut inner)
    }

    /// Flushes when the memtable is non-empty and its approximate size has
    /// reached the configured capacity.
    fn maybe_flush(&self, inner: &mut Inner) -> Result<()> {
        if !inner.memtable.is_empty()
            && inner.memtable.approx_size() >= self.config.memtable_capacity_bytes()
        {
            self.flush_locked(inner)?;
        }
        Ok(())
    }

    /// Writes the memtable to a new run file and installs it as the first run.
    ///
    /// The caller must hold the write lock (`inner` comes from `write_guard`).
    /// The run is written to `<name>.tmp`, renamed to its final name, opened,
    /// and recorded in the manifest; only then is it added to the run list.
    ///
    /// The memtable is emptied only after the manifest has been stored. If any
    /// step fails, the buffered writes stay in the memtable so they remain
    /// readable and a later flush can retry, instead of being lost.
    ///
    /// # Errors
    /// Returns the error of the step that failed.
    fn flush_locked(&self, inner: &mut Inner) -> Result<()> {
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let name = manifest::run_filename(seq);
        let tmp = self.dir.join(format!("{name}.tmp"));
        let final_path = self.dir.join(&name);

        // Stream straight from the memtable; it is not drained until the run
        // has been durably installed.
        let mut writer = SsTableWriter::create(&tmp)?;
        for (key, record) in inner.memtable.iter() {
            writer.push(key, record)?;
        }
        writer.finish()?;
        fs::rename(&tmp, &final_path).map_err(|e| Error::io("install flushed run", e))?;

        let run = Arc::new(SsTable::open(&final_path)?);
        let mut new_runs = Vec::with_capacity(inner.runs.len() + 1);
        new_runs.push(run);
        new_runs.extend(inner.runs.iter().cloned());
        let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
        Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;

        // Commit point: the run is on disk and in the manifest, so the buffered
        // copies can be discarded.
        inner.runs = new_runs;
        drop(inner.memtable.take());

        if inner.runs.len() >= self.config.compaction_trigger_runs() {
            self.signal_compaction();
        }
        Ok(())
    }

    /// Runs one compaction pass unless another one is already in progress, in
    /// which case it returns `Ok(())` without doing anything.
    ///
    /// # Errors
    /// Returns the error of the failed pass.
    fn compact_once(&self) -> Result<()> {
        if self.compacting.swap(true, Ordering::AcqRel) {
            return Ok(());
        }
        let result = self.compact_inner();
        self.compacting.store(false, Ordering::Release);
        result
    }

    /// Merges a snapshot of all current runs into one new run and installs it.
    ///
    /// Nothing happens when fewer than two runs exist. The merge runs without
    /// holding the engine lock; the write lock is taken only to swap the run
    /// list. Runs flushed while the merge was in progress are kept in front of
    /// the merged output.
    ///
    /// # Errors
    /// Returns the error of the step that failed.
    fn compact_inner(&self) -> Result<()> {
        let inputs: Vec<Arc<SsTable>> = {
            let inner = self.read_guard();
            if inner.runs.len() < 2 {
                return Ok(());
            }
            inner.runs.clone()
        };
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        let name = manifest::run_filename(seq);
        let tmp = self.dir.join(format!("{name}.tmp"));
        let final_path = self.dir.join(&name);
        {
            let mut writer = SsTableWriter::create(&tmp)?;
            let cursors = inputs.iter().map(|r| r.cursor()).collect();
            // Every merged item is written as a plain value; presumably `Merge`
            // has already dropped shadowed and deleted keys (confirm in `merge`).
            for item in Merge::new(Vec::new(), cursors) {
                let (key, value) = item?;
                writer.push(&key, &Record::Value(value))?;
            }
            writer.finish()?;
        }
        fs::rename(&tmp, &final_path).map_err(|e| Error::io("install compacted run", e))?;
        let output = Arc::new(SsTable::open(&final_path)?);
        {
            let mut inner = self.write_guard();
            // Keep runs that are not part of this compaction's input snapshot.
            let mut new_runs: Vec<Arc<SsTable>> = inner
                .runs
                .iter()
                .filter(|r| !inputs.iter().any(|i| Arc::ptr_eq(i, r)))
                .cloned()
                .collect();
            new_runs.push(Arc::clone(&output));
            let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
            Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;
            // Inputs are retired only after the manifest no longer lists them.
            for input in &inputs {
                input.mark_obsolete();
            }
            inner.runs = new_runs;
        }
        drop(inputs);
        Ok(())
    }

    /// Marks a compaction as requested and wakes the compactor thread.
    fn signal_compaction(&self) {
        let mut state = self.compaction.lock().unwrap_or_else(|p| p.into_inner());
        state.pending = true;
        self.cond.notify_all();
    }

    /// Acquires the read lock, recovering the guard if the lock is poisoned.
    fn read_guard(&self) -> RwLockReadGuard<'_, Inner> {
        self.inner.read().unwrap_or_else(|p| p.into_inner())
    }

    /// Acquires the write lock, recovering the guard if the lock is poisoned.
    fn write_guard(&self) -> RwLockWriteGuard<'_, Inner> {
        self.inner.write().unwrap_or_else(|p| p.into_inner())
    }
}
/// Body of the background compaction thread.
///
/// Sleeps on the engine's condition variable until a compaction is requested
/// or shutdown is flagged. Shutdown wins over a pending request. Each request
/// runs one `compact_once` pass; afterwards the generation counter is bumped,
/// a failure is stored in `last_error`, and all waiters are notified.
fn compactor_loop(engine: &Engine) {
    loop {
        // Wait for work, then claim it while still holding the state lock.
        let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
        loop {
            if state.shutdown {
                return;
            }
            if state.pending {
                break;
            }
            state = engine.cond.wait(state).unwrap_or_else(|p| p.into_inner());
        }
        state.pending = false;
        state.running = true;
        drop(state);

        // The pass itself runs without the state lock so writers can keep signalling.
        let outcome = engine.compact_once();

        let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
        state.running = false;
        state.generation += 1;
        if let Some(err) = outcome.err() {
            let mut slot = engine.last_error.lock().unwrap_or_else(|p| p.into_inner());
            *slot = Some(err);
        }
        engine.cond.notify_all();
    }
}
/// Where a key lies relative to a key range.
enum Pos {
    /// The key sorts before the start bound.
    Below,
    /// The key satisfies both bounds.
    In,
    /// The key sorts after the end bound.
    Above,
}

/// Classifies `key` against `range` using byte-wise lexicographic ordering.
///
/// The start bound is checked first, so a key that violates it is reported as
/// `Pos::Below` regardless of the end bound.
fn position<R: RangeBounds<Vec<u8>>>(range: &R, key: &[u8]) -> Pos {
    match range.start_bound() {
        Bound::Included(lo) if key < lo.as_slice() => return Pos::Below,
        Bound::Excluded(lo) if key <= lo.as_slice() => return Pos::Below,
        _ => {}
    }
    match range.end_bound() {
        Bound::Included(hi) if key > hi.as_slice() => Pos::Above,
        Bound::Excluded(hi) if key >= hi.as_slice() => Pos::Above,
        _ => Pos::In,
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Configuration whose compaction trigger is `usize::MAX`, so tests decide
    /// when compaction runs.
    fn manual_compaction() -> LsmConfig {
        LsmConfig::new().compaction_trigger(usize::MAX)
    }

    /// Fresh database in a temporary directory, without automatic compaction.
    fn db_no_autocompact() -> (tempfile::TempDir, Lsm) {
        let dir = tempfile::tempdir().unwrap();
        let store = Lsm::open_with(dir.path(), manual_compaction()).unwrap();
        (dir, store)
    }

    /// Fresh database in a temporary directory with the default configuration.
    fn db() -> (tempfile::TempDir, Lsm) {
        let dir = tempfile::tempdir().unwrap();
        let store = Lsm::open(dir.path()).unwrap();
        (dir, store)
    }

    #[test]
    fn test_put_get_roundtrip() {
        let (_dir, store) = db();
        store.put(b"k", b"v").unwrap();
        let found = store.get(b"k").unwrap();
        assert_eq!(found, Some(b"v".to_vec()));
    }

    #[test]
    fn test_get_absent_is_none() {
        let (_dir, store) = db();
        let found = store.get(b"absent").unwrap();
        assert_eq!(found, None);
    }

    #[test]
    fn test_overwrite_across_runs() {
        let (_dir, store) = db_no_autocompact();
        // Each version of the key lands in its own run.
        for version in [b"old", b"new"] {
            store.put(b"k", version).unwrap();
            store.flush().unwrap();
        }
        assert_eq!(store.run_count(), 2);
        assert_eq!(store.get(b"k").unwrap(), Some(b"new".to_vec()));
    }

    #[test]
    fn test_delete_masks_value_across_runs() {
        let (_dir, store) = db_no_autocompact();
        store.put(b"k", b"v").unwrap();
        store.flush().unwrap();
        store.delete(b"k").unwrap();
        store.flush().unwrap();
        assert_eq!(store.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_compaction_merges_to_single_run() {
        let (_dir, store) = db_no_autocompact();
        for i in 0..5u32 {
            let key = format!("k{i}").into_bytes();
            let value = format!("v{i}").into_bytes();
            store.put(key, value).unwrap();
            store.flush().unwrap();
        }
        assert_eq!(store.run_count(), 5);

        store.compact_now().unwrap();
        assert_eq!(store.run_count(), 1);

        for i in 0..5u32 {
            let key = format!("k{i}").into_bytes();
            let expected = format!("v{i}").into_bytes();
            assert_eq!(store.get(key).unwrap(), Some(expected));
        }
    }

    #[test]
    fn test_compaction_drops_tombstones_and_keeps_latest() {
        let (_dir, store) = db_no_autocompact();
        store.put(b"keep", b"1").unwrap();
        store.put(b"gone", b"x").unwrap();
        store.flush().unwrap();

        store.put(b"keep", b"2").unwrap();
        store.delete(b"gone").unwrap();
        store.flush().unwrap();

        store.compact_now().unwrap();
        assert_eq!(store.run_count(), 1);
        assert_eq!(store.get(b"keep").unwrap(), Some(b"2".to_vec()));
        assert_eq!(store.get(b"gone").unwrap(), None);
        assert_eq!(store.scan(..).unwrap().count(), 1);
    }

    #[test]
    fn test_reopen_reads_all_runs() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = Lsm::open_with(dir.path(), manual_compaction()).unwrap();
            let writes: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"a", b"updated")];
            for (key, value) in writes {
                store.put(key, value).unwrap();
                store.flush().unwrap();
            }
        }
        let reopened = Lsm::open(dir.path()).unwrap();
        assert_eq!(reopened.get(b"a").unwrap(), Some(b"updated".to_vec()));
        assert_eq!(reopened.get(b"b").unwrap(), Some(b"2".to_vec()));
    }

    #[test]
    fn test_reopen_after_compaction() {
        let dir = tempfile::tempdir().unwrap();
        {
            let store = Lsm::open_with(dir.path(), manual_compaction()).unwrap();
            for i in 0..4u32 {
                store.put(format!("k{i}").into_bytes(), b"v").unwrap();
                store.flush().unwrap();
            }
            store.compact_now().unwrap();
            assert_eq!(store.run_count(), 1);
        }
        let reopened = Lsm::open(dir.path()).unwrap();
        assert_eq!(reopened.run_count(), 1);
        assert_eq!(reopened.scan(..).unwrap().count(), 4);
    }

    #[test]
    fn test_background_compaction_triggers() {
        let dir = tempfile::tempdir().unwrap();
        let config = LsmConfig::new().compaction_trigger(3);
        let store = Lsm::open_with(dir.path(), config).unwrap();
        for i in 0..10u32 {
            store.put(format!("k{i:02}").into_bytes(), b"v").unwrap();
            store.flush().unwrap();
        }
        store.wait_for_idle();
        assert!(store.run_count() <= 3, "run count was {}", store.run_count());
        for i in 0..10u32 {
            let key = format!("k{i:02}").into_bytes();
            assert_eq!(store.get(key).unwrap(), Some(b"v".to_vec()));
        }
    }

    #[test]
    fn test_scan_merges_across_runs() {
        let (_dir, store) = db_no_autocompact();
        store.put(b"a", b"old-a").unwrap();
        store.put(b"c", b"3").unwrap();
        store.flush().unwrap();

        store.put(b"a", b"new-a").unwrap();
        store.put(b"b", b"2").unwrap();
        store.delete(b"c").unwrap();
        store.flush().unwrap();

        let got: Vec<_> = store.scan(..).unwrap().collect();
        let expected = vec![
            (b"a".to_vec(), b"new-a".to_vec()),
            (b"b".to_vec(), b"2".to_vec()),
        ];
        assert_eq!(got, expected);
    }

    #[test]
    fn test_scan_bounded_range() {
        let (_dir, store) = db();
        let pairs = [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")];
        for (key, value) in pairs {
            store.put(key.as_bytes(), value.as_bytes()).unwrap();
        }
        let got: Vec<_> = store
            .scan(b"b".to_vec()..b"d".to_vec())
            .unwrap()
            .collect();
        let expected = vec![
            (b"b".to_vec(), b"2".to_vec()),
            (b"c".to_vec(), b"3".to_vec()),
        ];
        assert_eq!(got, expected);
    }

    #[test]
    fn test_empty_value_roundtrips_through_flush() {
        let (_dir, store) = db_no_autocompact();
        store.put(b"k", b"").unwrap();
        store.flush().unwrap();
        assert_eq!(store.get(b"k").unwrap(), Some(Vec::new()));
        store.compact_now().unwrap();
        assert_eq!(store.get(b"k").unwrap(), Some(Vec::new()));
    }

    #[test]
    fn test_engine_is_send_and_sync() {
        // Compile-time check only: fails to build if `Lsm` loses `Send` or `Sync`.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Lsm>();
    }
}