use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream::BoxStream;
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use tokio::runtime::Handle;
use tracing::instrument;
use ulid::Ulid;
#[cfg(feature = "compaction_filters")]
use crate::compaction_filter::CompactionFilterSupplier;
use crate::compactions_store::{CompactionsStore, StoredCompactions};
use crate::compactor::stats::CompactionStats;
use crate::compactor_executor::{
CompactionExecutor, StartCompactionJobArgs, TokioCompactionExecutor,
TokioCompactionExecutorOptions,
};
use crate::compactor_state_protocols::CompactorStateWriter;
use crate::config::CompactorOptions;
use crate::db_state::SortedRun;
use crate::db_status::ClosedResultWriter;
use crate::dispatcher::{MessageFactory, MessageHandler, MessageHandlerExecutor};
use crate::error::{Error, SlateDBError};
use crate::manifest::store::ManifestStore;
use crate::manifest::SsTableHandle;
use crate::merge_operator::MergeOperatorType;
use crate::rand::DbRand;
use crate::tablestore::TableStore;
use crate::utils::{format_bytes_si, IdGenerator};
use slatedb_common::clock::SystemClock;
use slatedb_common::metrics::MetricsRecorderHelper;
pub use crate::compactor_state::{
Compaction, CompactionSpec, CompactionStatus, CompactionsCore, CompactorState, SourceId,
};
pub use crate::compactor_state_protocols::CompactorStateView;
pub use crate::db::builder::CompactorBuilder;
pub use crate::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
/// Name under which the compactor's event-handling task is registered with
/// the `MessageHandlerExecutor`; also used to join and shut down that task.
pub(crate) const COMPACTOR_TASK_NAME: &str = "compactor";
/// Factory for [`CompactionScheduler`] instances.
pub trait CompactionSchedulerSupplier: Send + Sync {
    /// Builds a scheduler configured from the given compactor options.
    fn compaction_scheduler(
        &self,
        options: &CompactorOptions,
    ) -> Box<dyn CompactionScheduler + Send + Sync>;
}
/// Policy interface that decides which compactions should run and
/// validates/expands compaction requests against compactor state.
pub trait CompactionScheduler: Send + Sync {
    /// Proposes zero or more compactions based on the current state.
    fn propose(&self, state: &CompactorStateView) -> Vec<CompactionSpec>;

    /// Scheduler-specific validation hook; the default accepts every spec.
    fn validate(&self, _state: &CompactorStateView, _spec: &CompactionSpec) -> Result<(), Error> {
        Ok(())
    }

    /// Expands a [`CompactionRequest`] into concrete specs.
    ///
    /// - `Spec`: passed through unchanged.
    /// - `Full`: compacts every existing sorted run into a single run.
    ///   `Full` intentionally excludes L0 SSTs, so it is rejected with
    ///   `InvalidCompaction` when there are no sorted runs, even if L0 is
    ///   non-empty.
    fn generate(
        &self,
        state: &CompactorStateView,
        request: &CompactionRequest,
    ) -> Result<Vec<CompactionSpec>, Error> {
        match request {
            CompactionRequest::Spec(spec) => Ok(vec![spec.clone()]),
            CompactionRequest::Full => {
                let manifest = state.manifest();
                let sources = manifest
                    .compacted
                    .iter()
                    .map(|sr| SourceId::SortedRun(sr.id))
                    .collect::<Vec<_>>();
                if sources.is_empty() {
                    // Log only when there is L0 data the caller may have
                    // expected to be included.
                    if !manifest.l0.is_empty() {
                        error!(
                            "rejected full compaction: L0-only input is invalid because Full excludes L0 SSTs"
                        );
                    }
                    return Err(crate::Error::from(SlateDBError::InvalidCompaction));
                }
                // Destination is the smallest existing sorted-run id; the
                // expect cannot fire since `sources` is non-empty here.
                let destination = manifest
                    .compacted
                    .iter()
                    .map(|sr| sr.id)
                    .min()
                    .expect("full compaction requires at least one sorted run");
                Ok(vec![CompactionSpec::new(sources, destination)])
            }
        }
    }
}
/// A request for a compaction, either fully specified or expanded by the
/// scheduler (see [`CompactionScheduler::generate`]).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum CompactionRequest {
    /// Compact all existing sorted runs into one (excludes L0 SSTs).
    Full,
    /// Run exactly the given compaction spec.
    Spec(CompactionSpec),
}
/// Messages processed by the compactor's event loop.
#[derive(Debug)]
pub(crate) enum CompactorMessage {
    /// A compaction job completed, either producing a new sorted run or
    /// failing with an error.
    CompactionJobFinished {
        id: Ulid,
        result: Result<SortedRun, SlateDBError>,
    },
    /// Progress report from a running compaction job.
    CompactionJobProgress {
        id: Ulid,
        bytes_processed: u64,
        output_ssts: Vec<SsTableHandle>,
    },
    /// Ticker: log compaction state and throughput stats.
    LogStats,
    /// Ticker: refresh state from storage and schedule/start compactions.
    PollManifest,
}
/// Runs background compaction for a SlateDB database.
///
/// Owns the stores, configuration, and runtime handles needed to schedule
/// and execute compactions; the event loop is started by [`Compactor::run`].
#[derive(Clone)]
pub struct Compactor {
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    table_store: Arc<TableStore>,
    options: Arc<CompactorOptions>,
    // Supplies the scheduling policy (e.g. size-tiered).
    scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
    // Hosts the compactor's message-handling task.
    task_executor: Arc<MessageHandlerExecutor>,
    // Runtime on which compaction jobs execute.
    compactor_runtime: Handle,
    rand: Arc<DbRand>,
    stats: Arc<CompactionStats>,
    system_clock: Arc<dyn SystemClock>,
    merge_operator: Option<MergeOperatorType>,
    #[cfg(feature = "compaction_filters")]
    compaction_filter_supplier: Option<Arc<dyn CompactionFilterSupplier>>,
}
impl Compactor {
/// Creates a new `Compactor`.
///
/// Registers compaction metrics against `recorder` and builds the task
/// executor that will own the compactor's event-handling task. No
/// background work starts until [`Compactor::run`] is called.
#[cfg_attr(not(feature = "compaction_filters"), allow(clippy::too_many_arguments))]
pub(crate) fn new(
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    table_store: Arc<TableStore>,
    options: CompactorOptions,
    scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
    compactor_runtime: Handle,
    rand: Arc<DbRand>,
    recorder: &MetricsRecorderHelper,
    system_clock: Arc<dyn SystemClock>,
    closed_result: Arc<dyn ClosedResultWriter>,
    merge_operator: Option<MergeOperatorType>,
    #[cfg(feature = "compaction_filters")] compaction_filter_supplier: Option<
        Arc<dyn CompactionFilterSupplier>,
    >,
) -> Self {
    let stats = Arc::new(CompactionStats::new(recorder));
    let task_executor = Arc::new(MessageHandlerExecutor::new(
        closed_result,
        system_clock.clone(),
    ));
    Self {
        manifest_store,
        compactions_store,
        table_store,
        options: Arc::new(options),
        scheduler_supplier,
        task_executor,
        compactor_runtime,
        rand,
        stats,
        system_clock,
        merge_operator,
        #[cfg(feature = "compaction_filters")]
        compaction_filter_supplier,
    }
}
/// Starts the compactor and waits until its event-handling task exits.
///
/// Builds the scheduler and a Tokio-based compaction executor, registers
/// a `CompactorEventHandler` on the current runtime, then joins the task.
/// Compaction jobs themselves run on `compactor_runtime`.
pub async fn run(&self) -> Result<(), crate::Error> {
    // Workers report job progress/completion back through this channel.
    let (tx, rx) = async_channel::unbounded();
    let scheduler = Arc::from(self.scheduler_supplier.compaction_scheduler(&self.options));
    let executor = Arc::new(TokioCompactionExecutor::new(
        TokioCompactionExecutorOptions {
            handle: self.compactor_runtime.clone(),
            options: self.options.clone(),
            worker_tx: tx,
            table_store: self.table_store.clone(),
            rand: self.rand.clone(),
            stats: self.stats.clone(),
            clock: self.system_clock.clone(),
            manifest_store: self.manifest_store.clone(),
            merge_operator: self.merge_operator.clone(),
            #[cfg(feature = "compaction_filters")]
            compaction_filter_supplier: self.compaction_filter_supplier.clone(),
        },
    ));
    let handler = CompactorEventHandler::new(
        self.manifest_store.clone(),
        self.compactions_store.clone(),
        self.options.clone(),
        scheduler,
        executor,
        self.rand.clone(),
        self.stats.clone(),
        self.system_clock.clone(),
    )
    .await?;
    self.task_executor
        .add_handler(
            COMPACTOR_TASK_NAME.to_string(),
            Box::new(handler),
            rx,
            &Handle::current(),
        )
        .expect("failed to spawn compactor task");
    self.task_executor.monitor_on(&Handle::current())?;
    // Block until the compactor task finishes (or fails).
    self.task_executor
        .join_task(COMPACTOR_TASK_NAME)
        .await
        .map_err(|e| e.into())
}
/// Requests shutdown of the compactor task and waits for it to complete.
pub async fn stop(&self) -> Result<(), crate::Error> {
    match self.task_executor.shutdown_task(COMPACTOR_TASK_NAME).await {
        Ok(()) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Persists a new compaction built from `spec` with a freshly generated
/// ULID id, and returns that id.
///
/// Uses optimistic concurrency: if another writer updated the stored
/// compactions first (`TransactionalObjectVersionExists`), the local copy
/// is refreshed and the insert retried until it succeeds or fails with a
/// different error.
///
/// # Errors
/// Returns `InvalidDBState` if no stored compactions object exists yet;
/// otherwise propagates store errors.
pub(crate) async fn submit(
    spec: CompactionSpec,
    compactions_store: Arc<CompactionsStore>,
    rand: Arc<DbRand>,
    system_clock: Arc<dyn SystemClock>,
) -> Result<Ulid, crate::Error> {
    let compaction_id = rand.rng().gen_ulid(system_clock.as_ref());
    let compaction = Compaction::new(compaction_id, spec);
    let mut stored_compactions =
        match StoredCompactions::try_load(compactions_store.clone()).await? {
            Some(stored) => stored,
            None => return Err(crate::Error::from(SlateDBError::InvalidDBState)),
        };
    loop {
        let mut dirty = stored_compactions.prepare_dirty()?;
        dirty.value.insert(compaction.clone());
        match stored_compactions.update(dirty).await {
            Ok(()) => return Ok(compaction_id),
            Err(SlateDBError::TransactionalObjectVersionExists) => {
                // Lost a CAS race; reload the latest version and retry.
                stored_compactions.refresh().await?;
            }
            Err(err) => return Err(crate::Error::from(err)),
        }
    }
}
}
/// Event-loop state for the compactor task: reacts to ticker and
/// job-lifecycle messages, maintains in-memory compactor state, and
/// persists it through `state_writer`.
pub(crate) struct CompactorEventHandler {
    state_writer: CompactorStateWriter,
    options: Arc<CompactorOptions>,
    scheduler: Arc<dyn CompactionScheduler + Send + Sync>,
    executor: Arc<dyn CompactionExecutor + Send + Sync>,
    rand: Arc<DbRand>,
    stats: Arc<CompactionStats>,
    system_clock: Arc<dyn SystemClock>,
}
#[async_trait]
impl MessageHandler<CompactorMessage> for CompactorEventHandler {
    /// Periodic messages: poll the manifest every `poll_interval`, and log
    /// stats every 10 seconds.
    fn tickers(&mut self) -> Vec<(Duration, Box<MessageFactory<CompactorMessage>>)> {
        vec![
            (
                self.options.poll_interval,
                Box::new(|| CompactorMessage::PollManifest),
            ),
            (
                Duration::from_secs(10),
                Box::new(|| CompactorMessage::LogStats),
            ),
        ]
    }

    /// Dispatches a single compactor message.
    async fn handle(&mut self, message: CompactorMessage) -> Result<(), SlateDBError> {
        match message {
            CompactorMessage::LogStats => self.handle_log_ticker(),
            CompactorMessage::PollManifest => self.handle_ticker().await?,
            CompactorMessage::CompactionJobFinished { id, result } => {
                match result {
                    Ok(sr) => self.finish_compaction(id, sr).await?,
                    Err(err) => {
                        error!("error executing compaction [error={:#?}]", err);
                        self.finish_failed_compaction(id).await?;
                    }
                }
                // A finished job frees capacity, so immediately try to
                // schedule and start more compactions.
                self.maybe_schedule_compactions().await?;
                self.maybe_start_compactions().await?;
            }
            CompactorMessage::CompactionJobProgress {
                id,
                bytes_processed,
                output_ssts,
            } => {
                let compaction = self.state().compactions().value.get(&id);
                // Persist only when the set of output SSTs changed; plain
                // byte-count progress is kept in memory only.
                let update_output_ssts =
                    compaction.is_some_and(|c| c.output_ssts().len() != output_ssts.len());
                self.state_mut().update_compaction(&id, |c| {
                    c.set_bytes_processed(bytes_processed);
                    if update_output_ssts {
                        c.set_output_ssts(output_ssts);
                    }
                });
                if update_output_ssts {
                    self.state_writer.write_compactions_safely().await?;
                }
            }
        }
        Ok(())
    }

    /// Stops the executor on handler shutdown; any still-queued messages
    /// are dropped unprocessed.
    async fn cleanup(
        &mut self,
        mut _messages: BoxStream<'async_trait, CompactorMessage>,
        _result: Result<(), SlateDBError>,
    ) -> Result<(), SlateDBError> {
        self.stop_executor().await?;
        Ok(())
    }
}
impl CompactorEventHandler {
/// Creates a handler, initializing the `CompactorStateWriter` (which
/// asynchronously loads current compactor state from the stores).
pub(crate) async fn new(
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    options: Arc<CompactorOptions>,
    scheduler: Arc<dyn CompactionScheduler + Send + Sync>,
    executor: Arc<dyn CompactionExecutor + Send + Sync>,
    rand: Arc<DbRand>,
    stats: Arc<CompactionStats>,
    system_clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
    let state_writer = CompactorStateWriter::new(
        manifest_store,
        compactions_store,
        system_clock.clone(),
        options.as_ref(),
        rand.clone(),
    )
    .await?;
    Ok(Self {
        state_writer,
        options,
        scheduler,
        executor,
        rand,
        stats,
        system_clock,
    })
}
/// Read-only access to the in-memory compactor state.
fn state(&self) -> &CompactorState {
    &self.state_writer.state
}
/// Mutable access to the in-memory compactor state.
fn state_mut(&mut self) -> &mut CompactorState {
    &mut self.state_writer.state
}
/// Handles the periodic `LogStats` ticker: logs current compaction state
/// and per-compaction throughput.
fn handle_log_ticker(&self) {
    self.log_compaction_state();
    self.log_compaction_throughput();
}
/// Logs per-compaction progress and updates the aggregate
/// `total_bytes_being_compacted` and `total_throughput` gauges.
///
/// A compaction's start time is recovered from the timestamp embedded in
/// its ULID id, so elapsed time is measured from id generation.
fn log_compaction_throughput(&self) {
    let current_time = self.system_clock.now();
    let current_time_ms = current_time.timestamp_millis() as u64;
    let db_state = self.state().db_state();
    let mut total_estimated_bytes = 0u64;
    let mut total_bytes_processed = 0u64;
    let mut total_elapsed_secs = 0.0f64;
    for compaction in self.state().active_compactions() {
        let estimated_source_bytes =
            Self::calculate_estimated_source_bytes(compaction, db_state);
        total_estimated_bytes += estimated_source_bytes;
        total_bytes_processed += compaction.bytes_processed();
        // Start time from the ULID's embedded millisecond timestamp.
        let start_time_ms = compaction
            .id()
            .datetime()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("invalid duration")
            .as_millis() as u64;
        let elapsed_secs = if start_time_ms > 0 {
            (current_time_ms as f64 - start_time_ms as f64) / 1000.0
        } else {
            0.0
        };
        total_elapsed_secs += elapsed_secs;
        let throughput = if elapsed_secs > 0.0 {
            compaction.bytes_processed() as f64 / elapsed_secs
        } else {
            0.0
        };
        let percentage = if estimated_source_bytes > 0 {
            (compaction.bytes_processed() * 100 / estimated_source_bytes) as u32
        } else {
            0
        };
        debug!(
            "compaction progress [id={}, progress={}%, processed_bytes={}, estimated_source_bytes={}, elapsed={:.2}s, throughput={}/s]",
            compaction.id(),
            percentage,
            format_bytes_si(compaction.bytes_processed()),
            format_bytes_si(estimated_source_bytes),
            elapsed_secs,
            format_bytes_si(throughput as u64),
        );
    }
    // Elapsed time is summed across compactions, so overlapping jobs are
    // averaged over combined job-time rather than wall-clock time.
    let total_throughput = if total_elapsed_secs > 0.0 {
        total_bytes_processed as f64 / total_elapsed_secs
    } else {
        0.0
    };
    self.stats
        .total_bytes_being_compacted
        .set(total_estimated_bytes as i64);
    self.stats.total_throughput.set(total_throughput as i64);
}
fn calculate_estimated_source_bytes(
compaction: &Compaction,
db_state: &crate::db_state::ManifestCore,
) -> u64 {
use crate::db_state::{SortedRun, SsTableView};
use std::collections::HashMap;
let views_by_id: HashMap<Ulid, &SsTableView> =
db_state.l0.iter().map(|view| (view.id, view)).collect();
let srs_by_id: HashMap<u32, &SortedRun> =
db_state.compacted.iter().map(|sr| (sr.id, sr)).collect();
compaction
.spec()
.sources()
.iter()
.map(|source| match source {
SourceId::SstView(id) => views_by_id
.get(id)
.expect("compaction source view not found in L0")
.estimate_size(),
SourceId::SortedRun(id) => srs_by_id
.get(id)
.expect("compaction source sorted run not found")
.estimate_size(),
})
.sum()
}
/// Handles the periodic `PollManifest` ticker: refreshes state from the
/// stores and attempts to schedule and start compactions. No-op once the
/// executor has been stopped.
async fn handle_ticker(&mut self) -> Result<(), SlateDBError> {
    if !self.is_executor_stopped() {
        self.state_writer.refresh().await?;
        self.maybe_schedule_compactions().await?;
        self.maybe_start_compactions().await?;
    }
    Ok(())
}
/// Stops the compaction executor without blocking the event loop.
///
/// `stop()` is dispatched via `spawn_blocking` in normal builds; under
/// `cfg(dst)` (NOTE(review): presumably deterministic-simulation builds —
/// confirm) a regular task is spawned instead.
async fn stop_executor(&self) -> Result<(), SlateDBError> {
    let this_executor = self.executor.clone();
    #[cfg(not(dst))]
    #[allow(clippy::disallowed_methods)]
    let result = tokio::task::spawn_blocking(move || {
        this_executor.stop();
    })
    .await
    .map_err(|_| SlateDBError::CompactionExecutorFailed);
    #[cfg(dst)]
    let result = tokio::spawn(async move {
        this_executor.stop();
    })
    .await
    .map_err(|_| SlateDBError::CompactionExecutorFailed);
    result
}
/// Whether the compaction executor has been stopped.
fn is_executor_stopped(&self) -> bool {
    self.executor.is_stopped()
}
/// Validates a compaction spec against the current db state.
///
/// Rejects specs that: have no sources; reference L0 SSTs or sorted runs
/// no longer present in the db state; target a destination id below the
/// next expected id for an L0-only compaction; or would run alongside an
/// already-running L0 compaction. Finally defers to the scheduler's own
/// `validate` hook (any scheduler error maps to `InvalidCompaction`).
fn validate_compaction(&self, compaction: &CompactionSpec) -> Result<(), SlateDBError> {
    if compaction.sources().is_empty() {
        warn!("submitted compaction is empty: {:?}", compaction.sources());
        return Err(SlateDBError::InvalidCompaction);
    }
    let db_state = self.state().db_state();
    let l0_view_ids = db_state
        .l0
        .iter()
        .map(|view| view.id)
        .collect::<std::collections::HashSet<_>>();
    let sr_ids = db_state
        .compacted
        .iter()
        .map(|sr| sr.id)
        .collect::<std::collections::HashSet<_>>();
    if let Some(missing) = compaction.sources().iter().find(|source| match source {
        SourceId::SstView(id) => !l0_view_ids.contains(id),
        SourceId::SortedRun(id) => !sr_ids.contains(id),
    }) {
        warn!("compaction source missing from db state: {:?}", missing);
        return Err(SlateDBError::InvalidCompaction);
    }
    if compaction.has_l0_sources() && !compaction.has_sr_sources() {
        // NOTE(review): assumes `compacted` is ordered such that `first()`
        // holds the highest sorted-run id — confirm against the manifest's
        // ordering invariant.
        let highest_id = self
            .state()
            .db_state()
            .compacted
            .first()
            .map_or(0, |sr| sr.id + 1);
        if compaction.destination() < highest_id {
            warn!("compaction destination is lesser than the expected L0-only highest_id: {:?} {:?}",
                compaction.destination(), highest_id);
            return Err(SlateDBError::InvalidCompaction);
        }
    }
    if compaction.has_l0_sources() {
        // At most one compaction with L0 sources may run at a time.
        let running_l0_exists = self
            .state()
            .compactions_with_status(CompactionStatus::Running)
            .any(|c| c.spec().has_l0_sources());
        if running_l0_exists {
            warn!("rejected compaction: parallel L0 compaction already running");
            return Err(SlateDBError::InvalidCompaction);
        }
    }
    self.scheduler
        .validate(&self.state().into(), compaction)
        .map_err(|_e| SlateDBError::InvalidCompaction)
}
async fn maybe_schedule_compactions(&mut self) -> Result<(), SlateDBError> {
let running_compaction_count = self.running_compaction_count();
let available_capacity = self.options.max_concurrent_compactions - running_compaction_count;
if available_capacity == 0 {
debug!(
"skipping compaction scheduling since at capacity [running_compactions={}, max_concurrent_compactions={}]",
running_compaction_count,
self.options.max_concurrent_compactions
);
return Ok(());
}
let mut specs = self.scheduler.propose(&self.state().into());
let num_specs_added = specs
.drain(..available_capacity.min(specs.len()))
.map(|spec| -> Result<(), SlateDBError> {
let compaction_id = self.rand.rng().gen_ulid(self.system_clock.as_ref());
debug!(
"scheduling new compaction [spec={:?}, id={}]",
spec, compaction_id
);
self.state_mut()
.add_compaction(Compaction::new(compaction_id, spec))?;
Ok(())
})
.count();
if num_specs_added > 0 {
self.state_writer.write_compactions_safely().await?;
}
Ok(())
}
/// Starts all currently-submitted compactions (up to capacity) and
/// persists any status changes.
///
/// State is written even when starting failed, so that `Failed` statuses
/// set by `start_submitted_compactions` are persisted before the error is
/// propagated.
async fn maybe_start_compactions(&mut self) -> Result<(), SlateDBError> {
    let submitted_compactions = self
        .state()
        .compactions_with_status(CompactionStatus::Submitted)
        .cloned()
        .collect::<Vec<_>>();
    let result = self
        .start_submitted_compactions(&submitted_compactions)
        .await;
    if !submitted_compactions.is_empty() {
        self.state_writer.write_compactions_safely().await?;
    }
    result
}
/// Validates and starts each submitted compaction, respecting the
/// concurrency limit.
///
/// - Compactions that fail validation are marked `Failed` and skipped.
/// - Compactions handed to the executor are marked `Running`.
/// - If dispatch to the executor fails, the compaction is marked `Failed`
///   and the error is returned immediately.
///
/// Status changes made here are persisted by the caller
/// (`maybe_start_compactions`), not by this method.
async fn start_submitted_compactions(
    &mut self,
    submitted_compactions: &[Compaction],
) -> Result<(), SlateDBError> {
    for compaction in submitted_compactions {
        assert!(
            compaction.status() == CompactionStatus::Submitted,
            "expected submitted compaction, got {:?}",
            compaction.status()
        );
        let running_compaction_count = self.running_compaction_count();
        if running_compaction_count >= self.options.max_concurrent_compactions {
            info!(
                "skipping compaction since capacity is exceeded [running_compactions={}, max_concurrent_compactions={}, compaction={:?}]",
                running_compaction_count,
                self.options.max_concurrent_compactions,
                compaction
            );
            break;
        }
        if let Err(e) = self.validate_compaction(compaction.spec()) {
            // Bug fix: the error and compaction arguments were previously
            // swapped relative to the format string's placeholders.
            error!(
                "compaction validation failed [error={:?}, compaction={:?}]",
                e, compaction
            );
            self.state_mut().update_compaction(&compaction.id(), |c| {
                c.set_status(CompactionStatus::Failed)
            });
            continue;
        }
        match self
            .start_compaction(compaction.id(), compaction.clone())
            .await
        {
            Ok(_) => {
                self.state_mut().update_compaction(&compaction.id(), |c| {
                    c.set_status(CompactionStatus::Running)
                });
            }
            Err(e) => {
                self.state_mut().update_compaction(&compaction.id(), |c| {
                    c.set_status(CompactionStatus::Failed)
                });
                return Err(e);
            }
        }
    }
    Ok(())
}
/// Builds job arguments from the current db state and hands the job to
/// the compaction executor.
///
/// `start_compaction_job` may block, so it is dispatched via
/// `spawn_blocking` in normal builds; under `cfg(dst)` (NOTE(review):
/// presumably deterministic-simulation builds — confirm) a regular task
/// is spawned instead.
#[instrument(level = "debug", skip_all, fields(id = %job_id))]
async fn start_compaction(
    &mut self,
    job_id: Ulid,
    compaction: Compaction,
) -> Result<(), SlateDBError> {
    self.log_compaction_state();
    let db_state = self.state().db_state();
    let sst_views = compaction.get_l0_sst_views(db_state);
    let sorted_runs = compaction.get_sorted_runs(db_state);
    let spec = compaction.spec();
    // True when there are no sorted runs yet, or the destination matches
    // the last entry of `compacted`.
    let is_dest_last_run = db_state.compacted.is_empty()
        || db_state
            .compacted
            .last()
            .is_some_and(|sr| spec.destination() == sr.id);
    let job_args = StartCompactionJobArgs {
        id: job_id,
        compaction_id: compaction.id(),
        destination: spec.destination(),
        sst_views,
        sorted_runs,
        output_ssts: compaction.output_ssts().clone(),
        compaction_clock_tick: db_state.last_l0_clock_tick,
        retention_min_seq: Some(db_state.recent_snapshot_min_seq),
        is_dest_last_run,
    };
    let this_executor = self.executor.clone();
    #[cfg(not(dst))]
    #[allow(clippy::disallowed_methods)]
    let result = tokio::task::spawn_blocking(move || {
        this_executor.start_compaction_job(job_args);
    })
    .await
    .map_err(|_| SlateDBError::CompactionExecutorFailed);
    #[cfg(dst)]
    let result = tokio::spawn(async move {
        this_executor.start_compaction_job(job_args);
    })
    .await
    .map_err(|_| SlateDBError::CompactionExecutorFailed);
    result
}
/// Marks the compaction identified by `id` as failed and persists the
/// updated compactions state.
async fn finish_failed_compaction(&mut self, id: Ulid) -> Result<(), SlateDBError> {
    let state = self.state_mut();
    state.update_compaction(&id, |compaction| {
        compaction.set_status(CompactionStatus::Failed)
    });
    self.state_writer.write_compactions_safely().await?;
    Ok(())
}
/// Applies a successful compaction result to state and persists it.
///
/// State is written before new compactions are scheduled/started; the
/// `last_compaction_ts` gauge is updated last.
#[instrument(level = "debug", skip_all, fields(id = %id))]
async fn finish_compaction(
    &mut self,
    id: Ulid,
    output_sr: SortedRun,
) -> Result<(), SlateDBError> {
    self.state_mut().finish_compaction(id, output_sr);
    self.log_compaction_state();
    self.state_writer.write_state_safely().await?;
    self.maybe_schedule_compactions().await?;
    self.maybe_start_compactions().await?;
    self.stats
        .last_compaction_ts
        .set(self.system_clock.now().timestamp());
    Ok(())
}
/// Logs the current db runs plus every in-flight compaction. Each
/// compaction is logged at debug level when enabled, otherwise at info
/// level with the terser `Display` format.
fn log_compaction_state(&self) {
    self.state().db_state().log_db_runs();
    for active in self.state().active_compactions() {
        if log::log_enabled!(log::Level::Debug) {
            debug!("in-flight compaction [compaction={:?}]", active);
        } else {
            info!("in-flight compaction [compaction={}]", active);
        }
    }
}
/// Counts the active compactions whose status is `Running`.
fn running_compaction_count(&self) -> usize {
    let mut running = 0usize;
    for compaction in self.state().active_compactions() {
        if compaction.status() == CompactionStatus::Running {
            running += 1;
        }
    }
    running
}
}
/// Metric names and registration for the compactor.
pub mod stats {
    use slatedb_common::metrics::{CounterFn, GaugeFn, MetricsRecorderHelper, UpDownCounterFn};
    use std::sync::Arc;
    pub use crate::merge_operator::MERGE_OPERATOR_OPERANDS;
    use crate::merge_operator::{
        MERGE_OPERATOR_COMPACT_PATH, MERGE_OPERATOR_OPERANDS_DESCRIPTION, MERGE_OPERATOR_PATH_LABEL,
    };

    /// Prefixes a metric name with the compactor namespace at compile time.
    macro_rules! compactor_stat_name {
        ($suffix:expr) => {
            concat!("slatedb.compactor.", $suffix)
        };
    }

    pub const BYTES_COMPACTED: &str = compactor_stat_name!("bytes_compacted");
    pub const LAST_COMPACTION_TS_SEC: &str = compactor_stat_name!("last_compaction_timestamp_sec");
    pub const RUNNING_COMPACTIONS: &str = compactor_stat_name!("running_compactions");
    pub const TOTAL_BYTES_BEING_COMPACTED: &str =
        compactor_stat_name!("total_bytes_being_compacted");
    pub const TOTAL_THROUGHPUT_BYTES_PER_SEC: &str =
        compactor_stat_name!("total_throughput_bytes_per_sec");

    /// Handles to the compactor's registered metrics.
    pub(crate) struct CompactionStats {
        // Unix timestamp (seconds) of the most recently finished compaction.
        pub(crate) last_compaction_ts: Arc<dyn GaugeFn>,
        // Up/down counter registered under RUNNING_COMPACTIONS.
        pub(crate) running_compactions: Arc<dyn UpDownCounterFn>,
        // Counter registered under BYTES_COMPACTED.
        pub(crate) bytes_compacted: Arc<dyn CounterFn>,
        // Estimated total source bytes of active compactions.
        pub(crate) total_bytes_being_compacted: Arc<dyn GaugeFn>,
        // Aggregate compaction throughput (bytes/sec).
        pub(crate) total_throughput: Arc<dyn GaugeFn>,
        // Merge-operator operand counter, labeled with the compact path.
        pub(crate) merge_operator_compact_operands: Arc<dyn CounterFn>,
    }

    impl CompactionStats {
        /// Registers all compactor metrics against the given recorder.
        pub(crate) fn new(recorder: &MetricsRecorderHelper) -> Self {
            Self {
                last_compaction_ts: recorder.gauge(LAST_COMPACTION_TS_SEC).register(),
                running_compactions: recorder.up_down_counter(RUNNING_COMPACTIONS).register(),
                bytes_compacted: recorder.counter(BYTES_COMPACTED).register(),
                total_bytes_being_compacted: recorder.gauge(TOTAL_BYTES_BEING_COMPACTED).register(),
                total_throughput: recorder.gauge(TOTAL_THROUGHPUT_BYTES_PER_SEC).register(),
                merge_operator_compact_operands: recorder
                    .counter(MERGE_OPERATOR_OPERANDS)
                    .labels(&[(MERGE_OPERATOR_PATH_LABEL, MERGE_OPERATOR_COMPACT_PATH)])
                    .description(MERGE_OPERATOR_OPERANDS_DESCRIPTION)
                    .register(),
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parking_lot::Mutex;
use rand::RngCore;
use slatedb_common::MockSystemClock;
use ulid::Ulid;
use super::*;
use crate::compactions_store::{FenceableCompactions, StoredCompactions};
use crate::compactor::stats::CompactionStats;
use crate::compactor::stats::LAST_COMPACTION_TS_SEC;
use crate::compactor_executor::{
CompactionExecutor, TokioCompactionExecutor, TokioCompactionExecutorOptions,
};
use crate::compactor_state::CompactionStatus;
use crate::compactor_state::SourceId;
use crate::config::{
FlushOptions, FlushType, MergeOptions, PutOptions, Settings,
SizeTieredCompactionSchedulerOptions, Ttl, WriteOptions,
};
use crate::db::Db;
use crate::db_state::{
ManifestCore, SortedRun, SsTableHandle, SsTableId, SsTableInfo, SsTableView,
};
use crate::error::SlateDBError;
use crate::format::sst::{SsTableFormat, SST_FORMAT_VERSION_LATEST};
use crate::iter::RowEntryIterator;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::manifest::Manifest;
use crate::merge_operator::{MergeOperator, MergeOperatorError};
use crate::object_stores::ObjectStores;
use crate::proptest_util::rng;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::test_utils::assert_iterator;
use crate::types::KeyValue;
use crate::types::RowEntry;
use bytes::Bytes;
use slatedb_common::clock::{DefaultSystemClock, SystemClock};
const PATH: &str = "/test/db";
struct StringConcatMergeOperator;
impl MergeOperator for StringConcatMergeOperator {
fn merge(
&self,
_key: &Bytes,
existing_value: Option<Bytes>,
value: Bytes,
) -> Result<Bytes, MergeOperatorError> {
let mut result = existing_value.unwrap_or_default().as_ref().to_vec();
result.extend_from_slice(&value);
Ok(Bytes::from(result))
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_compactor_compacts_l0() {
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
let mut options = db_options(Some(compactor_options()));
options.l0_sst_size_bytes = 512;
let scheduler_options = SizeTieredCompactionSchedulerOptions {
min_compaction_sources: 1,
max_compaction_sources: 999,
include_size_threshold: 4.0,
}
.into();
options
.compactor_options
.as_mut()
.expect("compactor options must be set")
.scheduler_options = scheduler_options;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.build()
.await
.unwrap();
let (_, _, table_store) = build_test_stores(os.clone());
let mut expected = HashMap::<Vec<u8>, Vec<u8>>::new();
for i in 0..4 {
let k = vec![b'a' + i as u8; 16];
let v = vec![b'b' + i as u8; 48];
expected.insert(k.clone(), v.clone());
db.put_with_options(
&k,
&v,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
let k = vec![b'j' + i as u8; 16];
let v = vec![b'k' + i as u8; 48];
db.put_with_options(
&k,
&v,
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
expected.insert(k.clone(), v.clone());
}
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let db_state = await_compaction(&db, Some(system_clock.clone())).await;
let db_state = db_state.expect("db was not compacted");
for run in db_state.compacted {
for sst in run.sst_views {
let mut iter = SstIterator::new_borrowed_initialized(
..,
&sst,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
while let Some(kv) = iter.next().await.unwrap().map(KeyValue::from) {
let expected_v = expected
.remove(kv.key.as_ref())
.expect("removing unexpected key");
let db_v = db.get(kv.key.as_ref()).await.unwrap().unwrap();
assert_eq!(expected_v, db_v.as_ref());
}
}
}
assert!(expected.is_empty());
}
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_tombstones_in_l0() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| {
state.manifest().l0.len() == 2 ||
(state.manifest().l0.len() == 1 && state.manifest().compacted.len() == 1)
},
)));
let mut options = db_options(None);
options.wal_enabled = false;
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
scheduler.clone(),
system_clock.clone(),
))
.build()
.await
.unwrap();
let (manifest_store, _, table_store) = build_test_stores(os.clone());
db.put(&[b'a'; 16], &[b'a'; 32]).await.unwrap();
db.put(&[b'b'; 16], &[b'a'; 32]).await.unwrap();
db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(system_clock.clone()))
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.l0.len(), 0, "{:?}", db_state.l0);
db.delete_with_options(
&[b'a'; 16],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
let db_state = get_db_state(manifest_store.clone()).await;
assert_eq!(db_state.l0.len(), 1, "{:?}", db_state.l0);
assert_eq!(db_state.compacted.len(), 1);
let l0 = db_state.l0.front().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
l0,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let tombstone = iter.next().await.unwrap();
assert!(tombstone.unwrap().value.is_tombstone());
let db_state = await_compacted_compaction(
manifest_store.clone(),
db_state.compacted,
Some(system_clock.clone()),
)
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let next = iter.next().await.unwrap().map(KeyValue::from);
assert_eq!(next.unwrap().key.as_ref(), &[b'b'; 16]);
let next = iter.next().await.unwrap().map(KeyValue::from);
assert!(next.is_none());
}
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_not_filter_tombstone_with_snapshot() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| {
state.manifest().l0.len() == 2 ||
(state.manifest().l0.len() == 1 && state.manifest().compacted.len() == 1)
},
)));
let mut options = db_options(None);
options.wal_enabled = false;
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
scheduler.clone(),
system_clock.clone(),
))
.build()
.await
.unwrap();
let (manifest_store, _, table_store) = build_test_stores(os.clone());
db.put(&[b'a'; 16], &[b'a'; 32]).await.unwrap();
db.put(&[b'b'; 16], &[b'a'; 32]).await.unwrap();
let _snapshot = db.snapshot().await.unwrap();
db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(system_clock.clone()))
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.l0.len(), 0, "{:?}", db_state.l0);
db.delete_with_options(
&[b'a'; 16],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
let db_state = get_db_state(manifest_store.clone()).await;
assert_eq!(db_state.l0.len(), 1, "{:?}", db_state.l0);
assert_eq!(db_state.compacted.len(), 1);
let db_state = await_compacted_compaction(
manifest_store.clone(),
db_state.compacted,
Some(system_clock.clone()),
)
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let next = iter.next().await.unwrap().unwrap();
assert_eq!(next.key.as_ref(), &[b'a'; 16]);
assert!(next.value.is_tombstone());
let next = iter.next().await.unwrap().unwrap();
assert_eq!(next.key.as_ref(), &[b'a'; 16]);
assert_eq!(next.value.as_bytes().unwrap().as_ref(), &[b'a'; 32]);
let next = iter.next().await.unwrap().unwrap();
assert_eq!(next.key.as_ref(), &[b'b'; 16]);
assert_eq!(next.value.as_bytes().unwrap().as_ref(), &[b'a'; 32]);
let next = iter.next().await.unwrap();
assert!(next.is_none());
}
// Verifies that merge operands for the same key, written across two L0
// flushes, are combined by the merge operator during an L0 compaction while
// plain put entries in the same SSTs pass through unchanged.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_apply_merge_during_l0_compaction() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// First L0 SST: two merge operands for key1 ("a", "b") plus an unrelated put.
db.merge_with_options(
b"key1",
b"a",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key1",
b"b",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'x'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
// Second L0 SST: a third operand for key1, a first operand for key2, and a put.
db.merge_with_options(
b"key1",
b"c",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key2",
b"x",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'y'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
// Two L0 SSTs now exist, so the on-demand scheduler triggers a compaction.
let db_state = await_compaction(&db, Some(system_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
// Scan the single compacted SST over the full key range.
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// key1's three operands collapse into a single Merge entry "abc" carrying the
// latest operand seqnum (4); key2 keeps its single operand; puts survive
// with their original seqnums.
assert_iterator(
&mut iter,
vec![
RowEntry::new_merge(b"key1", b"abc", 4).with_create_ts(0),
RowEntry::new_merge(b"key2", b"x", 5).with_create_ts(0),
RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 3).with_create_ts(0),
RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 6).with_create_ts(0),
],
)
.await;
// Point reads through the db must observe the merged values as well.
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("abc")));
let result = db.get(b"key2").await.unwrap();
assert_eq!(result, Some(Bytes::from("x")));
}
// Verifies that the compact-path merge-operator operand metric starts at zero
// and increases once a compaction actually invokes the merge operator.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_record_merge_operator_operands_on_compact_path() {
use crate::merge_operator::MERGE_OPERATOR_COMPACT_PATH;
use crate::test_utils::{
lookup_merge_operator_operands, OnDemandCompactionSchedulerSupplier,
};
use slatedb_common::metrics::DefaultMetricsRecorder;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
let db = Db::builder(PATH, os.clone())
.with_settings(db_options(None))
.with_system_clock(system_clock.clone())
.with_metrics_recorder(metrics_recorder.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler,
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
// Before any compaction, the compact-path operand counter must read zero.
assert_eq!(
lookup_merge_operator_operands(&metrics_recorder, MERGE_OPERATOR_COMPACT_PATH),
Some(0)
);
// Write one merge operand per memtable flush so two L0 SSTs accumulate.
db.merge_with_options(
b"key1",
b"a",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
db.merge_with_options(
b"key1",
b"b",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(FlushOptions {
flush_type: FlushType::MemTable,
})
.await
.unwrap();
let db_state = await_compaction(&db, Some(system_clock)).await;
assert!(db_state.is_some(), "db was not compacted");
// The compaction merged key1's operands, so the counter must have advanced.
assert!(
lookup_merge_operator_operands(&metrics_recorder, MERGE_OPERATOR_COMPACT_PATH)
.is_some_and(|value| value > 0)
);
db.close().await.unwrap();
}
// Verifies that merge operands already compacted into a sorted run are
// re-merged with newer L0 operands during a subsequent compaction, producing
// a single combined Merge entry.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_apply_merge_across_l0_and_sorted_runs() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction whenever any L0 SST exists.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| !state.manifest().l0.is_empty(),
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// Round 1: operands "a" and "b" for key1 plus a put, flushed and compacted
// into a sorted run.
db.merge_with_options(
b"key1",
b"a",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key1",
b"b",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'x'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(system_clock.clone())).await;
let db_state = db_state.expect("db was not compacted");
assert_eq!(db_state.compacted.len(), 1);
// Capture the clock tick that round-2 entries will be stamped with.
let expected_tick = system_clock.now().timestamp_millis();
// Round 2: operands "c" and "d" for key1 plus another put, flushed to L0.
db.merge_with_options(
b"key1",
b"c",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key1",
b"d",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'y'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// Second compaction merges the new L0 with the existing sorted run.
let db_state = await_compaction(&db, Some(system_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// All four operands combine into "abcd" (latest operand seqnum 5), stamped
// with the tick captured before round 2; the puts keep their seqnums.
assert_iterator(
&mut iter,
vec![
RowEntry::new_merge(b"key1", b"abcd", 5).with_create_ts(expected_tick),
RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 3).with_create_ts(0),
RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 6).with_create_ts(expected_tick),
],
)
.await;
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("abcd")));
}
// Verifies that merge operands for a key that was never written with a plain
// put still combine correctly, and the result stays a Merge entry (not a
// materialized value) in the compacted SST.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_merge_without_base_value() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// First L0 SST: operand "x" for key1 plus an unrelated put.
db.merge_with_options(
b"key1",
b"x",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.put_with_options(
&vec![b'x'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// Second L0 SST: operands "y" and "z" for key1 plus another put.
db.merge_with_options(
b"key1",
b"y",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key1",
b"z",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'y'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(system_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// With no base value, the combined operands remain a Merge entry "xyz"
// carrying the latest operand seqnum (4).
assert_iterator(
&mut iter,
vec![
RowEntry::new_merge(b"key1", b"xyz", 4).with_create_ts(0),
RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 2).with_create_ts(0),
RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 5).with_create_ts(0),
],
)
.await;
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("xyz")));
}
// Verifies that merge operands written one-per-SST across three L0 flushes
// combine in write order ("1" then "2" then "3") when all three SSTs are
// compacted together.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_preserve_merge_order_across_multiple_ssts() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction only once three L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 3,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// SST 1: operand "1" plus filler put for key 'a'*16.
db.merge_with_options(
b"key1",
b"1",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'a'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// SST 2: operand "2" plus filler put for key 'b'*16.
db.merge_with_options(
b"key1",
b"2",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'b'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// SST 3: operand "3" plus filler put for key 'c'*16.
db.merge_with_options(
b"key1",
b"3",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'c'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(system_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// Operands concatenate in insertion order ("123"); the merged entry carries
// the latest operand seqnum (5). Keys appear in sorted order, with "key1"
// sorting after the 16-byte filler keys.
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(&[b'a'; 16], &[b'p'; 128], 2).with_create_ts(0),
RowEntry::new_value(&[b'b'; 16], &[b'p'; 128], 4).with_create_ts(0),
RowEntry::new_value(&[b'c'; 16], &[b'p'; 128], 6).with_create_ts(0),
RowEntry::new_merge(b"key1", b"123", 5).with_create_ts(0),
],
)
.await;
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("123")));
}
// Verifies TTL handling for merge operands: an operand whose TTL elapsed
// before the compaction runs is not carried into the compacted output, while
// a non-expiring operand for the same key survives. Runs with the WAL
// disabled so writes land directly in the memtable/L0 path.
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_not_compact_expired_merge_operations_in_last_run() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let insert_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let mut options = db_options(None);
options.wal_enabled = false;
// Small L0 target so each durable write produces its own SST.
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(insert_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
scheduler.clone(),
insert_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// t=0: operand 'a'*32 expiring at t=10.
insert_clock.set(0);
db.merge_with_options(
b"key1",
&[b'a'; 32],
&crate::config::MergeOptions {
ttl: Ttl::ExpireAfter(10),
},
&WriteOptions {
await_durable: true,
},
)
.await
.unwrap();
// t=20: the first operand is already past its TTL; write a non-expiring one.
insert_clock.set(20);
db.merge_with_options(
b"key1",
&[b'b'; 32],
&crate::config::MergeOptions { ttl: Ttl::NoExpiry },
&WriteOptions {
await_durable: true,
},
)
.await
.unwrap();
let db_state = await_compaction(&db, Some(insert_clock.clone()))
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.last_l0_clock_tick, 20);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// Only the non-expiring 'b' operand (seq 2, created at t=20) remains.
assert_iterator(
&mut iter,
vec![RowEntry::new_merge(b"key1", &[b'b'; 32], 2).with_create_ts(20)],
)
.await;
}
// Verifies that a plain put written after merge operands takes precedence:
// the put's value shadows the accumulated merge result, and a compaction
// still completes and is recorded in the manifest.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_merge_and_then_overwrite_with_put() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (manifest_store, _compactions_store, _table_store) = build_test_stores(os.clone());
// First L0 SST: two merge operands for key1 plus a filler put.
db.merge_with_options(
b"key1",
b"a",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.merge_with_options(
b"key1",
b"b",
&MergeOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'x'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// Second L0 SST: a put to key1 that must override the earlier operands.
db.put_with_options(
b"key1",
b"new_value",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'y'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
let _ = await_compaction(&db, Some(system_clock)).await;
// The later put wins over the merge history.
let result = db.get(b"key1").await.unwrap();
assert_eq!(result, Some(Bytes::from("new_value")));
// Confirm via the persisted manifest that a compaction actually ran.
let stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
assert!(
!db_state.compacted.is_empty(),
"compaction should have occurred"
);
}
// Verifies that merge operands written with different TTLs (and therefore
// different expire timestamps) are not combined into one entry by compaction:
// whatever survives for the key stays in Merge form, and when two entries
// survive their expire timestamps must differ.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_not_merge_operations_with_different_expire_times() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (manifest_store, _compactions_store, table_store) = build_test_stores(os.clone());
// First L0 SST: operand "a" expiring after 100.
db.merge_with_options(
b"key1",
b"a",
&crate::config::MergeOptions {
ttl: Ttl::ExpireAfter(100),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'x'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
// Second L0 SST: operand "b" with a different TTL (200).
db.merge_with_options(
b"key1",
b"b",
&crate::config::MergeOptions {
ttl: Ttl::ExpireAfter(200),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.put_with_options(
&vec![b'y'; 16],
&vec![b'p'; 128],
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap(); db.flush().await.unwrap();
let _ = await_compaction(&db, Some(system_clock)).await;
let stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
assert!(
!db_state.compacted.is_empty(),
"compaction should have occurred"
);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// Collect all surviving entries for key1 from the compacted SST.
let mut key1_entries = vec![];
while let Some(entry) = iter.next().await.unwrap() {
if entry.key.as_ref() == b"key1" {
key1_entries.push(entry);
}
}
assert!(
!key1_entries.is_empty(),
"should have merge operations for key1"
);
// Every surviving key1 entry must still be a Merge (not a combined value).
assert!(key1_entries
.iter()
.all(|e| matches!(e.value, crate::types::ValueDeletable::Merge(_))));
// NOTE(review): this branch only asserts when both operands survive as
// separate entries; with len == 1 the distinct-expiry property goes
// unchecked — consider asserting on the exact entry count instead.
if key1_entries.len() == 2 {
assert_ne!(
key1_entries[0].expire_ts, key1_entries[1].expire_ts,
"separate merge operations should have different expire times"
);
}
}
// Verifies that merge operands sharing the same absolute expire timestamp
// (ExpireAt) ARE combined by compaction into a single Merge entry that keeps
// that expire timestamp — the counterpart to the different-expiry test above.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_merge_operations_with_same_expire_at() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let system_clock = Arc::new(MockSystemClock::new());
// Fire a compaction as soon as two L0 SSTs are present.
let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.manifest().l0.len() >= 2,
)));
let options = db_options(None);
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(system_clock.clone())
.with_compactor_builder(compactor_builder_with_scheduler(
os.clone(),
compaction_scheduler.clone(),
system_clock.clone(),
))
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (manifest_store, _compactions_store, table_store) = build_test_stores(os.clone());
let flush_opts = FlushOptions {
flush_type: FlushType::MemTable,
};
// t=100: operand "a" with absolute expiry 1000, flushed to its own SST.
system_clock.set(100);
db.merge_with_options(
b"key1",
b"a",
&MergeOptions {
ttl: Ttl::ExpireAt(1000),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(flush_opts.clone()).await.unwrap();
// t=200: operand "b" with the SAME absolute expiry, in a second SST.
system_clock.set(200);
db.merge_with_options(
b"key1",
b"b",
&MergeOptions {
ttl: Ttl::ExpireAt(1000),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(flush_opts.clone()).await.unwrap();
let _ = await_compaction(&db, Some(system_clock)).await;
let stored_manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
let db_state = stored_manifest.db_state();
assert!(
!db_state.compacted.is_empty(),
"compaction should have occurred"
);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let mut key1_entries = vec![];
while let Some(entry) = iter.next().await.unwrap() {
if entry.key.as_ref() == b"key1" {
key1_entries.push(entry);
}
}
// Same expiry => the operands must have been collapsed into one entry.
assert_eq!(
key1_entries.len(),
1,
"expected a single merged entry for key1, got {}",
key1_entries.len()
);
let merged = &key1_entries[0];
// The combined entry preserves the shared expire timestamp and the
// concatenated operand bytes.
assert_eq!(merged.expire_ts, Some(1000));
assert!(
matches!(&merged.value, crate::types::ValueDeletable::Merge(v) if v.as_ref() == b"ab"),
"expected merged value 'ab', got {:?}",
merged.value
);
}
// Verifies that plain put entries with an absolute expiry (ExpireAt) that has
// already passed by compaction time are dropped, while unexpired and
// non-expiring entries survive. Uses the real size-tiered scheduler rather
// than the on-demand test scheduler.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_compact_expired_expire_at_entries() {
let os = Arc::new(InMemory::new());
let insert_clock = Arc::new(MockSystemClock::new());
// Force a compaction of exactly two sources once two L0 SSTs exist.
let scheduler_options = SizeTieredCompactionSchedulerOptions {
min_compaction_sources: 2,
max_compaction_sources: 2,
include_size_threshold: 4.0,
}
.into();
let mut options = db_options(Some(compactor_options()));
options
.compactor_options
.as_mut()
.expect("compactor options missing")
.scheduler_options = scheduler_options;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(insert_clock.clone())
.build()
.await
.unwrap();
let (_, _, table_store) = build_test_stores(os.clone());
let value = &[b'a'; 64];
let flush_opts = FlushOptions {
flush_type: FlushType::MemTable,
};
// t=0: key [1;16] expires at absolute time 10.
insert_clock.set(0);
db.put_with_options(
&[1; 16],
value,
&PutOptions {
ttl: Ttl::ExpireAt(10),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// t=5: key [2;16] effectively never expires (ExpireAt(i64::MAX)).
insert_clock.set(5);
db.put_with_options(
&[2; 16],
value,
&PutOptions {
ttl: Ttl::ExpireAt(i64::MAX),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(flush_opts.clone()).await.unwrap();
// t=10: key [3;16] with no expiry, in a second SST; the clock now reaches
// key [1;16]'s expire time.
insert_clock.set(10);
db.put_with_options(
&[3; 16],
value,
&PutOptions { ttl: Ttl::NoExpiry },
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush_with_options(flush_opts.clone()).await.unwrap();
let db_state = await_compaction(&db, Some(insert_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert!(db_state.last_compacted_l0_sst_view_id.is_some());
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// Key [1;16] is expired and dropped; [2;16] and [3;16] survive with their
// original seqnums, create timestamps, and expiry metadata.
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(&[2; 16], value, 2)
.with_create_ts(5)
.with_expire_ts(i64::MAX),
RowEntry::new_value(&[3; 16], value, 3).with_create_ts(10),
],
)
.await;
}
// Verifies relative-TTL expiry during compaction, including the db-wide
// default TTL: entries whose (create time + TTL) has passed by compaction
// time are dropped, a rewritten key keeps only its latest unexpired version,
// and NoExpiry entries always survive.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_compact_expired_entries() {
let os = Arc::new(InMemory::new());
let insert_clock = Arc::new(MockSystemClock::new());
// Force a compaction of exactly two sources once two L0 SSTs exist.
let scheduler_options = SizeTieredCompactionSchedulerOptions {
min_compaction_sources: 2,
max_compaction_sources: 2,
include_size_threshold: 4.0,
}
.into();
let mut options = db_options(Some(compactor_options()));
// Entries written with Ttl::Default expire 50 ticks after creation.
options.default_ttl = Some(50);
options
.compactor_options
.as_mut()
.expect("compactor options missing")
.scheduler_options = scheduler_options;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_system_clock(insert_clock.clone())
.build()
.await
.unwrap();
let (_, _, table_store) = build_test_stores(os.clone());
let value = &[b'a'; 64];
// t=0: key [1;16] expires at 10 (ExpireAfter(10)).
insert_clock.set(0);
db.put_with_options(
&[1; 16],
value,
&PutOptions {
ttl: Ttl::ExpireAfter(10),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// t=10: key [2;16] uses the default TTL, so it expires at 10 + 50 = 60.
insert_clock.set(10);
db.put_with_options(
&[2; 16],
value,
&PutOptions { ttl: Ttl::Default },
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
// t=30: key [3;16] never expires.
insert_clock.set(30);
db.put_with_options(
&[3; 16],
value,
&PutOptions { ttl: Ttl::NoExpiry },
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
// t=70: rewrite key [1;16]; the new version expires at 70 + 80 = 150.
insert_clock.set(70);
db.put_with_options(
&[1; 16],
value,
&PutOptions {
ttl: Ttl::ExpireAfter(80),
},
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
let db_state = await_compaction(&db, Some(insert_clock)).await;
let db_state = db_state.expect("db was not compacted");
assert!(db_state.last_compacted_l0_sst_view_id.is_some());
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.last_l0_clock_tick, 70);
let compacted = &db_state.compacted.first().unwrap().sst_views;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
// At compaction time (tick 70): key [2;16] expired at 60 and is dropped;
// key [1;16] survives only as its latest version (seq 4, expires 150);
// key [3;16] has no expiry and survives.
assert_iterator(
&mut iter,
vec![
RowEntry::new_value(&[1; 16], value, 4)
.with_create_ts(70)
.with_expire_ts(150),
RowEntry::new_value(&[3; 16], value, 3).with_create_ts(30),
],
)
.await;
}
// Verifies that the log ticker publishes the bytes-being-compacted gauge and
// an aggregate throughput metric when compactions with processed bytes and
// past start times are registered in handler state.
#[tokio::test]
async fn test_should_track_total_bytes_and_throughput() {
use crate::compactor::stats::{
TOTAL_BYTES_BEING_COMPACTED, TOTAL_THROUGHPUT_BYTES_PER_SEC,
};
use chrono::DateTime;
let mut fixture = CompactorEventHandlerTestFixture::new().await;
let current_time = fixture.handler.system_clock.now();
let current_time_ms = current_time.timestamp_millis() as u64;
// Two compactions that started 2s and 1s ago; the start time is encoded in
// the ULID's timestamp component.
let start_time_1 =
DateTime::from_timestamp_millis((current_time_ms - 2000) as i64).unwrap();
let start_time_2 =
DateTime::from_timestamp_millis((current_time_ms - 1000) as i64).unwrap();
let mut compaction_1 = Compaction::new(
Ulid::from_parts(start_time_1.timestamp_millis() as u64, 0),
CompactionSpec::new(vec![], 10),
);
compaction_1.set_bytes_processed(500);
let mut compaction_2 = Compaction::new(
Ulid::from_parts(start_time_2.timestamp_millis() as u64, 0),
CompactionSpec::new(vec![], 11),
);
compaction_2.set_bytes_processed(1000);
fixture
.handler
.state_mut()
.add_compaction(compaction_1)
.expect("failed to add compaction 1")
fixture
.handler
.state_mut()
.add_compaction(compaction_2)
.expect("failed to add compaction 2");
// The log ticker is what records both metrics.
fixture.handler.handle_log_ticker();
let total_bytes = slatedb_common::metrics::lookup_metric(
&fixture.test_recorder,
TOTAL_BYTES_BEING_COMPACTED,
)
.expect("metric not found");
// NOTE(review): the gauge reads 0 despite two in-flight compactions —
// presumably it tracks input bytes registered via a path these hand-built
// compactions (empty sources) never exercise; confirm against the stats impl.
assert_eq!(total_bytes, 0);
let throughput = slatedb_common::metrics::lookup_metric(
&fixture.test_recorder,
TOTAL_THROUGHPUT_BYTES_PER_SEC,
)
.expect("metric not found");
// 1500 bytes processed over nonzero elapsed time must yield > 0 B/s.
assert!(
throughput > 0,
"Expected throughput > 0, got {}",
throughput
);
}
// Mirrors the per-job throughput formula: bytes processed divided by elapsed
// wall-clock seconds since the job's start, with a guard that reports zero
// throughput when no time has elapsed.
#[tokio::test]
async fn test_should_track_per_job_throughput() {
let started_at_ms = 1000u64;
let now_ms = 3000u64;
let bytes_done = 1000u64;
// Build a compaction whose start time is encoded in the ULID timestamp and
// record the processed byte count on it.
let mut job = Compaction::new(
Ulid::from_parts(started_at_ms, 0),
CompactionSpec::new(vec![], 10),
);
job.set_bytes_processed(bytes_done);
// Throughput as a function of the observation time, guarded for zero elapsed.
let throughput_at = |at_ms: u64| -> f64 {
let elapsed_secs = (at_ms as f64 - started_at_ms as f64) / 1000.0;
if elapsed_secs > 0.0 {
bytes_done as f64 / elapsed_secs
} else {
0.0
}
};
// 1000 bytes over 2 seconds -> 500 B/s.
assert_eq!(throughput_at(now_ms), 500.0);
// Observing at the start instant -> zero elapsed -> reported as 0, not a
// division by zero.
assert_eq!(throughput_at(started_at_ms), 0.0);
}
// A fresh fixture must report zero running compactions via the metric, and
// adding one compaction must make it visible in the handler's active set.
#[tokio::test]
async fn test_should_track_running_compactions_count() {
use crate::compactor::stats::RUNNING_COMPACTIONS;
let mut fixture = CompactorEventHandlerTestFixture::new().await;
// Before anything is scheduled the gauge must read zero.
let initial_count =
slatedb_common::metrics::lookup_metric(&fixture.test_recorder, RUNNING_COMPACTIONS)
.expect("metric not found");
assert_eq!(initial_count, 0);
// Register a single compaction directly in handler state.
let new_compaction = Compaction::new(Ulid::new(), CompactionSpec::new(vec![], 10));
fixture
.handler
.state_mut()
.add_compaction(new_compaction)
.expect("failed to add compaction");
// The handler state now tracks exactly one active compaction.
assert_eq!(fixture.handler.state().active_compactions().count(), 1);
}
// Submitting a compaction spec must durably record it in the compactions
// store with `Submitted` status, retrievable under the returned id.
#[tokio::test]
async fn test_submit_persists_compaction() {
let object_store = Arc::new(InMemory::new());
let (manifest_store, compactions_store, _table_store) =
build_test_stores(object_store.clone());
let clock: Arc<dyn SystemClock> = Arc::new(DefaultSystemClock::new());
// Bootstrap a fresh db manifest, then a compactions file for its epoch.
StoredManifest::create_new_db(manifest_store.clone(), ManifestCore::new(), clock.clone())
.await
.unwrap();
let manifest = StoredManifest::load(manifest_store.clone(), clock.clone())
.await
.unwrap();
let epoch = manifest.manifest().compactor_epoch;
StoredCompactions::create(compactions_store.clone(), epoch)
.await
.unwrap();
// Act: submit a single-source spec.
let spec = CompactionSpec::new(vec![SourceId::SortedRun(0)], 0);
let id = Compactor::submit(
spec.clone(),
compactions_store.clone(),
Arc::new(DbRand::default()),
clock.clone(),
)
.await
.unwrap();
// Re-read the store and confirm the submitted compaction round-trips.
let (_, compactions) = compactions_store.read_latest_compactions().await.unwrap();
let persisted = compactions
.get(&id)
.expect("missing submitted compaction");
assert_eq!(persisted.spec(), &spec);
assert_eq!(persisted.status(), CompactionStatus::Submitted);
}
// Verifies that a `Full` compaction request generates a spec covering only
// the sorted runs (L0 SSTs are excluded), targeting the lowest run id as the
// destination, and that submitting it persists those sources verbatim.
#[tokio::test]
async fn test_submit_full_compaction_uses_sorted_run_sources_only() {
let os = Arc::new(InMemory::new());
let (manifest_store, compactions_store, _table_store) = build_test_stores(os.clone());
let system_clock: Arc<dyn SystemClock> = Arc::new(DefaultSystemClock::new());
StoredManifest::create_new_db(
manifest_store.clone(),
ManifestCore::new(),
system_clock.clone(),
)
.await
.unwrap();
let mut stored_manifest =
StoredManifest::load(manifest_store.clone(), system_clock.clone())
.await
.unwrap();
// Hand-build a manifest with two L0 SSTs and two sorted runs (ids 2 and 1).
let mut dirty = stored_manifest.prepare_dirty().unwrap();
let l0_info = SsTableInfo {
first_entry: Some(Bytes::from_static(b"a")),
..SsTableInfo::default()
};
let sr_info = SsTableInfo {
first_entry: Some(Bytes::from_static(b"m")),
..SsTableInfo::default()
};
let l0_view_newest: SsTableView = SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::new()),
SST_FORMAT_VERSION_LATEST,
l0_info.clone(),
));
let l0_view_oldest: SsTableView = SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::new()),
SST_FORMAT_VERSION_LATEST,
l0_info.clone(),
));
dirty.value.core.l0 = VecDeque::from(vec![l0_view_newest, l0_view_oldest]);
dirty.value.core.compacted = vec![
SortedRun {
id: 2,
sst_views: vec![SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::new()),
SST_FORMAT_VERSION_LATEST,
sr_info.clone(),
))],
},
SortedRun {
id: 1,
sst_views: vec![SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::new()),
SST_FORMAT_VERSION_LATEST,
sr_info.clone(),
))],
},
];
stored_manifest.update(dirty).await.unwrap();
StoredCompactions::create(
compactions_store.clone(),
stored_manifest.manifest().compactor_epoch,
)
.await
.unwrap();
// Generate a full-compaction spec from the manifest state alone.
let scheduler = MockScheduler::new();
let specs = scheduler
.generate(
&CompactorStateView {
compactions: None,
manifest: (0, stored_manifest.manifest().clone()),
},
&CompactionRequest::Full,
)
.unwrap();
assert_eq!(specs.len(), 1);
let compaction_id = Compactor::submit(
specs[0].clone(),
compactions_store.clone(),
Arc::new(DbRand::default()),
system_clock.clone(),
)
.await
.unwrap();
let (_, compactions) = compactions_store.read_latest_compactions().await.unwrap();
let stored = compactions
.get(&compaction_id)
.expect("missing submitted compaction");
// Only the sorted runs appear as sources — the two L0 SSTs are excluded —
// and the destination is the minimum run id (1).
let expected_sources = vec![SourceId::SortedRun(2), SourceId::SortedRun(1)];
assert_eq!(stored.spec().sources(), &expected_sources);
assert_eq!(stored.spec().destination(), 1);
assert_eq!(stored.status(), CompactionStatus::Submitted);
}
// Planning an explicit `Spec` request must echo back exactly that spec.
#[test]
fn test_plan_spec_returns_spec_clone() {
let requested = CompactionSpec::new(vec![SourceId::SortedRun(7)], 7);
// Empty state: no compactions, a freshly-initialized manifest.
let view = CompactorStateView {
compactions: None,
manifest: (0, Manifest::initial(ManifestCore::new())),
};
let planned = MockScheduler::new()
.generate(&view, &CompactionRequest::Spec(requested.clone()))
.unwrap();
assert_eq!(planned, vec![requested]);
}
// Verifies the planning step in isolation: a `Full` request over a manifest
// with L0 SSTs and sorted runs produces one spec whose sources are the
// sorted runs only and whose destination is the minimum run id.
#[test]
fn test_plan_full_uses_sorted_runs_and_min_destination() {
let scheduler = MockScheduler::new();
// Hand-build a manifest core: two L0 SSTs plus sorted runs with ids 5 and 2.
let mut core = ManifestCore::new();
let l0_info = SsTableInfo {
first_entry: Some(Bytes::from_static(b"a")),
..SsTableInfo::default()
};
let sr_info = SsTableInfo {
first_entry: Some(Bytes::from_static(b"m")),
..SsTableInfo::default()
};
let l0_view_first: SsTableView = SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(1, 0)),
SST_FORMAT_VERSION_LATEST,
l0_info.clone(),
));
let l0_view_second: SsTableView = SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(2, 0)),
SST_FORMAT_VERSION_LATEST,
l0_info,
));
core.l0 = VecDeque::from(vec![l0_view_first, l0_view_second]);
core.compacted = vec![
SortedRun {
id: 5,
sst_views: vec![SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(10, 0)),
SST_FORMAT_VERSION_LATEST,
sr_info.clone(),
))],
},
SortedRun {
id: 2,
sst_views: vec![SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(11, 0)),
SST_FORMAT_VERSION_LATEST,
sr_info,
))],
},
];
let state = CompactorStateView {
compactions: None,
manifest: (0, Manifest::initial(core)),
};
let planned = scheduler
.generate(&state, &CompactionRequest::Full)
.unwrap();
// Sources are the sorted runs in manifest order; L0 is excluded; the
// destination is the smallest run id (2).
let expected_sources = vec![SourceId::SortedRun(5), SourceId::SortedRun(2)];
assert_eq!(planned.len(), 1);
assert_eq!(planned[0].sources(), &expected_sources);
assert_eq!(planned[0].destination(), 2);
}
// Verifies that a `Full` request is rejected with an Invalid error when the
// manifest has only L0 SSTs: since full compaction excludes L0, there would
// be no sources at all.
#[test]
fn test_plan_full_without_sorted_runs_is_invalid() {
let scheduler = MockScheduler::new();
// Manifest with two L0 SSTs and an empty `compacted` list (no sorted runs).
let mut core = ManifestCore::new();
let l0_info = SsTableInfo {
first_entry: Some(Bytes::from_static(b"a")),
..SsTableInfo::default()
};
core.l0 = VecDeque::from(vec![
SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(1, 0)),
SST_FORMAT_VERSION_LATEST,
l0_info.clone(),
)),
SsTableView::identity(SsTableHandle::new(
SsTableId::Compacted(Ulid::from_parts(2, 0)),
SST_FORMAT_VERSION_LATEST,
l0_info,
)),
]);
let state = CompactorStateView {
compactions: None,
manifest: (0, Manifest::initial(core)),
};
let err = scheduler
.generate(&state, &CompactionRequest::Full)
.expect_err(
"full compaction should reject empty or L0-only inputs because L0 SSTs are excluded",
);
// Pin both the error kind and its rendered message.
assert_eq!(err.kind(), crate::ErrorKind::Invalid);
assert_eq!(err.to_string(), "Invalid error: invalid compaction");
}
/// Shared harness for `CompactorEventHandler` tests: a real `Db` on an
/// in-memory object store, a handler wired to mock scheduler/executor
/// doubles, plus a real Tokio executor on the side for tests that need
/// actual compaction output.
struct CompactorEventHandlerTestFixture {
    // Handle to the stored manifest; used to read and refresh db state.
    manifest: StoredManifest,
    manifest_store: Arc<ManifestStore>,
    compactions_store: Arc<CompactionsStore>,
    options: Settings,
    db: Db,
    // Scheduler double; tests inject specs that handle_ticker picks up.
    scheduler: Arc<MockScheduler>,
    // Executor double; records submitted jobs instead of running them.
    executor: Arc<MockExecutor>,
    // Real executor that actually runs jobs forwarded from the mock.
    real_executor: Arc<dyn CompactionExecutor>,
    // Receives CompactorMessages emitted by the real executor.
    real_executor_rx: async_channel::Receiver<CompactorMessage>,
    // Metrics recorder shared with the handler, so tests can read metrics.
    test_recorder: Arc<slatedb_common::metrics::DefaultMetricsRecorder>,
    handler: CompactorEventHandler,
}
impl CompactorEventHandlerTestFixture {
    /// Builds the full fixture: stores, db, mock doubles, a real executor
    /// feeding `real_executor_rx`, and the handler under test.
    async fn new() -> Self {
        let compactor_options = Arc::new(compactor_options());
        let options = db_options(None);
        let os = Arc::new(InMemory::new());
        let (manifest_store, compactions_store, table_store) = build_test_stores(os.clone());
        let db = Db::builder(PATH, os.clone())
            .with_settings(options.clone())
            .build()
            .await
            .unwrap();
        let scheduler = Arc::new(MockScheduler::new());
        let executor = Arc::new(MockExecutor::new());
        // Channel through which the real executor reports job messages.
        let (real_executor_tx, real_executor_rx) = async_channel::unbounded();
        let rand = Arc::new(DbRand::default());
        let test_recorder = Arc::new(slatedb_common::metrics::DefaultMetricsRecorder::new());
        let recorder = MetricsRecorderHelper::new(
            test_recorder.clone() as Arc<dyn slatedb_common::metrics::MetricsRecorder>,
            slatedb_common::metrics::MetricLevel::default(),
        );
        let compactor_stats = Arc::new(CompactionStats::new(&recorder));
        let real_executor = Arc::new(TokioCompactionExecutor::new(
            TokioCompactionExecutorOptions {
                handle: Handle::current(),
                options: compactor_options.clone(),
                worker_tx: real_executor_tx,
                table_store,
                rand: rand.clone(),
                stats: compactor_stats.clone(),
                clock: Arc::new(DefaultSystemClock::new()),
                manifest_store: manifest_store.clone(),
                merge_operator: None,
                #[cfg(feature = "compaction_filters")]
                compaction_filter_supplier: None,
            },
        ));
        // The handler under test is wired to the MOCK executor; tests forward
        // jobs to the real one explicitly via assert_and_forward_compactions.
        let handler = CompactorEventHandler::new(
            manifest_store.clone(),
            compactions_store.clone(),
            compactor_options.clone(),
            scheduler.clone(),
            executor.clone(),
            rand.clone(),
            compactor_stats.clone(),
            Arc::new(DefaultSystemClock::new()),
        )
        .await
        .unwrap();
        let manifest =
            StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
                .await
                .unwrap();
        Self {
            manifest,
            manifest_store,
            compactions_store,
            options,
            db,
            scheduler,
            executor,
            real_executor_rx,
            real_executor,
            test_recorder,
            handler,
        }
    }
    /// Refreshes the stored manifest and returns a clone of the current core.
    async fn latest_db_state(&mut self) -> ManifestCore {
        self.manifest.refresh().await.unwrap().core.clone()
    }
    /// Writes a random key sized to fill an L0 SST, flushes, then polls the
    /// manifest until a new L0 entry appears.
    async fn write_l0(&mut self) {
        let mut rng = rng::new_test_rng(None);
        let manifest = self.manifest.refresh().await.unwrap();
        let l0s = manifest.core.l0.len();
        // A key of l0_sst_size_bytes guarantees the memtable exceeds the L0
        // SST size and gets flushed to a new L0 SST.
        let mut k = vec![0u8; self.options.l0_sst_size_bytes];
        rng.fill_bytes(&mut k);
        self.db.put(&k, &[b'x'; 10]).await.unwrap();
        self.db.flush().await.unwrap();
        loop {
            let manifest = self.manifest.refresh().await.unwrap().clone();
            if manifest.core.l0.len() > l0s {
                break;
            }
        }
    }
    /// Builds a spec compacting every current L0 SST into sorted run 0.
    async fn build_l0_compaction(&mut self) -> CompactionSpec {
        let db_state = self.latest_db_state().await;
        let l0_ids_to_compact: Vec<SourceId> = db_state
            .l0
            .iter()
            .map(|h| SourceId::SstView(h.id))
            .collect();
        CompactionSpec::new(l0_ids_to_compact, 0)
    }
    /// Asserts exactly `num` jobs were handed to the mock executor and
    /// returns them (draining the mock's job list).
    fn assert_started_compaction(&self, num: usize) -> Vec<StartCompactionJobArgs> {
        let attempts = self.executor.pop_jobs();
        assert_eq!(num, attempts.len());
        attempts
    }
    /// Drains `num` jobs from the mock executor and forwards each to the
    /// real executor so the compaction actually runs.
    fn assert_and_forward_compactions(&self, num: usize) {
        for c in self.assert_started_compaction(num) {
            self.real_executor.start_compaction_job(c)
        }
    }
}
#[tokio::test]
async fn test_should_record_last_compaction_ts() {
    // Run one real compaction end-to-end and verify the handler bumps the
    // last-compaction-timestamp metric when it handles the finish message.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);
    // Wait for CompactionJobFinished from the real executor, skipping one
    // preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    // Snapshot the metric before handling the finish message...
    let starting_last_ts =
        slatedb_common::metrics::lookup_metric(&fixture.test_recorder, LAST_COMPACTION_TS_SEC)
            .expect("metric not found");
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");
    // ...and verify it advanced afterwards.
    let last_ts =
        slatedb_common::metrics::lookup_metric(&fixture.test_recorder, LAST_COMPACTION_TS_SEC)
            .expect("metric not found");
    assert!(last_ts > starting_last_ts);
}
#[tokio::test]
async fn test_should_write_manifest_safely() {
    // Run a real compaction over the current L0, then add another L0 BEFORE
    // the finish message is handled. The handler's manifest update must keep
    // the concurrently-added L0 while recording the new sorted run.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);
    // Wait for CompactionJobFinished from the real executor, skipping one
    // preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    // The concurrent L0 write lands between job completion and finish handling.
    fixture.write_l0().await;
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");
    let db_state = fixture.latest_db_state().await;
    // One L0 (the concurrent write) survives; one sorted run was produced.
    assert_eq!(db_state.l0.len(), 1);
    assert_eq!(db_state.compacted.len(), 1);
    // The surviving L0 must not appear among the SSTs of the new sorted run.
    let l0_id = db_state.l0.front().unwrap().sst.id.unwrap_compacted_id();
    let compacted_l0s: Vec<Ulid> = db_state
        .compacted
        .first()
        .unwrap()
        .sst_views
        .iter()
        .map(|view| view.sst.id.unwrap_compacted_id())
        .collect();
    assert!(!compacted_l0s.contains(&l0_id));
    // The manifest records the compaction's first source as the last
    // compacted L0 view id.
    assert_eq!(
        db_state.last_compacted_l0_sst_view_id.unwrap(),
        compaction
            .sources()
            .first()
            .and_then(|id| id.maybe_unwrap_sst_view())
            .unwrap()
    );
}
#[tokio::test]
async fn test_should_clear_compaction_on_failure_and_retry() {
    // After a job fails, the same compaction spec must be schedulable again.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let started = fixture.assert_started_compaction(1).pop().unwrap();
    // Report the job as failed.
    let failure = CompactorMessage::CompactionJobFinished {
        id: started.id,
        result: Err(SlateDBError::InvalidDBState),
    };
    fixture
        .handler
        .handle(failure)
        .await
        .expect("fatal error handling compaction message");
    // Re-injecting the identical spec should start a fresh job.
    fixture.scheduler.inject_compaction(compaction);
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_started_compaction(1);
}
#[tokio::test]
async fn test_should_persist_compactions_on_start_and_finish() {
    // A started compaction must be persisted to the compactions store, and
    // its record must still be present (retained) after it finishes.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    // Exactly one compaction is persisted after scheduling/starting...
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    assert_eq!(
        stored_compactions
            .iter()
            .collect::<Vec<&Compaction>>()
            .len(),
        1
    );
    // ...and its id matches the one tracked in the handler's state.
    let running_id = stored_compactions
        .iter()
        .next()
        .expect("compaction should be persisted")
        .id();
    let state_id = fixture
        .handler
        .state()
        .active_compactions()
        .next()
        .expect("state missing compaction")
        .id();
    assert_eq!(running_id, state_id);
    fixture.assert_and_forward_compactions(1);
    // Wait for CompactionJobFinished from the real executor, skipping one
    // preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    fixture.handler.handle(msg).await.unwrap();
    // After finish, the same compaction record is still stored (exactly one).
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    let mut stored_compactions_iter = stored_compactions.iter();
    assert_eq!(
        stored_compactions_iter
            .next()
            .expect("compactions should not be empty after finish")
            .id(),
        running_id,
    );
    assert!(
        stored_compactions_iter.next().is_none(),
        "expected only one retained finished compaction for GC"
    );
}
#[tokio::test]
async fn test_progress_persists_output_ssts() {
    // A CompactionJobProgress message must persist the reported output SSTs
    // both to the compactions store and to the handler's in-memory state.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    fixture.handler.state_writer.refresh().await.unwrap();
    // Register and start a compaction directly (bypassing the scheduler).
    let spec = fixture.build_l0_compaction().await;
    let compaction_id = Ulid::new();
    fixture
        .handler
        .state_mut()
        .add_compaction(Compaction::new(compaction_id, spec))
        .expect("failed to add compaction");
    fixture.handler.maybe_start_compactions().await.unwrap();
    // Synthesize a progress message carrying one output SST.
    let sst_info = SsTableInfo {
        first_entry: Some(Bytes::from_static(b"a")),
        ..SsTableInfo::default()
    };
    let output_sst = SsTableHandle::new(
        SsTableId::Compacted(Ulid::new()),
        SST_FORMAT_VERSION_LATEST,
        sst_info,
    );
    let output_ssts = vec![output_sst.clone()];
    fixture
        .handler
        .handle(CompactorMessage::CompactionJobProgress {
            id: compaction_id,
            bytes_processed: 123,
            output_ssts: output_ssts.clone(),
        })
        .await
        .expect("fatal error handling progress message");
    // The output SSTs are persisted to the compactions store...
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    let stored = stored_compactions
        .get(&compaction_id)
        .expect("missing stored compaction");
    assert_eq!(stored.output_ssts(), &output_ssts);
    // ...and reflected in the in-memory state.
    let state_compaction = fixture
        .handler
        .state()
        .active_compactions()
        .find(|c| c.id() == compaction_id)
        .expect("missing compaction in state");
    assert_eq!(state_compaction.output_ssts(), &output_ssts);
}
#[tokio::test]
async fn test_maybe_schedule_compactions_only_submits() {
    // Scheduling alone records the compaction as Submitted (in memory and
    // persisted) without handing any job to the executor.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction);
    fixture.handler.maybe_schedule_compactions().await.unwrap();
    assert_eq!(fixture.executor.pop_jobs().len(), 0);
    let mut active = fixture.handler.state().active_compactions();
    let scheduled = active.next().expect("missing compaction");
    assert_eq!(scheduled.status(), CompactionStatus::Submitted);
    assert!(active.next().is_none());
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    let persisted = stored_compactions
        .iter()
        .next()
        .expect("compaction should be persisted");
    assert_eq!(persisted.status(), CompactionStatus::Submitted);
}
#[tokio::test]
async fn test_maybe_start_compactions_starts_submitted() {
    // A submitted compaction should be handed to the executor and flipped to
    // Running both in memory and in the persisted store.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    fixture.handler.state_writer.refresh().await.unwrap();
    let spec = fixture.build_l0_compaction().await;
    let compaction_id = Ulid::new();
    fixture
        .handler
        .state_mut()
        .add_compaction(Compaction::new(compaction_id, spec))
        .expect("failed to add compaction");
    fixture.handler.maybe_start_compactions().await.unwrap();
    let jobs = fixture.executor.pop_jobs();
    assert_eq!(jobs.len(), 1);
    assert_eq!(jobs[0].compaction_id, compaction_id);
    let in_memory_status = fixture
        .handler
        .state_writer
        .state
        .compactions()
        .value
        .get(&compaction_id)
        .expect("missing compaction")
        .status();
    assert_eq!(in_memory_status, CompactionStatus::Running);
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    let stored_status = stored_compactions
        .get(&compaction_id)
        .expect("missing stored compaction")
        .status();
    assert_eq!(stored_status, CompactionStatus::Running);
}
#[tokio::test]
async fn test_maybe_start_compactions_respects_capacity() {
    // With max_concurrent_compactions = 1, only the first submitted
    // compaction may start; the second must remain Submitted.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    fixture.handler.state_writer.refresh().await.unwrap();
    let spec = fixture.build_l0_compaction().await;
    let spec_alt = CompactionSpec::new(spec.sources().clone(), spec.destination() + 1);
    let first_id = Ulid::from_parts(1, 0);
    let second_id = Ulid::from_parts(2, 0);
    for (id, s) in [(first_id, spec), (second_id, spec_alt)] {
        fixture
            .handler
            .state_mut()
            .add_compaction(Compaction::new(id, s))
            .expect("failed to add compaction");
    }
    fixture.handler.maybe_start_compactions().await.unwrap();
    assert_eq!(fixture.executor.pop_jobs().len(), 1);
    let compactions = &fixture.handler.state_writer.state.compactions().value;
    assert_eq!(
        compactions
            .get(&first_id)
            .expect("missing first compaction")
            .status(),
        CompactionStatus::Running
    );
    assert_eq!(
        compactions
            .get(&second_id)
            .expect("missing second compaction")
            .status(),
        CompactionStatus::Submitted
    );
}
#[tokio::test]
async fn test_maybe_start_compactions_marks_invalid_failed() {
    // A compaction with no sources fails validation and must be marked
    // Failed (in memory and persisted) without reaching the executor.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    let compaction_id = Ulid::new();
    let empty_spec = CompactionSpec::new(Vec::new(), 0);
    fixture
        .handler
        .state_mut()
        .add_compaction(Compaction::new(compaction_id, empty_spec))
        .expect("failed to add compaction");
    fixture.handler.maybe_start_compactions().await.unwrap();
    assert_eq!(fixture.executor.pop_jobs().len(), 0);
    let compactions = &fixture.handler.state_writer.state.compactions().value;
    assert_eq!(
        compactions
            .get(&compaction_id)
            .expect("missing compaction")
            .status(),
        CompactionStatus::Failed
    );
    let (_, stored_compactions) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    assert_eq!(
        stored_compactions
            .get(&compaction_id)
            .expect("missing stored compaction")
            .status(),
        CompactionStatus::Failed
    );
}
#[tokio::test]
async fn test_handle_ticker_starts_preexisting_submitted_compaction() {
    // Persist a compaction BEFORE constructing the handler, then verify a
    // fresh handler picks it up on its first tick, starts it, and marks it
    // Completed when the job finishes.
    let compactor_options = Arc::new(compactor_options());
    let options = db_options(None);
    let os = Arc::new(InMemory::new());
    let (manifest_store, compactions_store, _table_store) = build_test_stores(os.clone());
    let db = Db::builder(PATH, os.clone())
        .with_settings(options.clone())
        .build()
        .await
        .unwrap();
    let system_clock: Arc<dyn SystemClock> = Arc::new(DefaultSystemClock::new());
    let mut stored_manifest =
        StoredManifest::load(manifest_store.clone(), system_clock.clone())
            .await
            .unwrap();
    // Write one L0-sized key and poll the manifest until the L0 appears
    // (same pattern as CompactorEventHandlerTestFixture::write_l0).
    let mut rng = rng::new_test_rng(None);
    let manifest = stored_manifest.refresh().await.unwrap();
    let l0s = manifest.core.l0.len();
    let mut k = vec![0u8; options.l0_sst_size_bytes];
    rng.fill_bytes(&mut k);
    db.put(&k, &[b'x'; 10]).await.unwrap();
    db.flush().await.unwrap();
    loop {
        let manifest = stored_manifest.refresh().await.unwrap().clone();
        if manifest.core.l0.len() > l0s {
            break;
        }
    }
    // Build a spec over all current L0 SSTs, targeting sorted run 0.
    let db_state = stored_manifest.refresh().await.unwrap().core.clone();
    let sources = db_state
        .l0
        .iter()
        .map(|h| SourceId::SstView(h.id))
        .collect::<Vec<_>>();
    let spec = CompactionSpec::new(sources, 0);
    let compaction_id = Ulid::new();
    let compaction = Compaction::new(compaction_id, spec);
    // Persist the compaction to the store before the handler exists.
    let mut stored_compactions = StoredCompactions::create(
        compactions_store.clone(),
        stored_manifest.manifest().compactor_epoch,
    )
    .await
    .unwrap();
    let mut dirty = stored_compactions.prepare_dirty().unwrap();
    dirty.value.insert(compaction);
    stored_compactions.update(dirty).await.unwrap();
    // Now construct a fresh handler against the same stores.
    let scheduler = Arc::new(MockScheduler::new());
    let executor = Arc::new(MockExecutor::new());
    let rand = Arc::new(DbRand::default());
    let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
    let compactor_stats = Arc::new(CompactionStats::new(&recorder));
    let mut handler = CompactorEventHandler::new(
        manifest_store,
        compactions_store.clone(),
        compactor_options,
        scheduler,
        executor.clone(),
        rand,
        compactor_stats,
        system_clock,
    )
    .await
    .unwrap();
    // First tick must start the preexisting persisted compaction.
    handler.handle_ticker().await.unwrap();
    let jobs = executor.pop_jobs();
    assert_eq!(jobs.len(), 1);
    assert_eq!(jobs[0].compaction_id, compaction_id);
    // Simulate the job finishing with the L0 SSTs as the output sorted run.
    let db_state = handler.state().db_state().clone();
    let output_sr = SortedRun {
        id: jobs[0].destination,
        sst_views: db_state.l0.iter().cloned().collect(),
    };
    let msg = CompactorMessage::CompactionJobFinished {
        id: compaction_id,
        result: Ok(output_sr),
    };
    handler.handle(msg).await.unwrap();
    // The persisted record must now be Completed.
    let (_, stored_compactions) = compactions_store.read_latest_compactions().await.unwrap();
    assert_eq!(
        stored_compactions
            .get(&compaction_id)
            .expect("missing stored compaction")
            .status(),
        CompactionStatus::Completed
    );
}
#[tokio::test]
async fn test_should_fail_when_compactions_store_fences_compactor() {
    // After another FenceableCompactions is initialized against the same
    // store, the original handler's next tick must fail with Fenced.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    let stored_compactions = StoredCompactions::load(fixture.compactions_store.clone())
        .await
        .unwrap();
    FenceableCompactions::init(
        stored_compactions,
        fixture.handler.options.manifest_update_timeout,
        fixture.handler.system_clock.clone(),
    )
    .await
    .unwrap();
    assert!(matches!(
        fixture.handler.handle_ticker().await,
        Err(SlateDBError::Fenced)
    ));
}
#[tokio::test]
async fn test_should_update_failed_compaction_status() {
    // A job finishing with an error must leave the persisted compaction
    // record in Failed status.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let job = fixture.assert_started_compaction(1).pop().unwrap();
    let (_, stored) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    assert_eq!(stored.iter().collect::<Vec<&Compaction>>().len(), 1);
    let failure = CompactorMessage::CompactionJobFinished {
        id: job.id,
        result: Err(SlateDBError::InvalidDBState),
    };
    fixture.handler.handle(failure).await.unwrap();
    let (_, stored_after) = fixture
        .compactions_store
        .read_latest_compactions()
        .await
        .unwrap();
    let status_after = stored_after
        .iter()
        .next()
        .expect("compaction should be persisted after failure")
        .status();
    assert_eq!(
        status_after,
        CompactionStatus::Failed,
        "compaction should be marked failed after failure"
    );
}
#[tokio::test]
async fn test_should_error_when_finishing_if_compactions_fenced() {
    // If the compactions store is fenced while a job is in flight, handling
    // that job's completion must surface SlateDBError::Fenced.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let job = fixture.assert_started_compaction(1).pop().unwrap();
    // Fence the handler by re-initializing the compactions state.
    let stored_compactions = StoredCompactions::load(fixture.compactions_store.clone())
        .await
        .unwrap();
    FenceableCompactions::init(
        stored_compactions,
        fixture.handler.options.manifest_update_timeout,
        fixture.handler.system_clock.clone(),
    )
    .await
    .unwrap();
    let db_state = fixture.latest_db_state().await;
    let output_sr = SortedRun {
        id: compaction.destination(),
        sst_views: db_state.l0.iter().cloned().collect(),
    };
    let finished = CompactorMessage::CompactionJobFinished {
        id: job.id,
        result: Ok(output_sr),
    };
    let result = fixture.handler.handle(finished).await;
    assert!(matches!(result, Err(SlateDBError::Fenced)));
}
#[tokio::test]
async fn test_should_not_schedule_conflicting_compaction() {
    // While a compaction over some sources is in flight, re-proposing the
    // same spec must not start a second job.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_started_compaction(1);
    fixture.write_l0().await;
    fixture.scheduler.inject_compaction(compaction);
    fixture.handler.handle_ticker().await.unwrap();
    assert_eq!(0, fixture.executor.pop_jobs().len())
}
#[tokio::test]
async fn test_should_leave_checkpoint_when_removing_ssts_after_compaction() {
    // After a compaction removes L0 SSTs from the manifest, a checkpoint
    // must reference the pre-compaction manifest whose L0 list is exactly
    // the set of compacted SSTs.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);
    // Wait for CompactionJobFinished from the real executor, skipping one
    // preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");
    // Read back the manifest pinned by the latest checkpoint and compare its
    // L0 list against the compaction's sources.
    let current_dbstate = fixture.latest_db_state().await;
    let checkpoint = current_dbstate.checkpoints.last().unwrap();
    let old_manifest = fixture
        .manifest_store
        .read_manifest(checkpoint.manifest_id)
        .await
        .unwrap();
    let l0_ids: Vec<SourceId> = old_manifest
        .core
        .l0
        .iter()
        .map(|view| SourceId::SstView(view.id))
        .collect();
    assert_eq!(&l0_ids, compaction.sources());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(feature = "zstd")]
async fn test_compactor_compressed_block_size() {
    // Run a real end-to-end compaction with zstd compression and a small
    // (128-byte) SST block size, then verify the bytes-compacted metric moved.
    use crate::compactor::stats::BYTES_COMPACTED;
    use crate::config::{CompressionCodec, SstBlockSize};
    use slatedb_common::metrics::{lookup_metric, DefaultMetricsRecorder};
    let os = Arc::new(InMemory::new());
    let system_clock = Arc::new(MockSystemClock::new());
    // Aggressive scheduler settings so a compaction is proposed quickly.
    let scheduler_options = SizeTieredCompactionSchedulerOptions {
        min_compaction_sources: 1,
        max_compaction_sources: 999,
        include_size_threshold: 4.0,
    }
    .into();
    let mut options = db_options(Some(compactor_options()));
    options.l0_sst_size_bytes = 128;
    options.compression_codec = Some(CompressionCodec::Zstd);
    options
        .compactor_options
        .as_mut()
        .expect("compactor options missing")
        .scheduler_options = scheduler_options;
    let metrics_recorder = Arc::new(DefaultMetricsRecorder::new());
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_system_clock(system_clock.clone())
        .with_sst_block_size(SstBlockSize::Other(128))
        .with_metrics_recorder(metrics_recorder.clone())
        .build()
        .await
        .unwrap();
    // Write enough distinct key/value pairs to fill multiple 128-byte L0 SSTs.
    for i in 0..4 {
        let k = vec![b'a' + i as u8; 16];
        let v = vec![b'b' + i as u8; 48];
        db.put(&k, &v).await.unwrap();
        let k = vec![b'j' + i as u8; 16];
        let v = vec![b'k' + i as u8; 48];
        db.put(&k, &v).await.unwrap();
    }
    db.flush().await.unwrap();
    // await_compaction advances the mock clock so background work can run.
    await_compaction(&db, Some(system_clock.clone()))
        .await
        .expect("db was not compacted");
    let bytes_compacted = lookup_metric(&metrics_recorder, BYTES_COMPACTED).unwrap();
    assert!(bytes_compacted > 0, "bytes_compacted: {}", bytes_compacted);
}
#[tokio::test]
async fn test_validate_compaction_empty_sources_rejected() {
    // A spec with no sources is never a valid compaction.
    let fixture = CompactorEventHandlerTestFixture::new().await;
    let empty = CompactionSpec::new(Vec::new(), 0);
    let err = fixture.handler.validate_compaction(&empty).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
#[tokio::test]
async fn test_validate_compaction_rejects_missing_l0_source() {
    // Referencing an L0 SST id that is not in the manifest must be rejected.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.handler.handle_ticker().await.unwrap();
    let unknown_l0 = CompactionSpec::new(vec![SourceId::SstView(Ulid::new())], 0);
    let err = fixture.handler.validate_compaction(&unknown_l0).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
#[tokio::test]
async fn test_validate_compaction_rejects_missing_sr_source() {
    // Referencing a sorted-run id that does not exist must be rejected.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.handler.handle_ticker().await.unwrap();
    let unknown_sr = CompactionSpec::new(vec![SourceId::SortedRun(42)], 42);
    let err = fixture.handler.validate_compaction(&unknown_sr).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
#[tokio::test]
async fn test_validate_compaction_l0_only_ok_when_no_sr() {
    // An L0-only compaction validates when no sorted runs exist yet.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    fixture.handler.handle_ticker().await.unwrap();
    let l0_only = fixture.build_l0_compaction().await;
    fixture.handler.validate_compaction(&l0_only).unwrap();
}
#[tokio::test]
async fn test_validate_compaction_l0_only_rejects_when_dest_below_highest_sr() {
    // Once a sorted run exists, an L0-only compaction built the same way as
    // the first (destination 0) must be rejected.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let c1 = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(c1.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);
    // Wait for the real executor to report the first compaction finished,
    // skipping one preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    fixture.handler.handle(msg).await.unwrap();
    fixture.write_l0().await;
    fixture.handler.handle_ticker().await.unwrap();
    let c2 = fixture.build_l0_compaction().await;
    let err = fixture.handler.validate_compaction(&c2).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
#[tokio::test]
async fn test_validate_compaction_mixed_l0_and_sr_deferred_to_scheduler() {
    // A spec mixing a sorted-run source with an L0 source passes handler
    // validation (per the test name, deeper checks are the scheduler's job).
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    // First produce a sorted run by running a real L0 compaction.
    let c1 = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(c1.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);
    // Wait for CompactionJobFinished from the real executor, skipping one
    // preceding non-finish message if present.
    let msg = tokio::time::timeout(Duration::from_millis(10), async {
        match fixture.real_executor_rx.recv().await {
            Ok(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
            Ok(_) => fixture
                .real_executor_rx
                .recv()
                .await
                .expect("channel closed before CompactionJobAttemptFinished"),
            Err(e) => panic!("channel closed before receiving any message: {e}"),
        }
    })
    .await
    .expect("timeout waiting for CompactionJobAttemptFinished");
    fixture.handler.handle(msg).await.unwrap();
    // Add a fresh L0 so we have both a sorted run and an L0 SST available.
    fixture.write_l0().await;
    fixture.handler.handle_ticker().await.unwrap();
    let state = fixture.latest_db_state().await;
    let sr_id = state.compacted.first().unwrap().id;
    let l0_view_id = state.l0.front().unwrap().id;
    let mixed = CompactionSpec::new(
        vec![SourceId::SortedRun(sr_id), SourceId::SstView(l0_view_id)],
        sr_id,
    );
    fixture.handler.validate_compaction(&mixed).unwrap();
}
#[tokio::test]
async fn test_validate_compaction_rejects_parallel_l0() {
    // Once one L0 SST is claimed by a scheduled compaction, validating a
    // second compaction over a different L0 SST must fail.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    fixture.write_l0().await;
    fixture.handler.handle_ticker().await.unwrap();
    let state = fixture.latest_db_state().await;
    assert!(state.l0.len() >= 2);
    let back_l0_id = state.l0.back().unwrap().id;
    let first_l0 = CompactionSpec::new(vec![SourceId::SstView(back_l0_id)], 0);
    fixture.scheduler.inject_compaction(first_l0.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let front_l0_id = state.l0.front().unwrap().id;
    let second_l0 = CompactionSpec::new(vec![SourceId::SstView(front_l0_id)], 1);
    let err = fixture.handler.validate_compaction(&second_l0).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
async fn run_for<T, F>(duration: Duration, f: impl Fn() -> F) -> Option<T>
where
F: Future<Output = Option<T>>,
{
#[allow(clippy::disallowed_methods)]
let now = SystemTime::now();
while now.elapsed().unwrap() < duration {
let maybe_result = f().await;
if maybe_result.is_some() {
return maybe_result;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
None
}
/// Builds the manifest, compactions, and table stores that the tests share,
/// all rooted at PATH on the same object store. The table store uses a tiny
/// block size (32) and a filter threshold of 10 keys.
fn build_test_stores(
    os: Arc<dyn ObjectStore>,
) -> (Arc<ManifestStore>, Arc<CompactionsStore>, Arc<TableStore>) {
    let path = Path::from(PATH);
    let sst_format = SsTableFormat {
        block_size: 32,
        min_filter_keys: 10,
        ..SsTableFormat::default()
    };
    let table_store = Arc::new(TableStore::new(
        ObjectStores::new(os.clone(), None),
        sst_format,
        path.clone(),
        None,
    ));
    (
        Arc::new(ManifestStore::new(&path, os.clone())),
        Arc::new(CompactionsStore::new(&path, os)),
        table_store,
    )
}
/// Polls for up to 10s until the db looks fully compacted: empty WAL buffer,
/// empty memtables, no L0 SSTs, and at least one sorted run. Advances the
/// provided clock by 60s per attempt. Returns the compacted core state, or
/// `None` on timeout.
async fn await_compaction(
    db: &Db,
    clock: Option<Arc<dyn SystemClock>>,
) -> Option<ManifestCore> {
    run_for(Duration::from_secs(10), || async {
        if let Some(clock) = &clock {
            clock.as_ref().advance(Duration::from_millis(60000)).await;
        }
        // Snapshot all relevant state under a single read lock.
        let (empty_wal, empty_memtable, core_db_state) = {
            let db_state = db.inner.state.read();
            let cow_db_state = db_state.state();
            (
                db.inner.wal_buffer.is_empty(),
                db_state.memtable().is_empty() && cow_db_state.imm_memtable.is_empty(),
                db_state.state().core().clone(),
            )
        };
        let empty_l0 = core_db_state.l0.is_empty();
        let compaction_ran = !core_db_state.compacted.is_empty();
        if empty_wal && empty_memtable && empty_l0 && compaction_ran {
            return Some(core_db_state);
        }
        None
    })
    .await
}
/// Polls the manifest store for up to 10s until the set of compacted sorted
/// runs differs from `old_compacted`, advancing the provided clock by 60s
/// per attempt. Returns the new state, or `None` on timeout.
#[allow(unused)]
async fn await_compacted_compaction(
    manifest_store: Arc<ManifestStore>,
    old_compacted: Vec<SortedRun>,
    clock: Option<Arc<dyn SystemClock>>,
) -> Option<ManifestCore> {
    run_for(Duration::from_secs(10), || async {
        if let Some(clock) = &clock {
            clock.as_ref().advance(Duration::from_millis(60000)).await;
        }
        let db_state = get_db_state(manifest_store.clone()).await;
        if db_state.compacted.eq(&old_compacted) {
            None
        } else {
            Some(db_state)
        }
    })
    .await
}
/// Loads the latest stored manifest and returns a clone of its db state.
async fn get_db_state(manifest_store: Arc<ManifestStore>) -> ManifestCore {
    StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
        .await
        .unwrap()
        .db_state()
        .clone()
}
/// Test database settings: fast flush/poll intervals and a tiny L0 SST size
/// (256 bytes) so compactions trigger quickly.
fn db_options(compactor_options: Option<CompactorOptions>) -> Settings {
    let mut settings = Settings {
        flush_interval: Some(Duration::from_millis(100)),
        manifest_poll_interval: Duration::from_millis(100),
        manifest_update_timeout: Duration::from_secs(300),
        l0_sst_size_bytes: 256,
        l0_max_ssts: 8,
        compactor_options,
        ..Settings::default()
    };
    #[cfg(feature = "wal_disable")]
    {
        settings.wal_enabled = true;
    }
    settings
}
/// Compactor options for tests: fast polling and a single concurrent
/// compaction so job ordering is deterministic.
fn compactor_options() -> CompactorOptions {
    CompactorOptions {
        max_concurrent_compactions: 1,
        poll_interval: Duration::from_millis(100),
        scheduler_options: Default::default(),
        ..CompactorOptions::default()
    }
}
/// Builds a `CompactorBuilder` over PATH wired with the given scheduler
/// supplier and clock, using the standard test compactor options.
fn compactor_builder_with_scheduler(
    os: Arc<dyn ObjectStore>,
    scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
    system_clock: Arc<dyn SystemClock>,
) -> CompactorBuilder<&'static str> {
    let builder = CompactorBuilder::new(PATH, os).with_system_clock(system_clock);
    builder
        .with_options(compactor_options())
        .with_scheduler_supplier(scheduler_supplier)
}
// Scheduler test double: `propose` returns (and drains) whatever specs were
// injected via `inject_compaction`.
struct MockSchedulerInner {
    compaction: Vec<CompactionSpec>,
}
#[derive(Clone)]
struct MockScheduler {
    inner: Arc<Mutex<MockSchedulerInner>>,
}
impl MockScheduler {
    fn new() -> Self {
        let inner = MockSchedulerInner { compaction: Vec::new() };
        Self {
            inner: Arc::new(Mutex::new(inner)),
        }
    }
    /// Queues a spec to be returned by the next `propose` call.
    fn inject_compaction(&self, compaction: CompactionSpec) {
        self.inner.lock().compaction.push(compaction);
    }
}
impl CompactionScheduler for MockScheduler {
    fn propose(&self, _state: &CompactorStateView) -> Vec<CompactionSpec> {
        // Hand back every injected spec, leaving the queue empty.
        std::mem::take(&mut self.inner.lock().compaction)
    }
}
// Executor test double: records submitted jobs instead of running them so
// tests can inspect (and optionally forward) what was started.
struct MockExecutorInner {
    jobs: Vec<StartCompactionJobArgs>,
}
#[derive(Clone)]
struct MockExecutor {
    inner: Arc<Mutex<MockExecutorInner>>,
}
impl MockExecutor {
    fn new() -> Self {
        let inner = MockExecutorInner { jobs: Vec::new() };
        Self {
            inner: Arc::new(Mutex::new(inner)),
        }
    }
    /// Returns all recorded jobs, clearing the internal list.
    fn pop_jobs(&self) -> Vec<StartCompactionJobArgs> {
        std::mem::take(&mut self.inner.lock().jobs)
    }
}
impl CompactionExecutor for MockExecutor {
    fn start_compaction_job(&self, compaction: StartCompactionJobArgs) {
        self.inner.lock().jobs.push(compaction);
    }
    fn stop(&self) {}
    fn is_stopped(&self) -> bool {
        // The mock never reports itself stopped.
        false
    }
}
}