use std::collections::HashMap;
use std::sync::Arc;
use fail_parallel::FailPointRegistry;
use log::info;
use log::warn;
use object_store::path::Path;
use object_store::ObjectStore;
use rand::RngCore;
use tokio::runtime::Handle;
use crate::admin::Admin;
use crate::batch_write::WriteBatchEventHandler;
use crate::batch_write::WRITE_BATCH_TASK_NAME;
use crate::cached_object_store::CachedObjectStore;
#[cfg(feature = "compaction_filters")]
use crate::compaction_filter::CompactionFilterSupplier;
use crate::compactions_store::CompactionsStore;
use crate::compactor::stats::CompactionStats;
use crate::compactor::CompactorEventHandler;
use crate::compactor::CompactorMessage;
use crate::compactor::SizeTieredCompactionSchedulerSupplier;
use crate::compactor::COMPACTOR_TASK_NAME;
use crate::compactor::{CompactionSchedulerSupplier, Compactor};
use crate::compactor_executor::{TokioCompactionExecutor, TokioCompactionExecutorOptions};
use crate::config::CompactorOptions;
use crate::config::DbReaderOptions;
use crate::config::GarbageCollectorOptions;
use crate::config::{Settings, SstBlockSize};
use crate::db::Db;
use crate::db::DbInner;
use crate::db_cache::SplitCache;
use crate::db_cache::{DbCache, DbCacheWrapper};
use crate::db_reader::DbReader;
use crate::db_state::ManifestCore;
use crate::db_status::{ClosedResultWriter, DbStatusManager};
use crate::dispatcher::MessageHandlerExecutor;
use crate::error::SlateDBError;
use crate::format::sst::{BlockTransformer, SsTableFormat};
use crate::garbage_collector::GarbageCollector;
use crate::garbage_collector::GC_TASK_NAME;
use crate::instrumented_object_store::{InstrumentedObjectStore, ObjectStoreComponent};
use crate::manifest::store::{FenceableManifest, ManifestStore, StoredManifest};
use crate::memtable_flusher::MemtableFlusher;
use crate::merge_operator::MergeOperatorType;
use crate::object_stores::ObjectStoreType;
use crate::object_stores::ObjectStores;
use crate::paths::PathResolver;
use crate::rand::DbRand;
use crate::retrying_object_store::RetryingObjectStore;
use crate::store_provider::DefaultStoreProvider;
use crate::tablestore::TableStore;
use crate::utils::SafeSender;
use crate::utils::WatchableOnceCell;
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::clock::SystemClock;
use slatedb_common::metrics::MetricLevel;
use slatedb_common::metrics::MetricsRecorder;
use slatedb_common::metrics::MetricsRecorderHelper;
use slatedb_common::metrics::NoopMetricsRecorder;
/// Builder for opening a [`Db`].
///
/// Configure object stores, caching, clocks, and optional background
/// components (compactor, garbage collector), then call [`DbBuilder::build`].
pub struct DbBuilder<P: Into<Path>> {
    // Root path of the database within the object store.
    path: P,
    // Runtime settings; defaults to `Settings::default()`.
    settings: Settings,
    // Primary object store holding SSTs, manifests, and compactions state.
    main_object_store: Arc<dyn ObjectStore>,
    // Optional dedicated store for the write-ahead log; `None` means the WAL
    // (if enabled) lives on the main store.
    wal_object_store: Option<Arc<dyn ObjectStore>>,
    // Block/meta cache; `None` disables caching entirely.
    db_cache: Option<Arc<dyn DbCache>>,
    // Clock override, mainly for tests; defaults to the real system clock.
    system_clock: Option<Arc<dyn SystemClock>>,
    // Runtime handle the GC task is spawned on; defaults to the current runtime.
    gc_runtime: Option<Handle>,
    // Explicit compactor configuration; takes precedence over
    // `settings.compactor_options`.
    compactor_builder: Option<CompactorBuilder<Path>>,
    // Explicit GC configuration; takes precedence over
    // `settings.garbage_collector_options`.
    gc_builder: Option<GarbageCollectorBuilder<Path>>,
    // Fail-point registry for fault-injection testing.
    fp_registry: Arc<FailPointRegistry>,
    // Optional RNG seed for deterministic behavior.
    seed: Option<u64>,
    // SST block size override; `None` uses the format default.
    sst_block_size: Option<SstBlockSize>,
    // Optional merge operator applied to merged values.
    merge_operator: Option<MergeOperatorType>,
    // Optional transformation applied to SST blocks (e.g. for encryption).
    block_transformer: Option<Arc<dyn BlockTransformer>>,
    // Metrics sink; defaults to a no-op recorder.
    metrics_recorder: Arc<dyn MetricsRecorder>,
}
impl<P: Into<Path>> DbBuilder<P> {
    /// Creates a builder for a [`Db`] rooted at `path` on `main_object_store`.
    ///
    /// Defaults: `Settings::default()`, the default block/meta split cache,
    /// no dedicated WAL store, a fresh fail-point registry, and a no-op
    /// metrics recorder.
    pub fn new(path: P, main_object_store: Arc<dyn ObjectStore>) -> Self {
        Self {
            path,
            main_object_store,
            settings: Settings::default(),
            wal_object_store: None,
            db_cache: default_db_cache(),
            system_clock: None,
            gc_runtime: None,
            compactor_builder: None,
            gc_builder: None,
            fp_registry: Arc::new(FailPointRegistry::new()),
            seed: None,
            sst_block_size: None,
            merge_operator: None,
            block_transformer: None,
            metrics_recorder: Arc::new(NoopMetricsRecorder::new()),
        }
    }
    /// Replaces the settings. Warns if a compactor/GC builder was already set
    /// alongside the corresponding options in `settings`, since the explicit
    /// builder wins (see `build()`).
    pub fn with_settings(mut self, settings: Settings) -> Self {
        if self.compactor_builder.is_some() && settings.compactor_options.is_some() {
            warn!("compactor_builder and settings.compactor_options both set; compactor_builder will take precedence");
        }
        if self.gc_builder.is_some() && settings.garbage_collector_options.is_some() {
            warn!("gc_builder and settings.garbage_collector_options both set; gc_builder will take precedence");
        }
        self.settings = settings;
        self
    }
    /// Uses a separate object store for the write-ahead log.
    pub fn with_wal_object_store(mut self, wal_object_store: Arc<dyn ObjectStore>) -> Self {
        self.wal_object_store = Some(wal_object_store);
        self
    }
    /// Replaces the default block/meta cache.
    pub fn with_db_cache(mut self, db_cache: Arc<dyn DbCache>) -> Self {
        self.db_cache = Some(db_cache);
        self
    }
    /// Disables block/meta caching entirely.
    pub fn with_db_cache_disabled(mut self) -> Self {
        self.db_cache = None;
        self
    }
    /// Overrides the system clock (useful for deterministic tests).
    pub fn with_system_clock(mut self, clock: Arc<dyn SystemClock>) -> Self {
        self.system_clock = Some(clock);
        self
    }
    /// Runs the garbage-collector task on the given runtime handle instead of
    /// the runtime that calls `build()`.
    pub fn with_gc_runtime(mut self, gc_runtime: Handle) -> Self {
        self.gc_runtime = Some(gc_runtime);
        self
    }
    /// Supplies an explicit compactor configuration; takes precedence over
    /// `settings.compactor_options`.
    pub fn with_compactor_builder(mut self, compactor_builder: CompactorBuilder<P>) -> Self {
        self.compactor_builder = Some(compactor_builder.into_path_builder());
        self
    }
    /// Supplies an explicit GC configuration; takes precedence over
    /// `settings.garbage_collector_options`.
    pub fn with_gc_builder(mut self, gc_builder: GarbageCollectorBuilder<P>) -> Self {
        self.gc_builder = Some(gc_builder.into_path_builder());
        self
    }
    /// Installs a metrics recorder shared by the db and any background tasks
    /// this builder starts.
    pub fn with_metrics_recorder(mut self, recorder: Arc<dyn MetricsRecorder>) -> Self {
        self.metrics_recorder = recorder;
        self
    }
    /// Installs a fail-point registry for fault-injection testing.
    pub fn with_fp_registry(mut self, fp_registry: Arc<FailPointRegistry>) -> Self {
        self.fp_registry = fp_registry;
        self
    }
    /// Seeds the database RNG for deterministic behavior.
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = Some(seed);
        self
    }
    /// Overrides the SST block size.
    pub fn with_sst_block_size(mut self, block_size: SstBlockSize) -> Self {
        self.sst_block_size = Some(block_size);
        self
    }
    /// Installs a merge operator; also forwarded to the compactor if one is
    /// started by `build()`.
    pub fn with_merge_operator(mut self, merge_operator: MergeOperatorType) -> Self {
        self.merge_operator = Some(merge_operator);
        self
    }
    /// Installs a block transformer applied when reading/writing SST blocks.
    pub fn with_block_transformer(mut self, block_transformer: Arc<dyn BlockTransformer>) -> Self {
        self.block_transformer = Some(block_transformer);
        self
    }
    /// Opens (or creates) the database, starting all configured background
    /// tasks. Must be called from within a tokio runtime.
    ///
    /// # Errors
    /// Returns an error on invalid configuration, manifest/WAL store mismatch,
    /// or any failure while loading state or starting tasks.
    pub async fn build(self) -> Result<Db, crate::Error> {
        // Validate configuration up front, before touching the object store.
        if self.settings.l0_flush_parallelism == 0 {
            return Err(crate::Error::invalid(
                "invalid configuration: l0_flush_parallelism must be at least 1".into(),
            ));
        }
        let path = self.path.into();
        // NOTE(review): a configured WAL store is recorded as an empty-string
        // URI and compared against the manifest below — presumably only
        // presence/absence is checked today. TODO confirm intended semantics.
        let wal_object_store_uri = self.wal_object_store.as_ref().map(|_| String::new());
        let rand = Arc::new(self.seed.map(DbRand::new).unwrap_or_default());
        let system_clock = self
            .system_clock
            .unwrap_or_else(|| Arc::new(DefaultSystemClock::new()));
        let recorder = MetricsRecorderHelper::new(self.metrics_recorder, MetricLevel::default());
        // Wrap both stores with instrumentation (metrics) and retry layers.
        let retrying_main_object_store = instrumented_retrying_object_store(
            self.main_object_store,
            &recorder,
            ObjectStoreComponent::Db,
            ObjectStoreType::Main,
            rand.clone(),
            system_clock.clone(),
        );
        let retrying_wal_object_store: Option<Arc<dyn ObjectStore>> =
            self.wal_object_store.map(|s| {
                instrumented_retrying_object_store(
                    s,
                    &recorder,
                    ObjectStoreComponent::Db,
                    ObjectStoreType::Wal,
                    rand.clone(),
                    system_clock.clone(),
                )
            });
        // Log the effective settings; fall back to Debug if JSON serialization
        // fails.
        if let Ok(settings_json) = self.settings.to_json_string() {
            info!(
                "opening SlateDB database [path={}, settings={}]",
                path, settings_json
            );
        } else {
            info!(
                "opening SlateDB database [path={}, settings={:?}]",
                path, self.settings
            );
        }
        // The block-format override is only honored in test builds.
        let block_format = {
            #[cfg(test)]
            {
                self.settings.block_format
            }
            #[cfg(not(test))]
            {
                None
            }
        };
        let sst_format = SsTableFormat {
            min_filter_keys: self.settings.min_filter_keys,
            filter_bits_per_key: self.settings.filter_bits_per_key,
            compression_codec: self.settings.compression_codec,
            block_size: self.sst_block_size.unwrap_or_default().as_bytes(),
            block_transformer: self.block_transformer.clone(),
            block_format,
            ..SsTableFormat::default()
        };
        // Optionally layer a local object-store cache over the main store.
        let cached_object_store = CachedObjectStore::from_config(
            retrying_main_object_store.clone(),
            &self.settings.object_store_cache_options,
            &recorder,
            system_clock.clone(),
            rand.clone(),
        )
        .await?;
        let maybe_cached_main_object_store: Arc<dyn ObjectStore> = match &cached_object_store {
            Some(cached_store) => cached_store.clone(),
            None => retrying_main_object_store.clone(),
        };
        let manifest_store = Arc::new(ManifestStore::new(
            &path,
            maybe_cached_main_object_store.clone(),
        ));
        let compactions_store = Arc::new(CompactionsStore::new(
            &path,
            maybe_cached_main_object_store.clone(),
        ));
        // Load the latest manifest, if the database already exists.
        let latest_manifest =
            StoredManifest::try_load(manifest_store.clone(), system_clock.clone()).await?;
        // Refuse to open if the WAL store configuration disagrees with what
        // the manifest recorded at creation time.
        if let Some(latest_manifest) = &latest_manifest {
            if latest_manifest.db_state().wal_object_store_uri != wal_object_store_uri {
                return Err(SlateDBError::WalStoreReconfigurationError.into());
            }
        }
        let external_ssts = match &latest_manifest {
            Some(latest_stored_manifest) => latest_stored_manifest.manifest().external_ssts(),
            None => HashMap::new(),
        };
        let path_resolver = PathResolver::new_with_external_ssts(path.clone(), external_ssts);
        // Table store used by the db itself: cached main store + optional WAL
        // store, with the block cache wrapped for metrics.
        let table_store = Arc::new(TableStore::new_with_fp_registry(
            ObjectStores::new(
                maybe_cached_main_object_store.clone(),
                retrying_wal_object_store.clone(),
            ),
            sst_format.clone(),
            path_resolver.clone(),
            self.fp_registry.clone(),
            self.db_cache.as_ref().map(|c| {
                Arc::new(DbCacheWrapper::new(
                    c.clone(),
                    &recorder,
                    system_clock.clone(),
                )) as Arc<dyn DbCache>
            }),
        ));
        let replay_after_wal_id = match &latest_manifest {
            Some(latest_stored_manifest) => latest_stored_manifest.db_state().replay_after_wal_id,
            None => 0,
        };
        let next_wal_id = table_store.next_wal_sst_id(replay_after_wal_id).await?;
        // Create a fresh manifest if this is a new database.
        let stored_manifest = match latest_manifest {
            Some(manifest) => manifest,
            None => {
                let state = ManifestCore::new_with_wal_object_store(wal_object_store_uri);
                StoredManifest::create_new_db(manifest_store.clone(), state, system_clock.clone())
                    .await?
            }
        };
        // Become the fenced writer for this database.
        let mut manifest = FenceableManifest::init_writer(
            stored_manifest,
            self.settings.manifest_update_timeout,
            system_clock.clone(),
        )
        .await?;
        let manifest_dirty = manifest.prepare_dirty()?;
        let status_manager = DbStatusManager::new_with_manifest(
            manifest_dirty.value.core.last_l0_seq,
            manifest_dirty.clone().into(),
        );
        let reader = status_manager.result_reader();
        let (write_tx, write_rx) = SafeSender::unbounded_channel(reader);
        let memtable_flusher = Arc::new(MemtableFlusher::new(&status_manager));
        let inner = Arc::new(
            DbInner::new(
                self.settings.clone(),
                system_clock.clone(),
                rand.clone(),
                table_store.clone(),
                manifest_dirty,
                Arc::clone(&memtable_flusher),
                write_tx,
                recorder.clone(),
                self.fp_registry.clone(),
                self.merge_operator.clone(),
                status_manager.clone(),
            )
            .await?,
        );
        // Fence out concurrent WAL writers before accepting writes.
        if inner.wal_enabled {
            inner.fence_writers(&mut manifest, next_wal_id).await?;
        }
        let tokio_handle = Handle::current();
        let task_executor = Arc::new(MessageHandlerExecutor::new(
            Arc::new(status_manager),
            system_clock.clone(),
        ));
        if inner.wal_enabled {
            inner.wal_buffer.init(task_executor.clone()).await?;
        };
        // Start the write-batch task that drains the write channel.
        task_executor.add_handler(
            WRITE_BATCH_TASK_NAME.to_string(),
            Box::new(WriteBatchEventHandler::new(inner.clone())),
            write_rx,
            &tokio_handle,
        )?;
        // Compactor/GC bypass the read cache: give them an uncached table store.
        let uncached_table_store = Arc::new(TableStore::new_with_fp_registry(
            ObjectStores::new(
                retrying_main_object_store.clone(),
                retrying_wal_object_store.clone(),
            ),
            sst_format,
            path_resolver.clone(),
            self.fp_registry.clone(),
            None,
        ));
        // Explicit compactor builder wins; otherwise derive one from settings.
        let compactor_builder = self.compactor_builder.or_else(|| {
            self.settings.compactor_options.as_ref().map(|opts| {
                CompactorBuilder::new(path.clone(), retrying_main_object_store.clone())
                    .with_options(opts.clone())
            })
        });
        if let Some(compactor_builder) = compactor_builder {
            let mut builder = compactor_builder
                .with_system_clock(system_clock.clone())
                .with_recorder(recorder.clone())
                .with_seed(rand.rng().next_u64());
            if let Some(operator) = self.merge_operator {
                builder = builder.with_merge_operator(operator);
            }
            let (handler, rx) = builder
                .build_handler(
                    uncached_table_store.clone(),
                    manifest_store.clone(),
                    compactions_store.clone(),
                )
                .await?;
            task_executor.add_handler(
                COMPACTOR_TASK_NAME.to_string(),
                Box::new(handler),
                rx,
                &tokio_handle,
            )?;
        }
        // Same precedence for the GC; empty options disable it.
        let gc_builder = self.gc_builder.or_else(|| {
            self.settings
                .garbage_collector_options
                .filter(|opts| !opts.is_empty())
                .map(|opts| {
                    GarbageCollectorBuilder::new(path.clone(), retrying_main_object_store.clone())
                        .with_options(opts)
                })
        });
        if let Some(gc_builder) = gc_builder {
            let gc = gc_builder
                .with_system_clock(system_clock.clone())
                .with_recorder(recorder.clone())
                .with_seed(rand.rng().next_u64())
                .build_collector(
                    uncached_table_store.clone(),
                    manifest_store.clone(),
                    compactions_store.clone(),
                );
            // The GC handler receives no messages from this channel; the
            // sender is dropped immediately.
            let (_, rx) = async_channel::unbounded();
            let gc_handle = self.gc_runtime.as_ref().unwrap_or(&tokio_handle);
            task_executor.add_handler(GC_TASK_NAME.to_string(), Box::new(gc), rx, gc_handle)?;
        }
        memtable_flusher.start(
            inner.clone(),
            manifest,
            &tokio_handle,
            &task_executor,
            &inner.status_manager,
        )?;
        task_executor.monitor_on(&tokio_handle)?;
        // Replay any WAL entries written after the last flushed state.
        inner.replay_wal().await?;
        if let Some(cached_obj_store) = cached_object_store {
            inner
                .preload_cache(&cached_obj_store, &path_resolver)
                .await?;
        }
        Ok(Db {
            inner,
            task_executor,
        })
    }
}
/// Builder for [`Admin`], the offline administration entry point.
pub struct AdminBuilder<P: Into<Path>> {
    // Root path of the database within the object store.
    path: P,
    // Primary object store.
    main_object_store: Arc<dyn ObjectStore>,
    // Optional dedicated WAL store.
    wal_object_store: Option<Arc<dyn ObjectStore>>,
    // Clock used by admin operations; defaults to the real system clock.
    system_clock: Arc<dyn SystemClock>,
    // RNG; seedable for determinism via `with_seed`.
    rand: Arc<DbRand>,
    // Optional compaction filter supplier (feature-gated).
    #[cfg(feature = "compaction_filters")]
    compaction_filter_supplier: Option<Arc<dyn CompactionFilterSupplier>>,
}
impl<P: Into<Path>> AdminBuilder<P> {
    /// Creates an [`Admin`] builder rooted at `path` on `main_object_store`,
    /// defaulting to the real system clock and an unseeded RNG.
    pub fn new(path: P, main_object_store: Arc<dyn ObjectStore>) -> Self {
        Self {
            path,
            main_object_store,
            wal_object_store: None,
            system_clock: Arc::new(DefaultSystemClock::new()),
            rand: Arc::new(DbRand::default()),
            #[cfg(feature = "compaction_filters")]
            compaction_filter_supplier: None,
        }
    }
    /// Overrides the system clock used by admin operations.
    pub fn with_system_clock(self, system_clock: Arc<dyn SystemClock>) -> Self {
        Self {
            system_clock,
            ..self
        }
    }
    /// Uses a separate object store for the write-ahead log.
    pub fn with_wal_object_store(self, wal_object_store: Arc<dyn ObjectStore>) -> Self {
        Self {
            wal_object_store: Some(wal_object_store),
            ..self
        }
    }
    /// Seeds the RNG for deterministic behavior.
    pub fn with_seed(self, seed: u64) -> Self {
        Self {
            rand: Arc::new(DbRand::new(seed)),
            ..self
        }
    }
    /// Installs a compaction filter supplier (feature-gated).
    #[cfg(feature = "compaction_filters")]
    pub fn with_compaction_filter_supplier(
        self,
        supplier: Arc<dyn CompactionFilterSupplier>,
    ) -> Self {
        Self {
            compaction_filter_supplier: Some(supplier),
            ..self
        }
    }
    /// Consumes the builder and produces the configured [`Admin`].
    pub fn build(self) -> Admin {
        let object_stores = ObjectStores::new(self.main_object_store, self.wal_object_store);
        Admin {
            path: self.path.into(),
            object_stores,
            system_clock: self.system_clock,
            rand: self.rand,
            #[cfg(feature = "compaction_filters")]
            compaction_filter_supplier: self.compaction_filter_supplier,
        }
    }
}
/// Builder for a standalone [`GarbageCollector`].
pub struct GarbageCollectorBuilder<P: Into<Path>> {
    // Root path of the database within the object store.
    path: P,
    // Primary object store.
    main_object_store: Arc<dyn ObjectStore>,
    // Optional dedicated WAL store.
    wal_object_store: Option<Arc<dyn ObjectStore>>,
    // GC options; defaults to `GarbageCollectorOptions::default()`.
    options: GarbageCollectorOptions,
    // Metrics helper; defaults to a no-op recorder.
    recorder: MetricsRecorderHelper,
    // Clock used by the collector; defaults to the real system clock.
    system_clock: Arc<dyn SystemClock>,
    // RNG; seedable for determinism via `with_seed`.
    rand: Arc<DbRand>,
}
impl<P: Into<Path>> GarbageCollectorBuilder<P> {
pub fn new(path: P, main_object_store: Arc<dyn ObjectStore>) -> Self {
Self {
path,
main_object_store,
wal_object_store: None,
options: GarbageCollectorOptions::default(),
recorder: MetricsRecorderHelper::noop(),
system_clock: Arc::new(DefaultSystemClock::default()),
rand: Arc::new(DbRand::default()),
}
}
pub fn into_path_builder(self) -> GarbageCollectorBuilder<Path> {
GarbageCollectorBuilder {
path: self.path.into(),
main_object_store: self.main_object_store,
wal_object_store: self.wal_object_store,
options: self.options,
recorder: self.recorder,
system_clock: self.system_clock,
rand: self.rand,
}
}
pub fn with_options(mut self, options: GarbageCollectorOptions) -> Self {
self.options = options;
self
}
pub fn with_metrics_recorder(mut self, recorder: Arc<dyn MetricsRecorder>) -> Self {
self.recorder = MetricsRecorderHelper::new(recorder, MetricLevel::default());
self
}
pub(crate) fn with_recorder(mut self, recorder: MetricsRecorderHelper) -> Self {
self.recorder = recorder;
self
}
pub fn with_system_clock(mut self, system_clock: Arc<dyn SystemClock>) -> Self {
self.system_clock = system_clock;
self
}
pub fn with_seed(mut self, seed: u64) -> Self {
self.rand = Arc::new(DbRand::new(seed));
self
}
pub fn with_wal_object_store(mut self, wal_object_store: Arc<dyn ObjectStore>) -> Self {
self.wal_object_store = Some(wal_object_store);
self
}
pub(crate) fn build_collector(
self,
table_store: Arc<TableStore>,
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
) -> GarbageCollector {
GarbageCollector::new(
manifest_store,
compactions_store,
table_store,
self.options,
&self.recorder,
self.system_clock,
)
}
pub fn build(self) -> GarbageCollector {
let path: Path = self.path.into();
let retrying_main_object_store = instrumented_retrying_object_store(
self.main_object_store,
&self.recorder,
ObjectStoreComponent::Gc,
ObjectStoreType::Main,
self.rand.clone(),
self.system_clock.clone(),
);
let retrying_wal_object_store = self.wal_object_store.map(|s| {
instrumented_retrying_object_store(
s,
&self.recorder,
ObjectStoreComponent::Gc,
ObjectStoreType::Wal,
self.rand.clone(),
self.system_clock.clone(),
)
});
let manifest_store = Arc::new(ManifestStore::new(
&path,
retrying_main_object_store.clone(),
));
let compactions_store = Arc::new(CompactionsStore::new(
&path,
retrying_main_object_store.clone(),
));
let table_store = Arc::new(TableStore::new(
ObjectStores::new(
retrying_main_object_store,
retrying_wal_object_store.clone(),
),
SsTableFormat::default(), path,
None, ));
GarbageCollector::new(
manifest_store,
compactions_store,
table_store,
self.options,
&self.recorder,
self.system_clock,
)
}
}
/// Builder for a [`Compactor`] (or a compactor event handler embedded in a
/// `Db` via `DbBuilder`).
pub struct CompactorBuilder<P: Into<Path>> {
    // Root path of the database within the object store.
    path: P,
    // Primary object store.
    main_object_store: Arc<dyn ObjectStore>,
    // Runtime handle compactions execute on; defaults to the current runtime.
    compaction_runtime: Handle,
    // Compactor options; defaults to `CompactorOptions::default()`.
    options: CompactorOptions,
    // Scheduler strategy; `None` falls back to size-tiered scheduling.
    scheduler_supplier: Option<Arc<dyn CompactionSchedulerSupplier>>,
    // RNG; seedable for determinism via `with_seed`.
    rand: Arc<DbRand>,
    // Metrics helper; defaults to a no-op recorder.
    recorder: MetricsRecorderHelper,
    // Clock used by the compactor; defaults to the real system clock.
    system_clock: Arc<dyn SystemClock>,
    // Cell the compactor writes its shutdown result into.
    closed_result: Arc<dyn ClosedResultWriter>,
    // Optional merge operator applied during compaction.
    merge_operator: Option<MergeOperatorType>,
    // Optional transformation applied to SST blocks.
    block_transformer: Option<Arc<dyn BlockTransformer>>,
    // Optional compaction filter supplier (feature-gated).
    #[cfg(feature = "compaction_filters")]
    compaction_filter_supplier: Option<Arc<dyn CompactionFilterSupplier>>,
}
#[allow(unused)]
impl<P: Into<Path>> CompactorBuilder<P> {
    /// Creates a compactor builder rooted at `path` on `main_object_store`.
    ///
    /// Captures the ambient tokio runtime via `Handle::current()`, so this
    /// must be called from within a runtime.
    pub fn new(path: P, main_object_store: Arc<dyn ObjectStore>) -> Self {
        Self {
            path,
            main_object_store,
            compaction_runtime: Handle::current(),
            options: CompactorOptions::default(),
            scheduler_supplier: None,
            rand: Arc::new(DbRand::default()),
            recorder: MetricsRecorderHelper::noop(),
            system_clock: Arc::new(DefaultSystemClock::default()),
            closed_result: Arc::new(WatchableOnceCell::new()),
            merge_operator: None,
            block_transformer: None,
            #[cfg(feature = "compaction_filters")]
            compaction_filter_supplier: None,
        }
    }
    /// Converts the generic path parameter into a concrete [`Path`], leaving
    /// every other setting untouched.
    pub fn into_path_builder(self) -> CompactorBuilder<Path> {
        CompactorBuilder {
            path: self.path.into(),
            main_object_store: self.main_object_store,
            compaction_runtime: self.compaction_runtime,
            options: self.options,
            scheduler_supplier: self.scheduler_supplier,
            rand: self.rand,
            recorder: self.recorder,
            system_clock: self.system_clock,
            closed_result: self.closed_result,
            merge_operator: self.merge_operator,
            block_transformer: self.block_transformer,
            #[cfg(feature = "compaction_filters")]
            compaction_filter_supplier: self.compaction_filter_supplier,
        }
    }
    /// Runs compactions on the given runtime instead of the current one.
    #[allow(unused)]
    pub fn with_runtime(mut self, compaction_runtime: Handle) -> Self {
        self.compaction_runtime = compaction_runtime;
        self
    }
    /// Replaces the compactor options.
    pub fn with_options(mut self, options: CompactorOptions) -> Self {
        self.options = options;
        self
    }
    /// Installs a metrics recorder, wrapped at the default metric level.
    pub fn with_metrics_recorder(mut self, recorder: Arc<dyn MetricsRecorder>) -> Self {
        self.recorder = MetricsRecorderHelper::new(recorder, MetricLevel::default());
        self
    }
    /// Installs an already-wrapped recorder helper (crate-internal).
    pub(crate) fn with_recorder(mut self, recorder: MetricsRecorderHelper) -> Self {
        self.recorder = recorder;
        self
    }
    /// Overrides the system clock.
    pub fn with_system_clock(mut self, system_clock: Arc<dyn SystemClock>) -> Self {
        self.system_clock = system_clock;
        self
    }
    /// Seeds the RNG for deterministic behavior.
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.rand = Arc::new(DbRand::new(seed));
        self
    }
    /// Replaces the default size-tiered compaction scheduler.
    pub fn with_scheduler_supplier(
        mut self,
        scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
    ) -> Self {
        self.scheduler_supplier = Some(scheduler_supplier);
        self
    }
    /// Installs a merge operator applied during compaction.
    pub fn with_merge_operator(mut self, merge_operator: MergeOperatorType) -> Self {
        self.merge_operator = Some(merge_operator);
        self
    }
    /// Installs a block transformer for SST blocks written by compaction.
    pub fn with_block_transformer(mut self, block_transformer: Arc<dyn BlockTransformer>) -> Self {
        self.block_transformer = Some(block_transformer);
        self
    }
    /// Installs a compaction filter supplier (feature-gated).
    #[cfg(feature = "compaction_filters")]
    pub fn with_compaction_filter_supplier(
        mut self,
        supplier: Arc<dyn CompactionFilterSupplier>,
    ) -> Self {
        self.compaction_filter_supplier = Some(supplier);
        self
    }
    /// Builds a standalone [`Compactor`], constructing its own instrumented +
    /// retrying object store and the derived manifest/compactions/table stores.
    /// Note: no WAL store is passed here — the compactor works on SSTs only.
    pub fn build(self) -> Compactor {
        let path: Path = self.path.into();
        let retrying_main_object_store = instrumented_retrying_object_store(
            self.main_object_store,
            &self.recorder,
            ObjectStoreComponent::Compactor,
            ObjectStoreType::Main,
            self.rand.clone(),
            self.system_clock.clone(),
        );
        let manifest_store = Arc::new(ManifestStore::new(
            &path,
            retrying_main_object_store.clone(),
        ));
        let compactions_store = Arc::new(CompactionsStore::new(
            &path,
            retrying_main_object_store.clone(),
        ));
        let sst_format = SsTableFormat {
            block_transformer: self.block_transformer.clone(),
            ..SsTableFormat::default()
        };
        let table_store = Arc::new(TableStore::new(
            ObjectStores::new(retrying_main_object_store, None),
            sst_format,
            path,
            None, ));
        // Default to size-tiered scheduling when no supplier was configured.
        let scheduler_supplier = self
            .scheduler_supplier
            .unwrap_or(Arc::new(SizeTieredCompactionSchedulerSupplier));
        Compactor::new(
            manifest_store,
            compactions_store,
            table_store,
            self.options,
            scheduler_supplier,
            self.compaction_runtime,
            self.rand,
            &self.recorder,
            self.system_clock,
            self.closed_result,
            self.merge_operator,
            #[cfg(feature = "compaction_filters")]
            self.compaction_filter_supplier,
        )
    }
    /// Builds the compactor event handler and its message channel for use
    /// inside a `Db`'s task executor (stores are supplied by the caller).
    ///
    /// Returns the handler plus the receiver end of the channel; the sender
    /// end is given to the compaction executor as `worker_tx`.
    pub(crate) async fn build_handler(
        self,
        table_store: Arc<TableStore>,
        manifest_store: Arc<ManifestStore>,
        compactions_store: Arc<CompactionsStore>,
    ) -> Result<
        (
            CompactorEventHandler,
            async_channel::Receiver<CompactorMessage>,
        ),
        SlateDBError,
    > {
        let options = Arc::new(self.options);
        let handle = self.compaction_runtime;
        let scheduler_supplier = self
            .scheduler_supplier
            .unwrap_or(Arc::new(SizeTieredCompactionSchedulerSupplier));
        let (tx, rx) = async_channel::unbounded();
        let scheduler = Arc::from(scheduler_supplier.compaction_scheduler(&options));
        let stats = Arc::new(CompactionStats::new(&self.recorder));
        // The executor runs compaction jobs on `handle` and reports back to
        // the handler through `tx`.
        let executor = Arc::new(TokioCompactionExecutor::new(
            TokioCompactionExecutorOptions {
                handle,
                options: options.clone(),
                worker_tx: tx,
                table_store,
                rand: self.rand.clone(),
                stats: stats.clone(),
                clock: self.system_clock.clone(),
                manifest_store: manifest_store.clone(),
                merge_operator: self.merge_operator,
                #[cfg(feature = "compaction_filters")]
                compaction_filter_supplier: self.compaction_filter_supplier,
            },
        ));
        let handler = CompactorEventHandler::new(
            manifest_store,
            compactions_store,
            options,
            scheduler,
            executor,
            self.rand,
            stats,
            self.system_clock,
        )
        .await?;
        Ok((handler, rx))
    }
}
/// Builder for a read-only [`DbReader`].
pub struct DbReaderBuilder<P: Into<Path>> {
    // Root path of the database within the object store.
    path: P,
    // Primary object store.
    object_store: Arc<dyn ObjectStore>,
    // Optional dedicated WAL store.
    wal_object_store: Option<Arc<dyn ObjectStore>>,
    // Block/meta cache; `None` disables caching.
    db_cache: Option<Arc<dyn DbCache>>,
    // Optional checkpoint to read from instead of the latest state.
    checkpoint_id: Option<uuid::Uuid>,
    // Optional merge operator applied to merged values.
    merge_operator: Option<MergeOperatorType>,
    // Optional transformation applied when reading SST blocks.
    block_transformer: Option<Arc<dyn BlockTransformer>>,
    // Reader options; defaults to `DbReaderOptions::default()`.
    options: DbReaderOptions,
    // Clock used by the reader; defaults to the real system clock.
    system_clock: Arc<dyn SystemClock>,
    // RNG; seedable for determinism via `with_seed`.
    rand: Arc<DbRand>,
    // Metrics helper; defaults to a no-op recorder.
    recorder: MetricsRecorderHelper,
}
impl<P: Into<Path>> DbReaderBuilder<P> {
    /// Creates a reader builder rooted at `path` on `object_store` with the
    /// default cache, options, clock, RNG, and a no-op metrics recorder.
    pub fn new(path: P, object_store: Arc<dyn ObjectStore>) -> Self {
        Self {
            path,
            object_store,
            wal_object_store: None,
            db_cache: default_db_cache(),
            checkpoint_id: None,
            merge_operator: None,
            block_transformer: None,
            options: DbReaderOptions::default(),
            system_clock: Arc::new(DefaultSystemClock::default()),
            rand: Arc::new(DbRand::default()),
            recorder: MetricsRecorderHelper::noop(),
        }
    }
    /// Reads from the given checkpoint instead of the latest state.
    pub fn with_checkpoint_id(mut self, checkpoint_id: uuid::Uuid) -> Self {
        self.checkpoint_id = Some(checkpoint_id);
        self
    }
    /// Uses a separate object store for the write-ahead log.
    pub fn with_wal_object_store(mut self, wal_object_store: Arc<dyn ObjectStore>) -> Self {
        self.wal_object_store = Some(wal_object_store);
        self
    }
    /// Installs a merge operator applied when reading merged values.
    pub fn with_merge_operator(mut self, merge_operator: MergeOperatorType) -> Self {
        self.merge_operator = Some(merge_operator);
        self
    }
    /// Replaces the reader options.
    pub fn with_options(mut self, options: DbReaderOptions) -> Self {
        self.options = options;
        self
    }
    /// Replaces the default block/meta cache.
    pub fn with_db_cache(mut self, db_cache: Arc<dyn DbCache>) -> Self {
        self.db_cache = Some(db_cache);
        self
    }
    /// Disables block/meta caching entirely.
    pub fn with_db_cache_disabled(mut self) -> Self {
        self.db_cache = None;
        self
    }
    /// Overrides the system clock.
    pub fn with_system_clock(mut self, system_clock: Arc<dyn SystemClock>) -> Self {
        self.system_clock = system_clock;
        self
    }
    /// Seeds the RNG for deterministic behavior.
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.rand = Arc::new(DbRand::new(seed));
        self
    }
    /// Installs a metrics recorder, wrapped at the default metric level.
    pub fn with_metrics_recorder(mut self, recorder: Arc<dyn MetricsRecorder>) -> Self {
        self.recorder = MetricsRecorderHelper::new(recorder, MetricLevel::default());
        self
    }
    /// Installs a block transformer applied when reading SST blocks.
    pub fn with_block_transformer(mut self, block_transformer: Arc<dyn BlockTransformer>) -> Self {
        self.block_transformer = Some(block_transformer);
        self
    }
    /// Opens the database read-only.
    ///
    /// # Errors
    /// Returns an error on WAL store mismatch with the manifest or any
    /// failure while loading state.
    pub async fn build(self) -> Result<DbReader, crate::Error> {
        let path = self.path.into();
        // NOTE(review): as in DbBuilder::build, a configured WAL store is
        // recorded as an empty-string URI placeholder and compared against the
        // manifest below — TODO confirm intended URI semantics.
        let wal_object_store_uri = self.wal_object_store.as_ref().map(|_| String::new());
        // Wrap both stores with instrumentation (metrics) and retry layers.
        let retrying_object_store = instrumented_retrying_object_store(
            self.object_store,
            &self.recorder,
            ObjectStoreComponent::Reader,
            ObjectStoreType::Main,
            self.rand.clone(),
            self.system_clock.clone(),
        );
        let retrying_wal_object_store: Option<Arc<dyn ObjectStore>> =
            self.wal_object_store.map(|s| {
                instrumented_retrying_object_store(
                    s,
                    &self.recorder,
                    ObjectStoreComponent::Reader,
                    ObjectStoreType::Wal,
                    self.rand.clone(),
                    self.system_clock.clone(),
                )
            });
        // Optionally layer a local object-store cache over the main store.
        let maybe_cached = CachedObjectStore::from_config(
            retrying_object_store.clone(),
            &self.options.object_store_cache_options,
            &self.recorder,
            self.system_clock.clone(),
            self.rand.clone(),
        )
        .await?;
        let object_store: Arc<dyn ObjectStore> = match &maybe_cached {
            Some(cached) => Arc::clone(cached) as Arc<dyn ObjectStore>,
            None => retrying_object_store,
        };
        let manifest_store = Arc::new(ManifestStore::new(&path, object_store.clone()));
        let latest_manifest =
            StoredManifest::try_load(manifest_store, self.system_clock.clone()).await?;
        // Refuse to open if the WAL store configuration disagrees with the
        // manifest.
        if let Some(latest_manifest) = &latest_manifest {
            if latest_manifest.db_state().wal_object_store_uri != wal_object_store_uri {
                return Err(SlateDBError::WalStoreReconfigurationError.into());
            }
        }
        let store_provider = DefaultStoreProvider {
            path: path.clone(),
            object_store,
            wal_object_store: retrying_wal_object_store,
            block_cache: self.db_cache.clone(),
            block_transformer: self.block_transformer.clone(),
        };
        let reader = DbReader::open_internal(
            &store_provider,
            self.checkpoint_id,
            self.merge_operator,
            self.options,
            self.system_clock,
            self.rand,
            self.recorder,
        )
        .await
        .map_err(crate::Error::from)?;
        // Warm the local cache before handing the reader back.
        if let Some(cached) = &maybe_cached {
            reader.preload_cache(cached, path).await?;
        }
        Ok(reader)
    }
}
/// Builds the default cache used when a builder's caller neither supplies nor
/// disables a cache: a [`SplitCache`] combining the feature-selected block and
/// meta caches.
fn default_db_cache() -> Option<Arc<dyn DbCache>> {
    let split = SplitCache::new()
        .with_block_cache(default_block_cache())
        .with_meta_cache(default_meta_cache())
        .build();
    Some(Arc::new(split) as Arc<dyn DbCache>)
}
/// Wraps `object_store` with a metrics-recording layer
/// ([`InstrumentedObjectStore`]) and then a retry layer
/// ([`RetryingObjectStore`]), returning the fully decorated store.
fn instrumented_retrying_object_store(
    object_store: Arc<dyn ObjectStore>,
    recorder: &MetricsRecorderHelper,
    component: ObjectStoreComponent,
    store_type: ObjectStoreType,
    rand: Arc<DbRand>,
    system_clock: Arc<dyn SystemClock>,
) -> Arc<dyn ObjectStore> {
    let metered: Arc<dyn ObjectStore> = Arc::new(InstrumentedObjectStore::new(
        object_store,
        recorder,
        component,
        store_type,
    ));
    let retrying = RetryingObjectStore::new(metered, rand, system_clock);
    Arc::new(retrying)
}
// When both cache features are enabled, the foyer block runs first and
// returns, making the moka block and the trailing `None` unreachable — hence
// the allow.
#[allow(unreachable_code)]
/// Returns the default block cache: foyer if that feature is enabled,
/// otherwise moka, otherwise `None` (no cache).
pub(crate) fn default_block_cache() -> Option<Arc<dyn DbCache>> {
    #[cfg(feature = "foyer")]
    {
        return Some(Arc::new(crate::db_cache::foyer::FoyerCache::new_with_opts(
            crate::db_cache::foyer::FoyerCacheOptions {
                max_capacity: crate::db_cache::DEFAULT_BLOCK_CACHE_CAPACITY,
                ..Default::default()
            },
        )));
    }
    #[cfg(feature = "moka")]
    {
        return Some(Arc::new(crate::db_cache::moka::MokaCache::new_with_opts(
            crate::db_cache::moka::MokaCacheOptions {
                max_capacity: crate::db_cache::DEFAULT_BLOCK_CACHE_CAPACITY,
                time_to_live: None,
                time_to_idle: None,
            },
        )));
    }
    None
}
// Mirrors `default_block_cache`: with both features enabled the foyer block
// returns first, so the rest is unreachable.
#[allow(unreachable_code)]
/// Returns the default metadata cache: foyer if that feature is enabled,
/// otherwise moka, otherwise `None` (no cache).
pub(crate) fn default_meta_cache() -> Option<Arc<dyn DbCache>> {
    #[cfg(feature = "foyer")]
    {
        return Some(Arc::new(crate::db_cache::foyer::FoyerCache::new_with_opts(
            crate::db_cache::foyer::FoyerCacheOptions {
                max_capacity: crate::db_cache::DEFAULT_META_CACHE_CAPACITY,
                ..Default::default()
            },
        )));
    }
    #[cfg(feature = "moka")]
    {
        return Some(Arc::new(crate::db_cache::moka::MokaCache::new_with_opts(
            crate::db_cache::moka::MokaCacheOptions {
                max_capacity: crate::db_cache::DEFAULT_META_CACHE_CAPACITY,
                time_to_live: None,
                time_to_idle: None,
            },
        )));
    }
    None
}
#[cfg(test)]
mod tests {
    use crate::config::Settings;
    use crate::error::ErrorKind;
    use crate::garbage_collector::stats::GC_COUNT;
    use crate::instrumented_object_store::stats::REQUEST_COUNT as OBJECT_STORE_REQUEST_COUNT;
    use object_store::memory::InMemory;
    use slatedb_common::metrics::{
        lookup_metric, lookup_metric_with_labels, DefaultMetricsRecorder,
    };
    use std::sync::Arc;
    // Builds the label set used to query object-store request metrics.
    fn object_store_labels(
        component: &'static str,
        store_type: &'static str,
        op: &'static str,
        api: &'static str,
    ) -> [(&'static str, &'static str); 4] {
        [
            ("component", component),
            ("store_type", store_type),
            ("op", op),
            ("api", api),
        ]
    }
    // Default settings should start the GC, observable via its GC_COUNT metric
    // being registered.
    #[tokio::test]
    async fn test_db_builder_starts_gc_by_default() {
        let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
        let db = crate::Db::builder(
            "test_db_builder_starts_gc_by_default",
            Arc::new(InMemory::new()),
        )
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .expect("failed to build db");
        assert!(
            lookup_metric(&metrics_recorder, GC_COUNT).is_some(),
            "GC should be initialized by default"
        );
        db.close().await.expect("failed to close db");
    }
    // Setting garbage_collector_options to None should suppress the GC task
    // (its metric never gets registered).
    #[tokio::test]
    async fn test_db_builder_disables_gc_when_gc_options_are_none() {
        let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
        let db = crate::Db::builder(
            "test_db_builder_disables_gc_when_gc_options_are_none",
            Arc::new(InMemory::new()),
        )
        .with_settings(Settings {
            garbage_collector_options: None,
            ..Settings::default()
        })
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .expect("failed to build db");
        assert!(
            lookup_metric(&metrics_recorder, GC_COUNT).is_none(),
            "GC should not be initialized when options are None"
        );
        db.close().await.expect("failed to close db");
    }
    // l0_flush_parallelism == 0 must be rejected before the db is opened, with
    // an Invalid error kind and a descriptive message.
    #[tokio::test]
    async fn test_db_builder_rejects_zero_l0_flush_parallelism() {
        let result = crate::Db::builder(
            "test_db_builder_rejects_zero_l0_flush_parallelism",
            Arc::new(InMemory::new()),
        )
        .with_settings(Settings {
            l0_flush_parallelism: 0,
            ..Settings::default()
        })
        .build()
        .await;
        let err = match result {
            Ok(_) => panic!("expected invalid l0_flush_parallelism to fail"),
            Err(err) => err,
        };
        assert!(matches!(err.kind(), ErrorKind::Invalid));
        assert!(
            err.to_string()
                .contains("l0_flush_parallelism must be at least 1"),
            "unexpected error: {err}"
        );
    }
    // A single recorder shared across Db, GC, and Compactor builders should
    // register per-component object-store metrics: the db component records
    // puts from the write+flush, while gc/compactor (which did no puts here)
    // report a zero count under their own labels.
    #[tokio::test]
    async fn test_shared_recorder_registers_object_store_metrics_for_db_gc_and_compactor() {
        let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
        let path = "test_shared_recorder_registers_object_store_metrics_for_db_gc_and_compactor";
        let object_store = Arc::new(InMemory::new());
        let db = crate::Db::builder(path, object_store.clone())
            .with_settings(Settings {
                compactor_options: None,
                garbage_collector_options: None,
                ..Settings::default()
            })
            .with_metrics_recorder(metrics_recorder.clone())
            .build()
            .await
            .expect("failed to build db");
        let gc = crate::GarbageCollectorBuilder::new(path, object_store.clone())
            .with_metrics_recorder(metrics_recorder.clone())
            .build();
        let _compactor = crate::CompactorBuilder::new(path, object_store)
            .with_metrics_recorder(metrics_recorder.clone())
            .build();
        db.put(b"k1", b"v1")
            .await
            .expect("failed to write db value");
        db.flush().await.expect("failed to flush db");
        gc.run_gc_once().await;
        assert!(lookup_metric_with_labels(
            &metrics_recorder,
            OBJECT_STORE_REQUEST_COUNT,
            &object_store_labels("db", "main", "put", "put"),
        )
        .is_some_and(|count| count > 0));
        assert_eq!(
            lookup_metric_with_labels(
                &metrics_recorder,
                OBJECT_STORE_REQUEST_COUNT,
                &object_store_labels("gc", "main", "put", "put"),
            ),
            Some(0)
        );
        assert_eq!(
            lookup_metric_with_labels(
                &metrics_recorder,
                OBJECT_STORE_REQUEST_COUNT,
                &object_store_labels("compactor", "main", "put", "put"),
            ),
            Some(0)
        );
        db.close().await.expect("failed to close db");
    }
}