use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use thiserror::Error;
use crate::manifest::{Manifest, ManifestError, ManifestSstEntry};
use crate::memtable::{FrozenMemtable, Memtable, MemtableError, MemtableGetResult};
use crate::sstable::{self, SSTable, SSTableError};
mod encoding_impls;
pub mod utils;
mod visibility;
pub use utils::{PointEntry, RangeTombstone, Record, RecordEntry};
pub use visibility::VisibilityFilter;
#[cfg(test)]
mod tests;
/// Name of the subdirectory (under the engine's base path) that holds the manifest.
pub const MANIFEST_DIR: &str = "manifest";
/// Name of the subdirectory that holds the memtable WAL files (`NNNNNN.log`).
pub const MEMTABLE_DIR: &str = "memtables";
/// Name of the subdirectory that holds the SSTable files (`NNNNNN.sst`).
pub const SSTABLE_DIR: &str = "sstables";
/// Errors surfaced by the storage engine.
///
/// The `#[from]` variants wrap errors of the underlying layers so that they can
/// be propagated with `?`.
#[derive(Debug, Error)]
pub enum EngineError {
/// Failure reported by the manifest layer.
#[error("Manifest error: {0}")]
Manifest(#[from] ManifestError),
/// Failure reported by a memtable (or its WAL).
#[error("Memtable error: {0}")]
Memtable(#[from] MemtableError),
/// Failure reported while reading or writing an SSTable.
#[error("SSTable error: {0}")]
SSTable(#[from] SSTableError),
/// Filesystem failure (directory creation, file removal, fsync, ...).
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Engine-level failure described by a free-form message, such as a poisoned
/// lock or a failed compaction.
#[error("Internal error: {0}")]
Internal(String),
}
/// Tunable settings supplied to [`Engine::open`].
///
/// Only `write_buffer_size` and `compaction_strategy` are interpreted in this
/// file. The whole struct is also handed to the compaction strategy, which is
/// where the remaining fields get their meaning.
pub struct EngineConfig {
/// Forwarded as the size argument of every `Memtable::new` call.
/// Presumably a byte count — confirm in the memtable module.
pub write_buffer_size: usize,
/// Strategy family from which the concrete minor / tombstone / major
/// compaction strategies are selected.
pub compaction_strategy: crate::compaction::CompactionStrategyType,
// NOTE(review): none of the fields below are read in this file; they look
// like compaction tuning knobs, but their exact semantics must be confirmed
// against the compaction module.
pub bucket_low: f64,
pub bucket_high: f64,
pub min_sstable_size: usize,
pub min_threshold: usize,
pub max_threshold: usize,
pub tombstone_ratio_threshold: f64,
pub tombstone_compaction_interval: usize,
pub tombstone_bloom_fallback: bool,
pub tombstone_range_drop: bool,
pub thread_pool_size: usize,
}
impl Default for EngineConfig {
/// Returns the built-in default configuration: a `64 * 1024` write buffer,
/// the `Stcs` compaction strategy, and the tuning values listed below.
fn default() -> Self {
Self {
// Presumably 64 KiB, assuming the buffer size is measured in bytes.
write_buffer_size: 64 * 1024,
compaction_strategy: crate::compaction::CompactionStrategyType::Stcs,
bucket_low: 0.5,
bucket_high: 1.5,
min_sstable_size: 50,
min_threshold: 4,
max_threshold: 32,
tombstone_ratio_threshold: 0.3,
tombstone_compaction_interval: 0,
tombstone_bloom_fallback: true,
tombstone_range_drop: true,
thread_pool_size: 2,
}
}
}
/// Point-in-time counters describing the engine's in-memory view of its data,
/// as produced by [`Engine::stats`].
#[derive(Debug)]
#[allow(dead_code)]
pub struct EngineStats {
/// Number of frozen memtables currently held by the engine.
pub frozen_count: usize,
/// Number of SSTables currently held by the engine.
pub sstables_count: usize,
/// Sum of all values in `sst_sizes`.
pub total_sst_size_bytes: u64,
/// `file_size()` of every SSTable, in the engine's SSTable order.
pub sst_sizes: Vec<u64>,
}
/// Mutable engine state, shared between all [`Engine`] handles behind a lock.
struct EngineInner {
/// Persistent record of the active WAL, frozen WALs, SSTables and last LSN.
manifest: Manifest,
/// Memtable that receives all new writes.
active: Memtable,
/// Frozen memtables awaiting flush, kept newest first: new entries are
/// inserted at index 0 and the flush path takes the last element.
frozen: Vec<Arc<FrozenMemtable>>,
/// Open SSTables, kept sorted by descending `max_lsn()`.
sstables: Vec<Arc<SSTable>>,
/// Base directory containing the manifest, memtable and SSTable subdirectories.
data_dir: PathBuf,
/// Configuration the engine was opened with.
config: EngineConfig,
}
/// Handle to a storage engine instance.
///
/// All state lives in an `Arc<RwLock<EngineInner>>`, so a handle is only a
/// reference to state guarded by that lock.
pub struct Engine {
/// Shared, lock-protected engine state.
inner: Arc<RwLock<EngineInner>>,
}
impl Clone for Engine {
    /// Produces another handle to the *same* engine state.
    ///
    /// Only the reference count of the shared state is bumped; no data is copied.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.inner);
        Self { inner: shared }
    }
}
impl Engine {
/// Acquires the shared (read) lock on the engine state.
///
/// # Errors
/// Returns [`EngineError::Internal`] if the lock is poisoned.
fn read_lock(&self) -> Result<std::sync::RwLockReadGuard<'_, EngineInner>, EngineError> {
    match self.inner.read() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(EngineError::Internal("RwLock poisoned".into())),
    }
}
/// Acquires the exclusive (write) lock on the engine state.
///
/// # Errors
/// Returns [`EngineError::Internal`] if the lock is poisoned.
fn write_lock(&self) -> Result<std::sync::RwLockWriteGuard<'_, EngineInner>, EngineError> {
    match self.inner.write() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(EngineError::Internal("RwLock poisoned".into())),
    }
}
fn write_with_retry(
inner: &mut EngineInner,
mut op: impl FnMut(&mut Memtable) -> Result<(), MemtableError>,
) -> Result<bool, EngineError> {
match op(&mut inner.active) {
Ok(()) => Ok(false),
Err(MemtableError::FlushRequired) => {
Self::freeze_active(inner)?;
op(&mut inner.active)?;
let max_lsn = inner.active.max_lsn().unwrap_or(0);
inner.manifest.update_lsn(max_lsn)?;
Ok(true)
}
Err(e) => Err(e.into()),
}
}
pub fn open(path: impl AsRef<Path>, config: EngineConfig) -> Result<Self, EngineError> {
let base = path.as_ref();
let manifest_dir = base.join(MANIFEST_DIR);
let memtable_dir = base.join(MEMTABLE_DIR);
let sstable_dir = base.join(SSTABLE_DIR);
fs::create_dir_all(&manifest_dir)?;
fs::create_dir_all(&memtable_dir)?;
fs::create_dir_all(&sstable_dir)?;
let manifest = Manifest::open(&manifest_dir)?;
let manifest_last_lsn = manifest.get_last_lsn()?;
let active_wal_nr = manifest.get_active_wal()?;
let active_wal_path = memtable_dir.join(format!("{:06}.log", active_wal_nr));
let memtable = Memtable::new(active_wal_path, None, config.write_buffer_size)?;
let frozen_wals = manifest.get_frozen_wals()?;
let mut frozen_memtables = Vec::new();
for wal_nr in frozen_wals {
let frozen_wal_path = memtable_dir.join(format!("{:06}.log", wal_nr));
let memtable = Memtable::new(frozen_wal_path, None, config.write_buffer_size)?;
frozen_memtables.push(memtable.frozen()?);
}
let sstables = manifest.get_sstables()?;
for entry in fs::read_dir(&sstable_dir)? {
let entry = entry?;
let file_path = entry.path();
if file_path.is_file()
&& file_path.extension().and_then(|s| s.to_str()) == Some("sst")
&& let Some(file_name) = file_path.file_name().and_then(|s| s.to_str())
&& let Some(id) = file_name
.strip_suffix(".sst")
.and_then(|s| s.parse::<u64>().ok())
&& !sstables.iter().any(|entry| entry.id == id)
{
fs::remove_file(&file_path)?;
}
}
let mut sstable_handles = Vec::new();
for sstable_entry in sstables {
let mut sstable = SSTable::open(&sstable_entry.path)?;
sstable.set_id(sstable_entry.id);
sstable_handles.push(sstable);
}
let mut max_lsn = manifest_last_lsn;
if memtable.max_lsn().unwrap_or(0) > max_lsn {
max_lsn = memtable.max_lsn().unwrap_or(0);
}
for frozen in frozen_memtables.iter() {
if frozen.max_lsn().unwrap_or(0) > max_lsn {
max_lsn = frozen.max_lsn().unwrap_or(0);
}
}
for sstable in sstable_handles.iter() {
if sstable.max_lsn() > max_lsn {
max_lsn = sstable.max_lsn();
}
}
if memtable.max_lsn().unwrap_or(0) != max_lsn {
memtable.inject_max_lsn(max_lsn + 1);
}
frozen_memtables.sort_by_key(|f| std::cmp::Reverse(f.wal_seq()));
sstable_handles.sort_by_key(|s| std::cmp::Reverse(s.max_lsn()));
let inner = EngineInner {
manifest,
active: memtable,
frozen: frozen_memtables.into_iter().map(Arc::new).collect(),
sstables: sstable_handles.into_iter().map(Arc::new).collect(),
data_dir: base.to_path_buf(),
config,
};
Ok(Self {
inner: Arc::new(RwLock::new(inner)),
})
}
/// Flushes all frozen memtables, checkpoints the manifest and syncs the
/// engine's directories.
///
/// The active memtable is not flushed here. Directory syncing is best-effort
/// with respect to *opening* a directory (a directory that cannot be opened is
/// skipped), but a failing `sync_all` on an opened directory is reported.
///
/// # Errors
/// Propagates flush, manifest and `sync_all` errors, or a poisoned lock.
pub fn close(&self) -> Result<(), EngineError> {
    let mut guard = self.write_lock()?;

    while !guard.frozen.is_empty() {
        Self::flush_frozen_to_sstable_inner(&mut guard)?;
    }

    let final_lsn = guard.active.max_lsn().unwrap_or(0);
    guard.manifest.update_lsn(final_lsn)?;
    guard.manifest.checkpoint()?;

    // Sync the three subdirectories first, then the root itself.
    let root = guard.data_dir.clone();
    let sync_targets = [MANIFEST_DIR, MEMTABLE_DIR, SSTABLE_DIR]
        .iter()
        .map(|sub| root.join(sub))
        .chain(std::iter::once(root.clone()));
    for target in sync_targets {
        if let Ok(handle) = fs::File::open(&target) {
            handle.sync_all()?;
        }
    }
    Ok(())
}
pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<bool, EngineError> {
let mut inner = self.write_lock()?;
tracing::trace!(key_len = key.len(), value_len = value.len(), "engine put");
Self::write_with_retry(&mut inner, |active| active.put(key.clone(), value.clone()))
}
pub fn delete(&self, key: Vec<u8>) -> Result<bool, EngineError> {
let mut inner = self.write_lock()?;
tracing::trace!(key_len = key.len(), "engine delete");
Self::write_with_retry(&mut inner, |active| active.delete(key.clone()))
}
pub fn delete_range(&self, start_key: Vec<u8>, end_key: Vec<u8>) -> Result<bool, EngineError> {
let mut inner = self.write_lock()?;
tracing::trace!(
start_len = start_key.len(),
end_len = end_key.len(),
"engine delete_range"
);
Self::write_with_retry(&mut inner, |active| {
active.delete_range(start_key.clone(), end_key.clone())
})
}
pub fn get(&self, key: Vec<u8>) -> Result<Option<Vec<u8>>, EngineError> {
tracing::trace!(key_len = key.len(), "engine get");
let inner = self.read_lock()?;
match inner.active.get(&key)? {
MemtableGetResult::Put(value) => return Ok(Some(value)),
MemtableGetResult::Delete | MemtableGetResult::RangeDelete => return Ok(None),
MemtableGetResult::NotFound => {}
}
for frozen in &inner.frozen {
match frozen.get(&key)? {
MemtableGetResult::Put(value) => return Ok(Some(value)),
MemtableGetResult::Delete | MemtableGetResult::RangeDelete => {
return Ok(None);
}
MemtableGetResult::NotFound => {}
}
}
let mut best_sst: Option<sstable::GetResult> = None;
let mut best_lsn: u64 = 0;
for sst in &inner.sstables {
if sst.max_lsn() <= best_lsn {
break;
}
match sst.get(&key)? {
sstable::GetResult::NotFound => {}
result => {
let lsn = result.lsn();
if lsn > best_lsn {
best_lsn = lsn;
best_sst = Some(result);
}
}
}
}
match best_sst {
Some(sstable::GetResult::Put { value, .. }) => Ok(Some(value)),
Some(sstable::GetResult::Delete { .. } | sstable::GetResult::RangeDelete { .. }) => {
Ok(None)
}
_ => Ok(None),
}
}
/// Scans the keys between `start_key` and `end_key`, yielding `(key, value)` pairs.
///
/// The raw merged record stream from all memtables and SSTables is passed
/// through a [`VisibilityFilter`]. Bound inclusivity is decided by the
/// underlying scan implementations and is not visible from this method.
///
/// # Errors
/// Propagates scan errors from the memtables / SSTables or a poisoned lock.
pub fn scan(
    &self,
    start_key: &[u8],
    end_key: &[u8],
) -> Result<impl Iterator<Item = (Vec<u8>, Vec<u8>)>, EngineError> {
    tracing::trace!(
        start_len = start_key.len(),
        end_len = end_key.len(),
        "engine scan"
    );
    let visible = VisibilityFilter::new(self.raw_scan(start_key, end_key)?);
    Ok(visible)
}
/// Builds a merged, unfiltered record stream over every data source.
///
/// While the read lock is held, the active memtable's matching records are
/// collected and the frozen-memtable and SSTable lists are snapshotted (the
/// `Arc`s are cloned). The lock is then released; frozen memtables and
/// SSTables are scanned from the snapshot.
///
/// Sources are handed to the merge iterator in the order: active memtable,
/// frozen memtables, SSTables.
///
/// # Errors
/// Propagates scan errors or a poisoned lock.
fn raw_scan(
    &self,
    start_key: &[u8],
    end_key: &[u8],
) -> Result<utils::MergeIterator<'static>, EngineError> {
    let (active_records, frozen_snapshot, sstable_snapshot) = {
        let guard = self.read_lock()?;
        let from_active: Vec<_> = guard.active.scan(start_key, end_key)?.collect();
        let frozen: Vec<Arc<FrozenMemtable>> = guard.frozen.clone();
        let tables: Vec<Arc<SSTable>> = guard.sstables.clone();
        (from_active, frozen, tables)
    };

    let source_count = 1 + frozen_snapshot.len() + sstable_snapshot.len();
    let mut sources: Vec<Box<dyn Iterator<Item = Record>>> = Vec::with_capacity(source_count);
    sources.push(Box::new(active_records.into_iter()));

    for frozen in &frozen_snapshot {
        let records: Vec<_> = frozen.scan(start_key, end_key)?.collect();
        sources.push(Box::new(records.into_iter()));
    }
    for table in &sstable_snapshot {
        sources.push(Box::new(SSTable::scan_owned(table, start_key, end_key)?));
    }

    Ok(utils::MergeIterator::new(sources))
}
/// Reports how many frozen memtables and SSTables the engine currently holds,
/// together with the individual and total SSTable file sizes.
///
/// # Errors
/// Returns [`EngineError::Internal`] if the lock is poisoned.
#[allow(dead_code)]
pub fn stats(&self) -> Result<EngineStats, EngineError> {
    let guard = self.read_lock()?;

    let mut sst_sizes: Vec<u64> = Vec::with_capacity(guard.sstables.len());
    let mut total_sst_size_bytes: u64 = 0;
    for table in &guard.sstables {
        let size = table.file_size();
        total_sst_size_bytes += size;
        sst_sizes.push(size);
    }

    Ok(EngineStats {
        frozen_count: guard.frozen.len(),
        sstables_count: guard.sstables.len(),
        total_sst_size_bytes,
        sst_sizes,
    })
}
/// Retires the active memtable and installs a fresh one.
///
/// A new memtable is created on the WAL whose sequence number is the current
/// one plus one. The old memtable is frozen and placed at the front of the
/// frozen list, the old max LSN is injected into the new memtable, and finally
/// the manifest is told about the newly frozen WAL and the new active WAL.
///
/// # Errors
/// Propagates memtable creation / freezing errors and manifest errors.
fn freeze_active(inner: &mut EngineInner) -> Result<(), EngineError> {
    let retiring_wal_id = inner.active.wal_seq();
    let carried_lsn = inner.active.max_lsn().unwrap_or(0);
    let next_wal_id = retiring_wal_id + 1;

    let next_wal_path = inner
        .data_dir
        .join(MEMTABLE_DIR)
        .join(format!("{:06}.log", next_wal_id));
    let fresh = Memtable::new(next_wal_path, None, inner.config.write_buffer_size)?;

    // Swap first, then freeze what was swapped out.
    let retired = std::mem::replace(&mut inner.active, fresh).frozen()?;
    inner.frozen.insert(0, Arc::new(retired));
    inner.active.inject_max_lsn(carried_lsn);

    inner.manifest.add_frozen_wal(retiring_wal_id)?;
    inner.manifest.set_active_wal(next_wal_id)?;
    Ok(())
}
/// Flushes one frozen memtable (the last in the frozen list) to an SSTable.
///
/// # Returns
/// `Ok(true)` if a memtable was flushed, `Ok(false)` if there was none.
///
/// # Errors
/// Propagates flush errors or a poisoned lock.
pub fn flush_oldest_frozen(&self) -> Result<bool, EngineError> {
    let mut guard = self.write_lock()?;
    let has_work = !guard.frozen.is_empty();
    if has_work {
        Self::flush_frozen_to_sstable_inner(&mut guard)?;
    }
    Ok(has_work)
}
/// Flushes frozen memtables to SSTables until none remain, holding the write
/// lock for the whole operation.
///
/// # Returns
/// The number of flushes performed.
///
/// # Errors
/// Propagates the first flush error, or a poisoned lock.
#[allow(dead_code)]
pub fn flush_all_frozen(&self) -> Result<usize, EngineError> {
    let mut guard = self.write_lock()?;
    let mut flushed = 0usize;
    loop {
        if guard.frozen.is_empty() {
            break Ok(flushed);
        }
        Self::flush_frozen_to_sstable_inner(&mut guard)?;
        flushed += 1;
    }
}
fn next_sstable_id(inner: &mut EngineInner) -> Result<u64, EngineError> {
Ok(inner.manifest.allocate_sst_id()?)
}
/// Writes the oldest frozen memtable (the last list element) to a new SSTable.
///
/// The memtable stays in the in-memory frozen list until the SSTable has been
/// built, opened and recorded in the manifest, and the frozen WAL has been
/// removed from the manifest. If any step fails, the memtable therefore remains
/// visible to readers and the flush can be retried. Removing it up front would
/// hide its data from reads until the next restart.
///
/// Does nothing when there is no frozen memtable.
///
/// # Errors
/// Propagates memtable iteration, SSTable build / open, and manifest errors.
fn flush_frozen_to_sstable_inner(inner: &mut EngineInner) -> Result<(), EngineError> {
    // Peek rather than pop: the entry is only dropped once the flush is durable.
    let Some(frozen) = inner.frozen.last().map(Arc::clone) else {
        return Ok(());
    };
    let frozen_wal_id = frozen.wal_seq();

    let mut point_entries = Vec::new();
    let mut range_tombstones = Vec::new();
    for record in frozen.iter_for_flush()? {
        match record.into_entry() {
            RecordEntry::Point(pe) => point_entries.push(pe),
            RecordEntry::Range(rt) => range_tombstones.push(rt),
        }
    }

    let sstable_id = Self::next_sstable_id(inner)?;
    let sstable_path = inner
        .data_dir
        .join(SSTABLE_DIR)
        .join(format!("{:06}.sst", sstable_id));

    let point_count = point_entries.len();
    let range_count = range_tombstones.len();
    sstable::SstWriter::new(&sstable_path).build(
        point_entries.into_iter(),
        point_count,
        range_tombstones.into_iter(),
        range_count,
    )?;

    let mut sstable = SSTable::open(&sstable_path)?;
    sstable.set_id(sstable_id);

    // Record the table in the manifest before exposing it in memory, so the
    // in-memory view never contains a table the manifest does not know about.
    inner.manifest.add_sstable(ManifestSstEntry {
        id: sstable_id,
        path: sstable_path,
    })?;
    // The freshly flushed table holds the newest on-disk data: keep it first.
    inner.sstables.insert(0, Arc::new(sstable));

    inner.manifest.remove_frozen_wal(frozen_wal_id)?;
    // Everything succeeded; the memtable peeked above is still the last
    // element because `inner` is exclusively borrowed for the whole call.
    inner.frozen.pop();
    Ok(())
}
/// Runs `strategy` against the current SSTable set and applies its result.
///
/// The write lock is held for the entire call, including the strategy's own
/// work.
///
/// # Returns
/// `Ok(true)` if the strategy produced a result that was applied, `Ok(false)`
/// if it found nothing to do.
///
/// # Errors
/// A strategy failure is reported as [`EngineError::Internal`]; errors while
/// applying the result, or a poisoned lock, are propagated.
fn run_compaction(
    &self,
    strategy: &dyn crate::compaction::CompactionStrategy,
) -> Result<bool, EngineError> {
    let mut guard = self.write_lock()?;
    // Reborrow as a plain `&mut EngineInner` so disjoint fields can be
    // borrowed independently in the `compact` call below.
    let inner = &mut *guard;
    let sst_count = inner.sstables.len();
    let data_dir_str = inner.data_dir.to_string_lossy();

    let outcome = strategy
        .compact(
            &inner.sstables,
            &mut inner.manifest,
            &data_dir_str,
            &inner.config,
        )
        .map_err(|e| EngineError::Internal(format!("Compaction failed: {e}")))?;

    let Some(cr) = outcome else {
        tracing::debug!(sst_count, "compaction strategy found nothing to do");
        return Ok(false);
    };

    tracing::info!(
        sst_count_before = sst_count,
        removed = cr.removed_ids.len(),
        new_id = ?cr.new_sst_id,
        "compaction applied"
    );
    Self::apply_compaction_result(inner, cr)?;
    Ok(true)
}
/// Builds a compaction strategy from the configured strategy type via
/// `selector` and runs it.
///
/// The read lock is only held while the strategy object is being built; it is
/// released before the compaction takes the write lock.
///
/// # Errors
/// Propagates compaction errors or a poisoned lock.
fn compact_with(
    &self,
    selector: fn(
        &crate::compaction::CompactionStrategyType,
    ) -> Box<dyn crate::compaction::CompactionStrategy>,
) -> Result<bool, EngineError> {
    let guard = self.read_lock()?;
    let strategy = selector(&guard.config.compaction_strategy);
    // Must be released before `run_compaction` asks for the write lock.
    drop(guard);
    self.run_compaction(strategy.as_ref())
}
pub fn minor_compact(&self) -> Result<bool, EngineError> {
self.compact_with(crate::compaction::CompactionStrategyType::minor)
}
pub fn tombstone_compact(&self) -> Result<bool, EngineError> {
self.compact_with(crate::compaction::CompactionStrategyType::tombstone)
}
pub fn major_compact(&self) -> Result<bool, EngineError> {
self.compact_with(crate::compaction::CompactionStrategyType::major)
}
/// Updates the in-memory SSTable list to reflect a finished compaction.
///
/// Tables whose id is listed in `cr.removed_ids` are dropped from the list,
/// the replacement table (if any) is added, and the list is re-sorted by
/// descending max LSN.
///
/// The replacement is opened *before* the list is touched. Opening is the only
/// fallible step, so on failure the list is left exactly as it was instead of
/// having lost the compacted inputs without gaining their replacement.
///
/// # Errors
/// Propagates the error from opening the replacement SSTable.
fn apply_compaction_result(
    inner: &mut EngineInner,
    cr: crate::compaction::CompactionResult,
) -> Result<(), EngineError> {
    let replacement = match cr.new_sst_path {
        Some(ref path) => {
            let mut new_sst = SSTable::open(path)?;
            // NOTE(review): a result with a path but no id falls back to id 0;
            // confirm whether the compaction module can ever produce that.
            new_sst.set_id(cr.new_sst_id.unwrap_or(0));
            Some(Arc::new(new_sst))
        }
        None => None,
    };

    // Infallible from here on.
    inner
        .sstables
        .retain(|sst| !cr.removed_ids.contains(&sst.id()));
    inner.sstables.extend(replacement);
    inner
        .sstables
        .sort_by_key(|s| std::cmp::Reverse(s.max_lsn()));
    Ok(())
}
}