pub(crate) mod compaction;
pub(crate) mod encoding;
pub(crate) mod engine;
pub(crate) mod manifest;
pub(crate) mod memtable;
pub(crate) mod sstable;
pub(crate) mod wal;
use std::path::Path;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use engine::{Engine, EngineConfig, EngineError};
use thiserror::Error;
use tracing::{debug, error, info};
/// A `(key, value)` pair of owned byte vectors, as returned by [`Db::scan`].
pub type KeyValue = (Vec<u8>, Vec<u8>);
pub use compaction::CompactionStrategyType;
/// User-facing configuration passed to [`Db::open`].
///
/// The ranges documented on each field are the ones `DbConfig::validate`
/// enforces when the database is opened. Values that are not listed here
/// (e.g. compaction bucket bounds) are fixed when the config is translated
/// for the engine.
///
/// `Clone` is derived so one config can be reused across several opens.
#[derive(Clone)]
pub struct DbConfig {
    /// Write buffer size forwarded to the engine; must be in
    /// `[1024, 268435456]` (presumably bytes — confirm). Default `64 * 1024`.
    pub write_buffer_size: usize,
    /// Compaction strategy forwarded to the engine. Default
    /// `CompactionStrategyType::Stcs`.
    pub compaction_strategy: CompactionStrategyType,
    /// Forwarded to the engine as `min_threshold`; must be in `[2, 64]`.
    /// Default `4`.
    pub min_compaction_threshold: usize,
    /// Forwarded to the engine as `max_threshold`; must be in
    /// `[min_compaction_threshold, 256]`. Default `32`.
    pub max_compaction_threshold: usize,
    /// Forwarded to the engine as `tombstone_ratio_threshold`; must be in
    /// `(0.0, 1.0]`. Default `0.3`.
    pub tombstone_compaction_ratio: f64,
    /// Forwarded to the engine; must be in `[0, 604800]`. The upper bound is
    /// one week in seconds, which suggests the unit is seconds — confirm.
    /// Default `0`.
    pub tombstone_compaction_interval: usize,
    /// Forwarded to the engine unchanged. Default `true`.
    pub tombstone_bloom_fallback: bool,
    /// Forwarded to the engine unchanged. Default `true`.
    pub tombstone_range_drop: bool,
    /// Number of background worker threads started by [`Db::open`]; must be
    /// in `[1, 32]`. Default `2`.
    pub thread_pool_size: usize,
}
impl Default for DbConfig {
fn default() -> Self {
Self {
write_buffer_size: 64 * 1024,
compaction_strategy: CompactionStrategyType::Stcs,
min_compaction_threshold: 4,
max_compaction_threshold: 32,
tombstone_compaction_ratio: 0.3,
tombstone_compaction_interval: 0,
tombstone_bloom_fallback: true,
tombstone_range_drop: true,
thread_pool_size: 2,
}
}
}
impl DbConfig {
    /// Checks every field against its allowed range.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::InvalidConfig`] naming the first field found out of
    /// range. Fields are checked in declaration order, except
    /// `compaction_strategy` and the two boolean switches, which have no
    /// constraints.
    fn validate(&self) -> Result<(), DbError> {
        if !(1024..=256 * 1024 * 1024).contains(&self.write_buffer_size) {
            return Err(DbError::InvalidConfig(
                "write_buffer_size must be in [1024, 268435456]".into(),
            ));
        }
        if !(2..=64).contains(&self.min_compaction_threshold) {
            return Err(DbError::InvalidConfig(
                "min_compaction_threshold must be in [2, 64]".into(),
            ));
        }
        // The lower bound depends on the (already validated) minimum.
        if !(self.min_compaction_threshold..=256).contains(&self.max_compaction_threshold) {
            return Err(DbError::InvalidConfig(
                "max_compaction_threshold must be in [min_compaction_threshold, 256]".into(),
            ));
        }
        let ratio = self.tombstone_compaction_ratio;
        // Every comparison against NaN is false, so `ratio <= 0.0 || ratio > 1.0`
        // on its own would let NaN through; reject it explicitly.
        if ratio.is_nan() || ratio <= 0.0 || ratio > 1.0 {
            return Err(DbError::InvalidConfig(
                "tombstone_compaction_ratio must be in (0.0, 1.0]".into(),
            ));
        }
        if self.tombstone_compaction_interval > 604_800 {
            return Err(DbError::InvalidConfig(
                "tombstone_compaction_interval must be in [0, 604800]".into(),
            ));
        }
        if !(1..=32).contains(&self.thread_pool_size) {
            return Err(DbError::InvalidConfig(
                "thread_pool_size must be in [1, 32]".into(),
            ));
        }
        Ok(())
    }

    /// Translates this user-facing config into the engine's configuration.
    ///
    /// `bucket_low`, `bucket_high` and `min_sstable_size` are not exposed
    /// through `DbConfig`; they are fixed here. Every other field is copied
    /// across, renaming where the engine uses a different name.
    fn to_engine_config(&self) -> EngineConfig {
        EngineConfig {
            write_buffer_size: self.write_buffer_size,
            compaction_strategy: self.compaction_strategy,
            bucket_low: 0.5,
            bucket_high: 1.5,
            min_sstable_size: 50,
            min_threshold: self.min_compaction_threshold,
            max_threshold: self.max_compaction_threshold,
            tombstone_ratio_threshold: self.tombstone_compaction_ratio,
            tombstone_compaction_interval: self.tombstone_compaction_interval,
            tombstone_bloom_fallback: self.tombstone_bloom_fallback,
            tombstone_range_drop: self.tombstone_range_drop,
            thread_pool_size: self.thread_pool_size,
        }
    }
}
/// Errors returned by [`Db`] operations.
#[derive(Debug, Error)]
pub enum DbError {
    /// An operation was attempted after [`Db::close`] was called.
    #[error("database is closed")]
    Closed,
    /// A [`DbConfig`] field was outside its allowed range.
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    /// A caller-supplied argument was rejected (e.g. an empty key, or a range
    /// whose start is not below its end).
    #[error("invalid argument: {0}")]
    InvalidArgument(String),
    /// An error from the storage engine. `Db::open` also uses this variant to
    /// report a failure to spawn a background thread.
    // NOTE(review): with `#[error("{0}")]` plus `#[from]`, both `Display` and
    // `source()` expose the inner error, so error-chain reporters may print its
    // message twice. `#[error(transparent)]` avoids that but changes what
    // `source()` returns — confirm no caller depends on it before switching.
    #[error("{0}")]
    Engine(#[from] EngineError),
}
/// Background worker threads plus the channel used to hand them work.
struct BackgroundPool {
    /// Sending side of the task queue shared by all workers. Dropping it
    /// disconnects the channel, which ends each worker's receive loop.
    sender: crossbeam::channel::Sender<Box<dyn FnOnce() + Send>>,
    /// Join handles for the worker threads; joined when the pool shuts down.
    workers: Vec<thread::JoinHandle<()>>,
}
/// An open database handle.
///
/// All operations take `&self`. Writes that report a frozen memtable queue
/// flush/compaction work on the background pool held in `bg`.
pub struct Db {
    /// Storage engine that every operation is delegated to.
    engine: Engine,
    /// Background worker pool; `None` once it has been shut down.
    bg: Mutex<Option<BackgroundPool>>,
    /// Set to `true` by `close`; checked before every data operation.
    closed: AtomicBool,
}
impl std::fmt::Debug for Db {
    /// Prints only whether the handle has been closed. The engine and the
    /// pool are elided, which is marked with `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_closed = self.closed.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("Db");
        builder.field("closed", &is_closed);
        builder.finish_non_exhaustive()
    }
}
impl Db {
pub fn open(path: impl AsRef<Path>, config: DbConfig) -> Result<Self, DbError> {
config.validate()?;
let pool_size = config.thread_pool_size;
let engine_config = config.to_engine_config();
let engine = Engine::open(&path, engine_config)?;
let (sender, receiver) = crossbeam::channel::unbounded::<Box<dyn FnOnce() + Send>>();
let mut workers = Vec::with_capacity(pool_size);
for id in 0..pool_size {
let rx = receiver.clone();
let handle = thread::Builder::new()
.name(format!("aeternusdb-bg-{id}"))
.spawn(move || {
while let Ok(task) = rx.recv() {
task();
}
})
.map_err(|e| {
DbError::Engine(EngineError::Internal(format!(
"failed to spawn background thread {id}: {e}"
)))
})?;
workers.push(handle);
}
drop(receiver);
info!(path = %path.as_ref().display(), pool_size, "database opened");
Ok(Self {
engine,
bg: Mutex::new(Some(BackgroundPool { sender, workers })),
closed: AtomicBool::new(false),
})
}
pub fn close(&self) -> Result<(), DbError> {
if self.closed.swap(true, Ordering::AcqRel) {
return Ok(()); }
self.shutdown_pool();
self.engine.close()?;
info!("database closed");
Ok(())
}
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), DbError> {
self.check_open()?;
if key.is_empty() {
return Err(DbError::InvalidArgument("key must not be empty".into()));
}
if value.is_empty() {
return Err(DbError::InvalidArgument("value must not be empty".into()));
}
let frozen = self.engine.put(key.to_vec(), value.to_vec())?;
if frozen {
self.schedule_flush();
}
Ok(())
}
pub fn delete(&self, key: &[u8]) -> Result<(), DbError> {
self.check_open()?;
if key.is_empty() {
return Err(DbError::InvalidArgument("key must not be empty".into()));
}
let frozen = self.engine.delete(key.to_vec())?;
if frozen {
self.schedule_flush();
}
Ok(())
}
pub fn delete_range(&self, start: &[u8], end: &[u8]) -> Result<(), DbError> {
self.check_open()?;
if start.is_empty() || end.is_empty() {
return Err(DbError::InvalidArgument(
"start and end keys must not be empty".into(),
));
}
if start >= end {
return Err(DbError::InvalidArgument(
"start must be less than end".into(),
));
}
let frozen = self.engine.delete_range(start.to_vec(), end.to_vec())?;
if frozen {
self.schedule_flush();
}
Ok(())
}
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, DbError> {
self.check_open()?;
if key.is_empty() {
return Err(DbError::InvalidArgument("key must not be empty".into()));
}
Ok(self.engine.get(key.to_vec())?)
}
pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<KeyValue>, DbError> {
self.check_open()?;
if start.is_empty() || end.is_empty() {
return Err(DbError::InvalidArgument(
"start and end keys must not be empty".into(),
));
}
if start >= end {
return Ok(Vec::new());
}
let results: Vec<_> = self.engine.scan(start, end)?.collect();
Ok(results)
}
pub fn major_compact(&self) -> Result<bool, DbError> {
self.check_open()?;
Ok(self.engine.major_compact()?)
}
fn check_open(&self) -> Result<(), DbError> {
if self.closed.load(Ordering::Acquire) {
return Err(DbError::Closed);
}
Ok(())
}
fn schedule_flush(&self) {
let guard = self.bg.lock().unwrap();
if let Some(bg) = guard.as_ref() {
let engine = self.engine.clone();
let _ = bg.sender.send(Box::new(move || {
match engine.flush_oldest_frozen() {
Ok(true) => debug!("background: flushed frozen memtable"),
Ok(false) => return,
Err(e) => {
error!("background flush failed: {e}");
return;
}
}
loop {
match engine.minor_compact() {
Ok(true) => debug!("background: minor compaction round"),
Ok(false) => break,
Err(e) => {
error!("background minor compaction failed: {e}");
break;
}
}
}
match engine.tombstone_compact() {
Ok(true) => debug!("background: tombstone compaction"),
Ok(false) => {}
Err(e) => {
error!("background tombstone compaction failed: {e}");
}
}
}));
}
}
fn shutdown_pool(&self) {
if let Some(bg) = self.bg.lock().unwrap().take() {
drop(bg.sender);
for worker in bg.workers {
let _ = worker.join();
}
}
}
}
impl Drop for Db {
fn drop(&mut self) {
if !self.closed.load(Ordering::Acquire) {
self.shutdown_pool();
let _ = self.engine.close();
}
}
}