#![doc = include_str!("../README.md")]
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
pub use self::caches::Caches;
pub use self::migrations::{
DefaultVersionProvider, MigrationError, Migrations, Semver, VersionProvider,
};
pub use self::table::{
BoundedCfHandle, ColumnFamily, ColumnFamilyOptions, Table, UnboundedCfHandle,
};
pub use rocksdb;
mod caches;
#[cfg(feature = "metrics")]
mod metrics;
mod migrations;
mod table;
/// Declares a struct holding one [`Table`] per listed column family and
/// implements [`Tables`] for it.
///
/// Syntax: `tables! { [attrs] [vis] struct Name<Context> { [vis] field: CfType, ... } }`.
/// At least one field is required; a trailing comma is accepted.
///
/// The generated [`Tables`] impl:
/// - `define` registers every `CfType` on the builder via `with_table`, in field order;
/// - `instantiate` creates each field with `WeeDbRaw::instantiate_table`;
/// - `column_families` yields one [`ColumnFamilyDescr`] per field, named by
///   `<CfType as ColumnFamily>::NAME`.
///
/// # Examples
/// ```ignore
/// tables! {
///     pub struct MyTables<Caches> {
///         pub my_table: MyTable,
///     }
/// }
/// ```
#[macro_export]
macro_rules! tables {
    ($(#[$($meta:tt)*])* $pub:vis struct $ident:ident<$context:ty> {
        $($field_pub:vis $field:ident: $table:ty),+$(,)?
    }) => {
        $(#[$($meta)*])*
        $pub struct $ident {
            $($field_pub $field: $crate::Table<$table>,)*
        }
        impl $crate::Tables for $ident {
            type Context = $context;
            fn define(builder: $crate::WeeDbRawBuilder<Self::Context>) -> $crate::WeeDbRawBuilder<Self::Context> {
                // Register every column family on the builder, in field order.
                builder$(.with_table::<$table>())*
            }
            fn instantiate(db: &$crate::WeeDbRaw) -> Self {
                Self {
                    $($field: db.instantiate_table(),)*
                }
            }
            fn column_families(&self) -> impl IntoIterator<Item = $crate::ColumnFamilyDescr<'_>> {
                // A fixed-size array: one descriptor per declared field.
                [$($crate::ColumnFamilyDescr {
                    name: <$table as $crate::ColumnFamily>::NAME,
                    cf: self.$field.cf(),
                }),*]
            }
        }
    };
}
/// A typed group of column families that are opened together as one database.
///
/// Usually implemented with the [`tables!`] macro rather than by hand.
pub trait Tables: Send + Sync {
    /// Builder context handed mutably to each column family's options hook.
    /// Its [`Caches`] are cloned into the opened database.
    type Context: AsRef<Caches>;

    /// Registers every column family of this group on `builder`.
    fn define(builder: WeeDbRawBuilder<Self::Context>) -> WeeDbRawBuilder<Self::Context>;

    /// Creates the table handles for an already opened database.
    fn instantiate(db: &WeeDbRaw) -> Self;

    /// Lists all column families of this group together with their handles.
    fn column_families(&self) -> impl IntoIterator<Item = ColumnFamilyDescr<'_>>;
}
/// A column family name paired with a handle to it.
pub struct ColumnFamilyDescr<'a> {
    /// Column family name (`ColumnFamily::NAME` when produced by [`tables!`]).
    pub name: &'static str,
    /// Handle used to address the column family in RocksDB calls; presumably
    /// borrowed from the owning [`Table`] — confirm in `table.rs`.
    pub cf: BoundedCfHandle<'a>,
}
/// Builder for a typed [`WeeDb`] whose column families are described by `T`.
///
/// A thin wrapper over [`WeeDbRawBuilder`]; `T`'s tables are registered when
/// the database is built.
pub struct WeeDbBuilder<T: Tables> {
    inner: WeeDbRawBuilder<T::Context>,
}
impl<T: Tables> WeeDbBuilder<T> {
pub fn new<P: AsRef<Path>>(path: P, context: T::Context) -> Self {
Self {
inner: WeeDbRawBuilder::new(path, context),
}
}
pub fn with_name(mut self, name: &'static str) -> Self {
self.inner = self.inner.with_name(name);
self
}
pub fn with_options<F>(mut self, f: F) -> Self
where
F: FnOnce(&mut rocksdb::Options, &mut T::Context),
{
self.inner = self.inner.with_options(f);
self
}
#[cfg(feature = "metrics")]
pub fn with_metrics_enabled(mut self, enabled: bool) -> Self {
self.inner = self.inner.with_metrics_enabled(enabled);
self
}
pub fn read_only(mut self, options: ReadOnly) -> Self {
self.inner = self.inner.read_only(options);
self
}
pub fn secondary(mut self, options: Secondary) -> Self {
self.inner = self.inner.secondary(options);
self
}
pub fn with_ttl_support(mut self, default_cf_ttl: Duration) -> Self {
self.inner = self.inner.with_ttl_support(default_cf_ttl);
self
}
#[allow(unused_mut)]
pub fn build(mut self) -> Result<WeeDb<T>, rocksdb::Error> {
let raw = self.inner.with_tables::<T>().build()?;
Ok(WeeDb {
inner: Arc::new(WeeDbInner {
tables: T::instantiate(&raw),
raw,
}),
})
}
}
/// A cheaply clonable handle to an opened database with typed tables `T`.
///
/// All clones share the same underlying state. The handle dereferences to `T`.
#[repr(transparent)]
pub struct WeeDb<T> {
    inner: Arc<WeeDbInner<T>>,
}
impl<T: Tables> WeeDb<T> {
    /// Shorthand for [`WeeDbBuilder::new`].
    pub fn builder<P: AsRef<Path>>(path: P, context: T::Context) -> WeeDbBuilder<T> {
        WeeDbBuilder::<T>::new(path, context)
    }

    /// Runs a full manual compaction of every column family in `T`, one at a time.
    ///
    /// Each column family is compacted over its whole key range, with exclusive
    /// manual compaction and `ForceOptimized` bottommost-level compaction.
    ///
    /// Note: although this is an `async fn`, it never yields. Every
    /// `compact_range_cf_opt` call blocks the current thread until it returns,
    /// so callers on an async runtime may want to run it on a blocking thread.
    pub async fn trigger_compaction(&self) {
        let mut options = rocksdb::CompactOptions::default();
        options.set_exclusive_manual_compaction(true);
        options.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::ForceOptimized);

        // `None` on both ends compacts the entire key range.
        let unbounded: Option<[u8; 0]> = None;
        let db = self.rocksdb();

        for descr in self.inner.tables.column_families() {
            tracing::info!(cf = descr.name, "compaction started");
            let started_at = Instant::now();

            db.compact_range_cf_opt(&descr.cf, unbounded, unbounded, &options);

            let elapsed = started_at.elapsed();
            tracing::info!(
                cf = descr.name,
                elapsed_sec = %elapsed.as_secs_f64(),
                "compaction finished"
            );
        }
    }
}
impl<T> WeeDb<T> {
    /// Returns the typed table handles (also reachable through `Deref`).
    pub fn tables(&self) -> &T {
        &self.inner.tables
    }

    /// Returns the underlying RocksDB instance.
    pub fn rocksdb(&self) -> &Arc<rocksdb::DB> {
        self.raw().rocksdb()
    }

    /// Takes a snapshot that holds its own reference to the database.
    pub fn owned_snapshot(&self) -> OwnedSnapshot {
        let db = Arc::clone(self.rocksdb());
        OwnedSnapshot::new(db)
    }

    /// Returns the untyped database handle.
    pub fn raw(&self) -> &WeeDbRaw {
        &self.inner.raw
    }

    /// Returns the name set with `with_name`, if any.
    #[inline]
    pub fn db_name(&self) -> Option<&str> {
        self.raw().db_name()
    }

    /// Returns the caches shared by this database.
    #[inline]
    pub fn caches(&self) -> &Caches {
        self.raw().caches()
    }

    /// Reports memory usage of the database and its block cache.
    ///
    /// # Errors
    /// Propagates the RocksDB error if the statistics cannot be collected.
    pub fn get_memory_usage_stats(&self) -> Result<Stats, rocksdb::Error> {
        self.raw().get_memory_usage_stats()
    }

    /// Delegates to [`WeeDbRaw::refresh_metrics`].
    #[cfg(feature = "metrics")]
    pub fn refresh_metrics(&self) {
        self.raw().refresh_metrics();
    }
}
impl<T> std::fmt::Debug for WeeDb<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let raw = self.inner.raw.as_ref();
f.debug_struct("WeeDb")
.field("db_name", &raw.inner.db_name)
.field("tables", &raw.inner.cf_names)
.finish()
}
}
impl<T> Clone for WeeDb<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T> AsRef<WeeDbRaw> for WeeDb<T> {
#[inline]
fn as_ref(&self) -> &WeeDbRaw {
&self.inner.raw
}
}
impl<T> std::ops::Deref for WeeDb<T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
&self.inner.tables
}
}
/// Shared state behind [`WeeDb`]: the typed tables plus the raw database handle.
struct WeeDbInner<T> {
    tables: T,
    raw: WeeDbRaw,
}

impl<T> Drop for WeeDbInner<T> {
    /// Runs when the last [`WeeDb`] clone is dropped, before the fields are dropped.
    fn drop(&mut self) {
        // Ask RocksDB to stop background work. `true` is the `wait` flag, which
        // presumably blocks until running jobs stop — confirm against the
        // rust-rocksdb docs.
        // NOTE(review): other handles to the same DB (e.g. a `WeeDbRaw` cloned
        // from `WeeDb::raw()`) may still be alive here; confirm that cancelling
        // background work is acceptable for them.
        self.raw.rocksdb().cancel_all_background_work(true);
    }
}
/// Builder for an untyped [`WeeDbRaw`] database.
///
/// Column families are registered with `with_table` / `with_tables`, and the
/// database is opened by `build`.
pub struct WeeDbRawBuilder<C> {
    /// Filesystem path the database is opened at.
    path: PathBuf,
    /// DB-wide RocksDB options.
    options: rocksdb::Options,
    /// User context handed mutably to options hooks; its caches are cloned
    /// into the opened database.
    context: C,
    /// One descriptor per registered column family.
    descriptors: Vec<rocksdb::ColumnFamilyDescriptor>,
    /// Names of the registered column families, in registration order.
    cf_names: Vec<&'static str>,
    /// Optional human-readable database name.
    db_name: Option<&'static str>,
    /// Whether RocksDB statistics and metrics registration are enabled.
    #[cfg(feature = "metrics")]
    metrics_enabled: bool,
    /// How the database will be opened. The last `read_only` / `secondary` /
    /// `with_ttl_support` call wins.
    open_mode: OpenMode,
}

/// Selects which `rocksdb::DB::open_cf_descriptors*` function `build` calls.
#[derive(Default)]
enum OpenMode {
    /// Regular open via `open_cf_descriptors`.
    #[default]
    Primary,
    /// Read-only open via `open_cf_descriptors_read_only`.
    ReadOnly(ReadOnly),
    /// Secondary-instance open via `open_cf_descriptors_as_secondary`.
    Secondary(Secondary),
    /// Open via `open_cf_descriptors_with_ttl`; the payload is passed as the
    /// default column family TTL.
    WithTtlSupport(Duration),
}
impl<C: AsRef<Caches>> WeeDbRawBuilder<C> {
pub fn new<P: AsRef<Path>>(path: P, context: C) -> Self {
Self {
path: path.as_ref().to_path_buf(),
options: Default::default(),
context,
descriptors: Default::default(),
cf_names: Default::default(),
db_name: None,
#[cfg(feature = "metrics")]
metrics_enabled: false,
open_mode: Default::default(),
}
}
pub fn with_name(mut self, name: &'static str) -> Self {
self.db_name = Some(name);
self
}
pub fn with_options<F>(mut self, f: F) -> Self
where
F: FnOnce(&mut rocksdb::Options, &mut C),
{
f(&mut self.options, &mut self.context);
self
}
pub fn with_table<T>(mut self) -> Self
where
T: ColumnFamilyOptions<C>,
{
let mut opts = Default::default();
T::options(&mut opts, &mut self.context);
let ttl = T::ttl();
self.descriptors
.push(rocksdb::ColumnFamilyDescriptor::new_with_ttl(
T::NAME,
opts,
ttl,
));
self.cf_names.push(T::NAME);
self
}
pub fn with_tables<T: Tables<Context = C>>(self) -> Self {
T::define(self)
}
#[cfg(feature = "metrics")]
pub fn with_metrics_enabled(mut self, enabled: bool) -> Self {
self.metrics_enabled = enabled;
self
}
pub fn read_only(mut self, options: ReadOnly) -> Self {
self.open_mode = OpenMode::ReadOnly(options);
self
}
pub fn secondary(mut self, options: Secondary) -> Self {
self.open_mode = OpenMode::Secondary(options);
self
}
pub fn with_ttl_support(mut self, default_cf_ttl: Duration) -> Self {
self.open_mode = OpenMode::WithTtlSupport(default_cf_ttl);
self
}
#[allow(unused_mut)]
pub fn build(mut self) -> Result<WeeDbRaw, rocksdb::Error> {
#[cfg(feature = "metrics")]
if self.metrics_enabled {
self.options.enable_statistics();
self.options
.set_statistics_level(rocksdb::statistics::StatsLevel::ExceptDetailedTimers);
}
let rocksdb = match self.open_mode {
OpenMode::Primary => {
rocksdb::DB::open_cf_descriptors(&self.options, self.path, self.descriptors)
}
OpenMode::ReadOnly(options) => rocksdb::DB::open_cf_descriptors_read_only(
&self.options,
self.path,
self.descriptors,
options.error_if_log_file_exist,
),
OpenMode::Secondary(options) => rocksdb::DB::open_cf_descriptors_as_secondary(
&self.options,
self.path,
options.secondary_path,
self.descriptors,
),
OpenMode::WithTtlSupport(default_cf_ttl) => rocksdb::DB::open_cf_descriptors_with_ttl(
&self.options,
self.path,
self.descriptors,
default_cf_ttl,
),
}
.map(Arc::new)?;
let db = WeeDbRawInner {
rocksdb,
caches: self.context.as_ref().clone(),
db_name: self.db_name,
cf_names: self.cf_names,
#[cfg(feature = "metrics")]
options: self.options,
#[cfg(feature = "metrics")]
metrics_enabled: self.metrics_enabled,
};
#[cfg(feature = "metrics")]
if self.metrics_enabled {
db.register_metrics();
}
Ok(WeeDbRaw {
inner: Arc::new(db),
})
}
}
/// Options for opening the database in read-only mode.
#[derive(Default, Clone, Copy)]
pub struct ReadOnly {
    /// Forwarded as-is to `rocksdb::DB::open_cf_descriptors_read_only`.
    /// Presumably it makes opening fail when a WAL file exists — confirm
    /// against the rocksdb docs.
    pub error_if_log_file_exist: bool,
}

/// Options for opening the database as a secondary instance.
#[derive(Clone)]
pub struct Secondary {
    /// Forwarded as the secondary path to
    /// `rocksdb::DB::open_cf_descriptors_as_secondary`.
    pub secondary_path: PathBuf,
}
/// A cheaply clonable, untyped handle to an opened RocksDB database.
///
/// Clones share one instance, and equality compares instances by pointer.
#[derive(Clone)]
#[repr(transparent)]
pub struct WeeDbRaw {
    inner: Arc<WeeDbRawInner>,
}
impl WeeDbRaw {
    /// Shorthand for [`WeeDbRawBuilder::new`].
    pub fn builder<P: AsRef<Path>, C: AsRef<Caches>>(path: P, context: C) -> WeeDbRawBuilder<C> {
        WeeDbRawBuilder::<C>::new(path, context)
    }

    /// Creates a weak handle that does not keep the database alive.
    pub fn downgrade(this: &Self) -> WeakWeeDbRaw {
        let inner = Arc::downgrade(&this.inner);
        WeakWeeDbRaw { inner }
    }

    /// Returns `true` if both handles refer to the same database instance.
    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
        Arc::ptr_eq(&this.inner, &other.inner)
    }

    /// Creates a typed handle for column family `T` of this database.
    pub fn instantiate_table<T: ColumnFamily>(&self) -> Table<T> {
        Table::new(Arc::clone(self.rocksdb()))
    }

    /// Returns the underlying RocksDB instance.
    #[inline]
    pub fn rocksdb(&self) -> &Arc<rocksdb::DB> {
        &self.inner.rocksdb
    }

    /// Returns `true` if `db` is the very RocksDB instance behind this handle.
    #[inline]
    pub fn is_same_instance(&self, db: &Arc<rocksdb::DB>) -> bool {
        Arc::ptr_eq(self.rocksdb(), db)
    }

    /// Takes a snapshot that holds its own reference to the database.
    pub fn owned_snapshot(&self) -> OwnedSnapshot {
        let db = Arc::clone(self.rocksdb());
        OwnedSnapshot::new(db)
    }

    /// Returns the name set on the builder, if any.
    #[inline]
    pub fn db_name(&self) -> Option<&'static str> {
        self.inner.db_name
    }

    /// Returns the registered column family names, in registration order.
    pub fn cf_names(&self) -> &[&'static str] {
        self.inner.cf_names.as_slice()
    }

    /// Returns the caches shared by this database.
    #[inline]
    pub fn caches(&self) -> &Caches {
        &self.inner.caches
    }

    /// Reports memory usage of the database and its block cache.
    ///
    /// # Errors
    /// Propagates the RocksDB error if the statistics cannot be collected.
    pub fn get_memory_usage_stats(&self) -> Result<Stats, rocksdb::Error> {
        self.inner.get_memory_usage_stats()
    }

    /// Refreshes the metrics of this database (`metrics` feature only).
    #[cfg(feature = "metrics")]
    pub fn refresh_metrics(&self) {
        self.inner.refresh_metrics();
    }

    /// Runs a full manual compaction of every registered column family, one at a time.
    ///
    /// A name whose handle cannot be found is logged with a warning and
    /// skipped. This call blocks until all compactions have finished.
    pub fn compact(&self) {
        let mut options = rocksdb::CompactOptions::default();
        options.set_exclusive_manual_compaction(true);
        options.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::ForceOptimized);

        let db = self.rocksdb();
        // `None` on both ends compacts the entire key range.
        let unbounded: Option<[u8; 0]> = None;

        for cf_name in self.inner.cf_names.iter().copied() {
            let cf = match db.cf_handle(cf_name) {
                Some(cf) => cf,
                None => {
                    tracing::warn!(cf = cf_name, "unknown column family");
                    continue;
                }
            };

            tracing::info!(cf = cf_name, "compaction started");
            let started_at = Instant::now();
            db.compact_range_cf_opt(&cf, unbounded, unbounded, &options);
            tracing::info!(
                cf = cf_name,
                elapsed_sec = %started_at.elapsed().as_secs_f64(),
                "compaction finished"
            );
        }
    }
}
impl std::fmt::Debug for WeeDbRaw {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WeeDbRaw")
.field("db_name", &self.inner.db_name)
.field("cf_names", &self.inner.cf_names)
.finish()
}
}
impl AsRef<WeeDbRaw> for WeeDbRaw {
#[inline]
fn as_ref(&self) -> &WeeDbRaw {
self
}
}
impl Eq for WeeDbRaw {}
impl PartialEq for WeeDbRaw {
#[inline]
fn eq(&self, other: &Self) -> bool {
Self::ptr_eq(self, other)
}
}
/// A non-owning handle to a [`WeeDbRaw`] database, created by
/// [`WeeDbRaw::downgrade`]. It does not keep the database alive.
#[derive(Clone)]
#[repr(transparent)]
pub struct WeakWeeDbRaw {
    inner: Weak<WeeDbRawInner>,
}

impl WeakWeeDbRaw {
    /// Returns `true` if both weak handles point to the same allocation.
    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
        this.inner.ptr_eq(&other.inner)
    }

    /// Returns a strong handle, or `None` if the database has already been dropped.
    pub fn upgrade(&self) -> Option<WeeDbRaw> {
        let inner = self.inner.upgrade()?;
        Some(WeeDbRaw { inner })
    }
}

/// Identity equality: two weak handles are equal only if they share one allocation.
impl Eq for WeakWeeDbRaw {}

impl PartialEq for WeakWeeDbRaw {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        Weak::ptr_eq(&self.inner, &other.inner)
    }
}
/// Shared state behind [`WeeDbRaw`] and [`WeakWeeDbRaw`].
struct WeeDbRawInner {
    /// The opened database; table handles receive clones of this `Arc`.
    rocksdb: Arc<rocksdb::DB>,
    /// Caches cloned from the builder context.
    caches: Caches,
    /// Optional human-readable database name.
    db_name: Option<&'static str>,
    /// Registered column family names, in registration order.
    cf_names: Vec<&'static str>,
    /// DB options, kept only with the `metrics` feature (presumably read by
    /// the `metrics` module — confirm there).
    #[cfg(feature = "metrics")]
    options: rocksdb::Options,
    /// Whether statistics were enabled when the database was built.
    #[cfg(feature = "metrics")]
    metrics_enabled: bool,
}
impl WeeDbRawInner {
    /// Collects RocksDB memory usage for this database and its block cache,
    /// plus the cache's current and pinned usage.
    fn get_memory_usage_stats(&self) -> Result<Stats, rocksdb::Error> {
        let cache = &self.caches.block_cache;
        let whole_db_stats =
            rocksdb::perf::get_memory_usage_stats(Some(&[&self.rocksdb]), Some(&[cache]))?;

        Ok(Stats {
            whole_db_stats,
            block_cache_usage: cache.get_usage(),
            block_cache_pined_usage: cache.get_pinned_usage(),
        })
    }
}
/// Memory usage report returned by `get_memory_usage_stats`.
pub struct Stats {
    /// RocksDB-reported memory usage for the database and its block cache.
    pub whole_db_stats: rocksdb::perf::MemoryUsageStats,
    /// Block cache usage as reported by `Cache::get_usage`.
    pub block_cache_usage: usize,
    /// Block cache pinned usage as reported by `Cache::get_pinned_usage`.
    /// The misspelled name ("pined") is part of the public API and is kept
    /// for compatibility.
    pub block_cache_pined_usage: usize,
}
/// A RocksDB snapshot bundled with an `Arc` to its database.
///
/// Unlike `rocksdb::Snapshot<'a>`, it does not borrow the database, so it can
/// be stored or moved freely. The `Arc` keeps the database alive for as long
/// as the snapshot exists.
pub struct OwnedSnapshot {
    // Field order matters: fields drop in declaration order, so the snapshot
    // is released before the `Arc` that keeps the database alive.
    inner: rocksdb::Snapshot<'static>,
    db: Arc<rocksdb::DB>,
}
impl OwnedSnapshot {
    /// Takes a new snapshot of `db` and keeps `db` alive alongside it.
    pub fn new(db: Arc<rocksdb::DB>) -> Self {
        use rocksdb::Snapshot;
        /// Erases the borrow lifetime of a snapshot.
        ///
        /// # Safety
        /// The caller must keep the borrowed database alive, at the same
        /// address, for as long as the returned value is used.
        unsafe fn extend_lifetime<'a>(r: Snapshot<'a>) -> Snapshot<'static> {
            // SAFETY: only the lifetime parameter changes; the layout is identical.
            unsafe { std::mem::transmute::<Snapshot<'a>, Snapshot<'static>>(r) }
        }
        // SAFETY: the snapshot borrows the `DB` inside the `Arc` heap
        // allocation, whose address does not change when the `Arc` is moved
        // into `Self`. `Self` owns that `Arc` and drops it after `inner`
        // (see field order), so the database outlives the snapshot.
        let inner = unsafe { extend_lifetime(db.as_ref().snapshot()) };
        Self { inner, db }
    }
    /// Returns the database this snapshot belongs to.
    #[inline]
    pub fn db(&self) -> &Arc<rocksdb::DB> {
        &self.db
    }
}
/// Exposes the wrapped snapshot.
///
/// NOTE(review): the target carries a `'static` lifetime. Confirm that no
/// `rocksdb::Snapshot` method returns a value tied to that lifetime (e.g. an
/// iterator) that could outlive this `OwnedSnapshot`.
impl std::ops::Deref for OwnedSnapshot {
    type Target = rocksdb::Snapshot<'static>;
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// A RocksDB raw iterator bundled with an `Arc` to its database, so it does
/// not borrow the database.
pub struct OwnedRawIterator {
    // Field order matters: the iterator drops before the `Arc` that keeps the
    // database alive.
    inner: rocksdb::DBRawIterator<'static>,
    db: Arc<rocksdb::DB>,
}
impl OwnedRawIterator {
    /// Wraps `iter` together with the database it reads from.
    ///
    /// # Safety
    /// `iter` must have been created from the very `DB` that `db` points to,
    /// and must borrow nothing else that could be dropped before the returned
    /// value. Its lifetime is erased to `'static`, so the only thing keeping
    /// the borrowed database alive is the `db` stored alongside it.
    pub unsafe fn new(db: Arc<rocksdb::DB>, iter: rocksdb::DBRawIterator<'_>) -> Self {
        use rocksdb::DBRawIterator;
        /// Erases the borrow lifetime of a raw iterator.
        ///
        /// # Safety
        /// Same contract as [`OwnedRawIterator::new`].
        unsafe fn extend_lifetime<'a>(r: DBRawIterator<'a>) -> DBRawIterator<'static> {
            // SAFETY: only the lifetime parameter changes; the layout is identical.
            unsafe { std::mem::transmute::<DBRawIterator<'a>, DBRawIterator<'static>>(r) }
        }
        // SAFETY: upheld by the caller (see `# Safety`). `db` is stored in
        // `Self` and dropped after `inner`.
        let inner = unsafe { extend_lifetime(iter) };
        Self { inner, db }
    }
    /// Returns the database this iterator reads from.
    #[inline]
    pub fn db(&self) -> &Arc<rocksdb::DB> {
        &self.db
    }
}
/// Exposes the wrapped raw iterator.
impl std::ops::Deref for OwnedRawIterator {
    type Target = rocksdb::DBRawIterator<'static>;
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
/// Mutable access is needed to seek and advance the iterator.
impl std::ops::DerefMut for OwnedRawIterator {
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
/// A RocksDB pinnable slice bundled with an `Arc` to its database, so the
/// value can be held without borrowing the database.
pub struct OwnedPinnableSlice {
    // Field order matters: the slice drops before the `Arc` that keeps the
    // database alive.
    inner: rocksdb::DBPinnableSlice<'static>,
    db: Arc<rocksdb::DB>,
}
impl OwnedPinnableSlice {
    /// Wraps `data` together with the database it was read from.
    ///
    /// # Safety
    /// `data` must have been produced by the very `DB` that `db` points to,
    /// and must borrow nothing else that could be dropped before the returned
    /// value. Its lifetime is erased to `'static`, so the only thing keeping
    /// the borrowed database alive is the `db` stored alongside it.
    pub unsafe fn new(db: Arc<rocksdb::DB>, data: rocksdb::DBPinnableSlice<'_>) -> Self {
        use rocksdb::DBPinnableSlice;
        /// Erases the borrow lifetime of a pinnable slice.
        ///
        /// # Safety
        /// Same contract as [`OwnedPinnableSlice::new`].
        unsafe fn extend_lifetime<'a>(r: DBPinnableSlice<'a>) -> DBPinnableSlice<'static> {
            // SAFETY: only the lifetime parameter changes; the layout is identical.
            unsafe { std::mem::transmute::<DBPinnableSlice<'a>, DBPinnableSlice<'static>>(r) }
        }
        // SAFETY: upheld by the caller (see `# Safety`). `db` is stored in
        // `Self` and dropped after `inner`.
        let inner = unsafe { extend_lifetime(data) };
        Self { inner, db }
    }
    /// Returns the database this value was read from.
    #[inline]
    pub fn db(&self) -> &Arc<rocksdb::DB> {
        &self.db
    }
}
impl AsRef<[u8]> for OwnedPinnableSlice {
    fn as_ref(&self) -> &[u8] {
        // Deref coercion through the `Deref<Target = [u8]>` impl below.
        self
    }
}
/// Exposes the pinned value as a byte slice.
impl std::ops::Deref for OwnedPinnableSlice {
    type Target = [u8];
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal column family used by the tests below.
    struct MyTable;

    impl ColumnFamily for MyTable {
        const NAME: &'static str = "my_table";

        fn read_options(opts: &mut rocksdb::ReadOptions) {
            opts.set_verify_checksums(false);
        }

        fn write_options(_: &mut rocksdb::WriteOptions) {
            // Default write options are used unchanged.
        }
    }

    impl ColumnFamilyOptions<Caches> for MyTable {
        fn options(opts: &mut rocksdb::Options, caches: &mut Caches) {
            opts.set_write_buffer_size(128 * 1024 * 1024);

            let mut block_factory = rocksdb::BlockBasedOptions::default();
            block_factory.set_block_cache(&caches.block_cache);
            block_factory.set_data_block_index_type(rocksdb::DataBlockIndexType::BinaryAndHash);

            opts.set_block_based_table_factory(&block_factory);
            opts.set_optimize_filters_for_hits(true);
        }
    }

    /// Typed wrapper: open, migrate, then write and read back through `tables!`.
    #[test]
    fn db_wrapper() -> Result<(), Box<dyn std::error::Error>> {
        let tempdir = tempfile::tempdir()?;

        tables! {
            struct MyTables<Caches> {
                pub my_table: MyTable,
            }
        }

        let db = WeeDb::<MyTables>::builder(&tempdir, Caches::default())
            .with_name("test")
            .with_options(|opts, _| {
                opts.create_if_missing(true);
                opts.create_missing_column_families(true);
            })
            .build()?;

        let mut migrations = Migrations::with_target_version([0, 1, 0]);
        migrations.register([0, 0, 0], [0, 1, 0], |_| Ok(()))?;
        db.apply(migrations)?;

        let my_table = &db.tables().my_table;
        my_table.insert(b"123", b"321")?;
        let value = my_table.get(b"123")?;
        assert_eq!(value.as_deref(), Some(b"321".as_slice()));
        Ok(())
    }

    /// Raw builder: open with custom options, migrate, then write and read back.
    #[test]
    fn caches_and_builder() -> Result<(), Box<dyn std::error::Error>> {
        let tempdir = tempfile::tempdir()?;
        let caches = Caches::default();

        let db = WeeDbRaw::builder(&tempdir, caches)
            .with_name("test")
            .with_options(|opts, _| {
                opts.set_level_compaction_dynamic_level_bytes(true);
                opts.set_zstd_max_train_bytes(32 * 1024 * 1024);
                opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
                opts.set_log_level(rocksdb::LogLevel::Error);
                opts.set_keep_log_file_num(2);
                opts.set_recycle_log_file_num(2);
                opts.create_if_missing(true);
                opts.create_missing_column_families(true);
            })
            .with_table::<MyTable>()
            .build()?;

        let mut migrations = Migrations::with_target_version([0, 1, 0]);
        migrations.register([0, 0, 0], [0, 1, 0], |_| Ok(()))?;
        db.apply(migrations)?;

        let my_table = db.instantiate_table::<MyTable>();
        my_table.insert(b"asd", b"123")?;
        assert!(my_table.get(b"asd")?.is_some());
        Ok(())
    }
}