use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream::BoxStream;
use log::{debug, error, info, warn};
use tokio::runtime::Handle;
use tracing::instrument;
use ulid::Ulid;
use crate::clock::SystemClock;
use crate::compactions_store::CompactionsStore;
use crate::compactor::stats::CompactionStats;
use crate::compactor_executor::{
CompactionExecutor, StartCompactionJobArgs, TokioCompactionExecutor,
};
use crate::compactor_state::Compaction;
use crate::compactor_state_protocols::CompactorStateWriter;
use crate::config::CompactorOptions;
use crate::db_state::SortedRun;
use crate::dispatcher::{MessageFactory, MessageHandler, MessageHandlerExecutor};
use crate::error::{Error, SlateDBError};
use crate::manifest::store::ManifestStore;
use crate::merge_operator::MergeOperatorType;
use crate::rand::DbRand;
pub use crate::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
use crate::stats::StatRegistry;
use crate::tablestore::TableStore;
use crate::utils::{format_bytes_si, IdGenerator, WatchableOnceCell};
pub use crate::compactor_state::{CompactionSpec, CompactionStatus, CompactorState, SourceId};
pub use crate::db::builder::CompactorBuilder;
/// Name under which the compactor's event-handler task is registered with, joined on, and
/// shut down through the `MessageHandlerExecutor` (see `Compactor::run` / `Compactor::stop`).
pub(crate) const COMPACTOR_TASK_NAME: &str = "compactor";
/// Factory for [`CompactionScheduler`] instances.
///
/// `Compactor::run` calls [`compaction_scheduler`](Self::compaction_scheduler) once per run,
/// passing the compactor's options, and uses the returned scheduler for that run.
pub trait CompactionSchedulerSupplier: Send + Sync {
/// Builds a scheduler configured from `options`.
fn compaction_scheduler(
&self,
options: &CompactorOptions,
) -> Box<dyn CompactionScheduler + Send + Sync>;
}
/// Policy that decides which compactions the compactor should run.
pub trait CompactionScheduler: Send + Sync {
/// Returns the compactions to submit given the current compactor `state`.
///
/// The compactor submits the returned specs in order. It stops early once it reaches
/// `CompactorOptions::max_concurrent_compactions`, so specs past that point are dropped
/// for this round.
fn maybe_schedule_compaction(&self, state: &CompactorState) -> Vec<CompactionSpec>;
/// Scheduler-specific validation of a proposed compaction.
///
/// This runs after the compactor's own built-in checks. Any `Err` causes the compaction
/// to be rejected as invalid. The default implementation accepts everything.
fn validate_compaction(
&self,
_state: &CompactorState,
_compaction: &CompactionSpec,
) -> Result<(), Error> {
Ok(())
}
}
/// Messages consumed by the compactor's event handler.
///
/// `LogStats` and `PollManifest` are produced by the handler's periodic tickers. The job
/// variants presumably come from the compaction executor, which `Compactor::run` hands the
/// channel sender to. NOTE(review): confirm against `TokioCompactionExecutor`.
#[derive(Debug)]
pub(crate) enum CompactorMessage {
/// A compaction job ended; `result` carries the output sorted run on success.
CompactionJobFinished {
id: Ulid,
result: Result<SortedRun, SlateDBError>,
},
/// Progress update for a running job. The handler marks the compaction as `Running`
/// and records `bytes_processed`.
CompactionJobProgress {
id: Ulid,
bytes_processed: u64,
},
/// Periodic tick: log compaction state and throughput, and refresh throughput gauges.
LogStats,
/// Periodic tick: reload the manifest and try to schedule new compactions.
PollManifest,
}
/// Standalone compactor. It owns the stores and configuration needed to run a
/// [`CompactorEventHandler`] on a `MessageHandlerExecutor`.
///
/// Cloning is cheap: every field is either an `Arc`, a tokio `Handle`, or an optional merge
/// operator.
#[derive(Clone)]
pub struct Compactor {
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
table_store: Arc<TableStore>,
options: Arc<CompactorOptions>,
// Asked for a fresh scheduler each time `run` is called.
scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
// Hosts the compactor's event-handler task.
task_executor: Arc<MessageHandlerExecutor>,
// Runtime handed to the `TokioCompactionExecutor` that executes compaction jobs.
compactor_runtime: Handle,
rand: Arc<DbRand>,
stats: Arc<CompactionStats>,
system_clock: Arc<dyn SystemClock>,
merge_operator: Option<MergeOperatorType>,
}
impl Compactor {
    /// Creates a compactor.
    ///
    /// Compaction stats are registered with `stat_registry`. `closed_result` and
    /// `system_clock` are shared with the internal [`MessageHandlerExecutor`] that will host
    /// the compactor task.
    pub(crate) fn new(
        manifest_store: Arc<ManifestStore>,
        compactions_store: Arc<CompactionsStore>,
        table_store: Arc<TableStore>,
        options: CompactorOptions,
        scheduler_supplier: Arc<dyn CompactionSchedulerSupplier>,
        compactor_runtime: Handle,
        rand: Arc<DbRand>,
        stat_registry: Arc<StatRegistry>,
        system_clock: Arc<dyn SystemClock>,
        closed_result: WatchableOnceCell<Result<(), SlateDBError>>,
        merge_operator: Option<MergeOperatorType>,
    ) -> Self {
        let stats = Arc::new(CompactionStats::new(stat_registry));
        let task_executor = Arc::new(MessageHandlerExecutor::new(
            closed_result.clone(),
            system_clock.clone(),
        ));
        Self {
            manifest_store,
            compactions_store,
            table_store,
            options: Arc::new(options),
            scheduler_supplier,
            task_executor,
            compactor_runtime,
            rand,
            stats,
            system_clock,
            merge_operator,
        }
    }

    /// Runs the compactor until its event-handler task exits.
    ///
    /// The steps are:
    /// 1. Obtain a scheduler from the supplier.
    /// 2. Build a [`TokioCompactionExecutor`] on `compactor_runtime` that reports back over
    ///    an unbounded channel.
    /// 3. Register a [`CompactorEventHandler`] under [`COMPACTOR_TASK_NAME`].
    /// 4. Start task monitoring on the current runtime.
    /// 5. Wait for the task to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the following fails:
    /// - the handler cannot be initialized,
    /// - the handler cannot be registered,
    /// - monitoring cannot be started,
    /// - the compactor task itself.
    pub async fn run(&self) -> Result<(), crate::Error> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let scheduler = Arc::from(self.scheduler_supplier.compaction_scheduler(&self.options));
        let executor = Arc::new(TokioCompactionExecutor::new(
            self.compactor_runtime.clone(),
            self.options.clone(),
            tx,
            self.table_store.clone(),
            self.rand.clone(),
            self.stats.clone(),
            self.system_clock.clone(),
            self.manifest_store.clone(),
            self.merge_operator.clone(),
        ));
        let handler = CompactorEventHandler::new(
            self.manifest_store.clone(),
            self.compactions_store.clone(),
            self.options.clone(),
            scheduler,
            executor,
            self.rand.clone(),
            self.stats.clone(),
            self.system_clock.clone(),
        )
        .await?;
        // Return registration failures to the caller instead of panicking inside a public,
        // `Result`-returning API.
        self.task_executor.add_handler(
            COMPACTOR_TASK_NAME.to_string(),
            Box::new(handler),
            rx,
            &Handle::current(),
        )?;
        self.task_executor.monitor_on(&Handle::current())?;
        self.task_executor
            .join_task(COMPACTOR_TASK_NAME)
            .await
            .map_err(Into::into)
    }

    /// Asks the executor to shut down the compactor task and waits for it.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `shutdown_task`.
    pub async fn stop(&self) -> Result<(), crate::Error> {
        self.task_executor
            .shutdown_task(COMPACTOR_TASK_NAME)
            .await
            .map_err(Into::into)
    }
}
/// Event handler that drives compaction.
///
/// It reacts to ticker and job messages, maintains the compactor state through
/// `state_writer`, and hands jobs to `executor`.
pub(crate) struct CompactorEventHandler {
// Owns the in-memory `CompactorState` and writes it back to the stores.
state_writer: CompactorStateWriter,
options: Arc<CompactorOptions>,
scheduler: Arc<dyn CompactionScheduler + Send + Sync>,
executor: Arc<dyn CompactionExecutor + Send + Sync>,
// Used to generate compaction ids.
rand: Arc<DbRand>,
stats: Arc<CompactionStats>,
system_clock: Arc<dyn SystemClock>,
}
#[async_trait]
impl MessageHandler<CompactorMessage> for CompactorEventHandler {
fn tickers(&mut self) -> Vec<(Duration, Box<MessageFactory<CompactorMessage>>)> {
vec![
(
self.options.poll_interval,
Box::new(|| CompactorMessage::PollManifest),
),
(
Duration::from_secs(10),
Box::new(|| CompactorMessage::LogStats),
),
]
}
async fn handle(&mut self, message: CompactorMessage) -> Result<(), SlateDBError> {
match message {
CompactorMessage::LogStats => self.handle_log_ticker(),
CompactorMessage::PollManifest => self.handle_ticker().await?,
CompactorMessage::CompactionJobFinished { id, result } => {
match result {
Ok(sr) => self.finish_compaction(id, sr).await?,
Err(err) => {
error!("error executing compaction [error={:#?}]", err);
self.finish_failed_compaction(id).await?;
}
}
self.maybe_schedule_compactions().await?;
}
CompactorMessage::CompactionJobProgress {
id,
bytes_processed,
} => {
self.state_mut().update_compaction(&id, |c| {
c.set_status(CompactionStatus::Running);
c.set_bytes_processed(bytes_processed);
});
}
}
Ok(())
}
async fn cleanup(
&mut self,
mut _messages: BoxStream<'async_trait, CompactorMessage>,
_result: Result<(), SlateDBError>,
) -> Result<(), SlateDBError> {
self.stop_executor().await?;
Ok(())
}
}
impl CompactorEventHandler {
pub(crate) async fn new(
manifest_store: Arc<ManifestStore>,
compactions_store: Arc<CompactionsStore>,
options: Arc<CompactorOptions>,
scheduler: Arc<dyn CompactionScheduler + Send + Sync>,
executor: Arc<dyn CompactionExecutor + Send + Sync>,
rand: Arc<DbRand>,
stats: Arc<CompactionStats>,
system_clock: Arc<dyn SystemClock>,
) -> Result<Self, SlateDBError> {
let state_writer = CompactorStateWriter::new(
manifest_store,
compactions_store,
system_clock.clone(),
options.as_ref(),
rand.clone(),
)
.await?;
Ok(Self {
state_writer,
options,
scheduler,
executor,
rand,
stats,
system_clock,
})
}
fn state(&self) -> &CompactorState {
&self.state_writer.state
}
fn state_mut(&mut self) -> &mut CompactorState {
&mut self.state_writer.state
}
fn handle_log_ticker(&self) {
self.log_compaction_state();
self.log_compaction_throughput();
}
fn log_compaction_throughput(&self) {
let current_time = self.system_clock.now();
let current_time_ms = current_time.timestamp_millis() as u64;
let db_state = self.state().db_state();
let mut total_estimated_bytes = 0u64;
let mut total_bytes_processed = 0u64;
let mut total_elapsed_secs = 0.0f64;
for compaction in self.state().compactions() {
let estimated_source_bytes =
Self::calculate_estimated_source_bytes(compaction, db_state);
total_estimated_bytes += estimated_source_bytes;
total_bytes_processed += compaction.bytes_processed();
let start_time_ms = compaction
.id()
.datetime()
.duration_since(std::time::UNIX_EPOCH)
.expect("invalid duration")
.as_millis() as u64;
let elapsed_secs = if start_time_ms > 0 {
(current_time_ms as f64 - start_time_ms as f64) / 1000.0
} else {
0.0
};
total_elapsed_secs += elapsed_secs;
let throughput = if elapsed_secs > 0.0 {
compaction.bytes_processed() as f64 / elapsed_secs
} else {
0.0
};
let percentage = if estimated_source_bytes > 0 {
(compaction.bytes_processed() * 100 / estimated_source_bytes) as u32
} else {
0
};
debug!(
"compaction progress [id={}, progress={}%, processed_bytes={}, estimated_source_bytes={}, elapsed={:.2}s, throughput={}/s]",
compaction.id(),
percentage,
format_bytes_si(compaction.bytes_processed()),
format_bytes_si(estimated_source_bytes),
elapsed_secs,
format_bytes_si(throughput as u64),
);
}
let total_throughput = if total_elapsed_secs > 0.0 {
total_bytes_processed as f64 / total_elapsed_secs
} else {
0.0
};
self.stats
.total_bytes_being_compacted
.set(total_estimated_bytes);
self.stats.total_throughput.set(total_throughput as u64);
}
fn calculate_estimated_source_bytes(
compaction: &Compaction,
db_state: &crate::db_state::CoreDbState,
) -> u64 {
use crate::db_state::{SortedRun, SsTableHandle, SsTableId};
use std::collections::HashMap;
let ssts_by_id: HashMap<Ulid, &SsTableHandle> = db_state
.l0
.iter()
.map(|sst| match sst.id {
SsTableId::Compacted(id) => (id, sst),
SsTableId::Wal(_) => unreachable!("L0 SSTs should never have SsTableId::Wal"),
})
.collect();
let srs_by_id: HashMap<u32, &SortedRun> =
db_state.compacted.iter().map(|sr| (sr.id, sr)).collect();
compaction
.spec()
.sources()
.iter()
.map(|source| match source {
SourceId::Sst(id) => ssts_by_id
.get(id)
.expect("compaction source SST not found in L0")
.estimate_size(),
SourceId::SortedRun(id) => srs_by_id
.get(id)
.expect("compaction source sorted run not found")
.estimate_size(),
})
.sum()
}
async fn handle_ticker(&mut self) -> Result<(), SlateDBError> {
if !self.is_executor_stopped() {
self.state_writer.load_manifest().await?;
self.maybe_schedule_compactions().await?;
}
Ok(())
}
async fn stop_executor(&self) -> Result<(), SlateDBError> {
let this_executor = self.executor.clone();
#[cfg(not(dst))]
#[allow(clippy::disallowed_methods)]
let result = tokio::task::spawn_blocking(move || {
this_executor.stop();
})
.await
.map_err(|_| SlateDBError::CompactionExecutorFailed);
#[cfg(dst)]
let result = tokio::spawn(async move {
this_executor.stop();
})
.await
.map_err(|_| SlateDBError::CompactionExecutorFailed);
result
}
fn is_executor_stopped(&self) -> bool {
self.executor.is_stopped()
}
fn validate_compaction(&self, compaction: &CompactionSpec) -> Result<(), SlateDBError> {
if compaction.sources().is_empty() {
warn!("submitted compaction is empty: {:?}", compaction.sources());
return Err(SlateDBError::InvalidCompaction);
}
let has_only_l0 = compaction
.sources()
.iter()
.all(|s| matches!(s, SourceId::Sst(_)));
if has_only_l0 {
let highest_id = self
.state()
.db_state()
.compacted
.first()
.map_or(0, |sr| sr.id + 1);
if compaction.destination() < highest_id {
warn!("compaction destination is lesser than the expected L0-only highest_id: {:?} {:?}",
compaction.destination(), highest_id);
return Err(SlateDBError::InvalidCompaction);
}
}
self.scheduler
.validate_compaction(self.state(), compaction)
.map_err(|_e| SlateDBError::InvalidCompaction)
}
async fn maybe_schedule_compactions(&mut self) -> Result<(), SlateDBError> {
let mut specs = self.scheduler.maybe_schedule_compaction(self.state());
for spec in specs.drain(..) {
let active_compactions = self.state().compactions().count();
if active_compactions >= self.options.max_concurrent_compactions {
info!(
"already running {} compactions, which is at the max {}. Won't run compaction {:?}",
active_compactions,
self.options.max_concurrent_compactions,
spec
);
break;
}
let compaction_id = self.rand.rng().gen_ulid(self.system_clock.as_ref());
let compaction = Compaction::new(compaction_id, spec);
self.submit_compaction(compaction).await?;
}
Ok(())
}
async fn start_compaction(
&mut self,
job_id: Ulid,
compaction: Compaction,
) -> Result<(), SlateDBError> {
self.log_compaction_state();
let db_state = self.state().db_state();
let ssts = compaction.get_ssts(db_state);
let sorted_runs = compaction.get_sorted_runs(db_state);
let spec = compaction.spec();
let is_dest_last_run = db_state.compacted.is_empty()
|| db_state
.compacted
.last()
.is_some_and(|sr| spec.destination() == sr.id);
let job_args = StartCompactionJobArgs {
id: job_id,
compaction_id: compaction.id(),
destination: spec.destination(),
ssts,
sorted_runs,
compaction_logical_clock_tick: db_state.last_l0_clock_tick,
retention_min_seq: Some(db_state.recent_snapshot_min_seq),
is_dest_last_run,
estimated_source_bytes: Self::calculate_estimated_source_bytes(&compaction, db_state),
};
let this_executor = self.executor.clone();
#[cfg(not(dst))]
#[allow(clippy::disallowed_methods)]
let result = tokio::task::spawn_blocking(move || {
this_executor.start_compaction_job(job_args);
})
.await
.map_err(|_| SlateDBError::CompactionExecutorFailed);
#[cfg(dst)]
let result = tokio::spawn(async move {
this_executor.start_compaction_job(job_args);
})
.await
.map_err(|_| SlateDBError::CompactionExecutorFailed);
result
}
async fn finish_failed_compaction(&mut self, id: Ulid) -> Result<(), SlateDBError> {
self.state_mut().remove_compaction(&id);
self.state_writer.write_compactions_safely().await?;
Ok(())
}
#[instrument(level = "debug", skip_all, fields(id = %id))]
async fn finish_compaction(
&mut self,
id: Ulid,
output_sr: SortedRun,
) -> Result<(), SlateDBError> {
self.state_mut().finish_compaction(id, output_sr);
self.log_compaction_state();
self.state_writer.write_state_safely().await?;
self.maybe_schedule_compactions().await?;
self.stats
.last_compaction_ts
.set(self.system_clock.now().timestamp() as u64);
Ok(())
}
#[instrument(level = "debug", skip_all, fields(id = tracing::field::Empty))]
async fn submit_compaction(&mut self, compaction: Compaction) -> Result<(), SlateDBError> {
if let Err(e) = self.validate_compaction(compaction.spec()) {
warn!("invalid compaction [error={:?}]", e);
return Ok(());
}
self.state_mut().add_compaction(compaction.clone())?;
self.state_writer.write_compactions_safely().await?;
let job_id = compaction.id();
tracing::Span::current().record("id", tracing::field::display(&job_id));
if let Err(err) = self.start_compaction(job_id, compaction).await {
self.state_mut().remove_compaction(&job_id);
self.state_writer.write_compactions_safely().await?;
return Err(err);
}
Ok(())
}
fn log_compaction_state(&self) {
self.state().db_state().log_db_runs();
let compactions = self.state().compactions();
for compaction in compactions {
if log::log_enabled!(log::Level::Debug) {
debug!("in-flight compaction [compaction={:?}]", compaction);
} else {
info!("in-flight compaction [compaction={}]", compaction);
}
}
}
}
pub mod stats {
    //! Metric names and handles published by the compactor.

    use crate::stats::{Counter, Gauge, StatRegistry};
    use std::sync::Arc;

    /// Expands to the stat name for `$suffix` under the `compactor` scope, via
    /// `crate::stat_name!`.
    macro_rules! compactor_stat_name {
        ($suffix:expr) => {
            crate::stat_name!("compactor", $suffix)
        };
    }

    /// Registry name of the `bytes_compacted` counter.
    pub const BYTES_COMPACTED: &str = compactor_stat_name!("bytes_compacted");
    /// Registry name of the `last_compaction_ts` gauge.
    pub const LAST_COMPACTION_TS_SEC: &str = compactor_stat_name!("last_compaction_timestamp_sec");
    /// Registry name of the `running_compactions` gauge.
    pub const RUNNING_COMPACTIONS: &str = compactor_stat_name!("running_compactions");
    /// Registry name of the `total_bytes_being_compacted` gauge.
    pub const TOTAL_BYTES_BEING_COMPACTED: &str =
        compactor_stat_name!("total_bytes_being_compacted");
    /// Registry name of the `total_throughput` gauge.
    pub const TOTAL_THROUGHPUT_BYTES_PER_SEC: &str =
        compactor_stat_name!("total_throughput_bytes_per_sec");

    /// Shared handles to the compactor's metrics. [`CompactionStats::new`] registers each
    /// handle with a [`StatRegistry`].
    pub(crate) struct CompactionStats {
        pub(crate) last_compaction_ts: Arc<Gauge<u64>>,
        pub(crate) running_compactions: Arc<Gauge<i64>>,
        pub(crate) bytes_compacted: Arc<Counter>,
        pub(crate) total_bytes_being_compacted: Arc<Gauge<u64>>,
        pub(crate) total_throughput: Arc<Gauge<u64>>,
    }

    impl CompactionStats {
        /// Creates zero-initialized metrics and registers each one under its name constant.
        pub(crate) fn new(stat_registry: Arc<StatRegistry>) -> Self {
            let last_compaction_ts: Arc<Gauge<u64>> = Arc::new(Gauge::default());
            let running_compactions: Arc<Gauge<i64>> = Arc::new(Gauge::default());
            let bytes_compacted: Arc<Counter> = Arc::new(Counter::default());
            let total_bytes_being_compacted: Arc<Gauge<u64>> = Arc::new(Gauge::default());
            let total_throughput: Arc<Gauge<u64>> = Arc::new(Gauge::default());

            stat_registry.register(LAST_COMPACTION_TS_SEC, last_compaction_ts.clone());
            stat_registry.register(RUNNING_COMPACTIONS, running_compactions.clone());
            stat_registry.register(BYTES_COMPACTED, bytes_compacted.clone());
            stat_registry.register(
                TOTAL_BYTES_BEING_COMPACTED,
                total_bytes_being_compacted.clone(),
            );
            stat_registry.register(TOTAL_THROUGHPUT_BYTES_PER_SEC, total_throughput.clone());

            Self {
                last_compaction_ts,
                running_compactions,
                bytes_compacted,
                total_bytes_being_compacted,
                total_throughput,
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parking_lot::Mutex;
use rand::RngCore;
use ulid::Ulid;
use super::*;
use crate::clock::DefaultSystemClock;
use crate::compactions_store::{FenceableCompactions, StoredCompactions};
use crate::compactor::stats::CompactionStats;
use crate::compactor::stats::LAST_COMPACTION_TS_SEC;
use crate::compactor_executor::{CompactionExecutor, TokioCompactionExecutor};
use crate::compactor_state::CompactionStatus;
use crate::compactor_state::{CompactorState, SourceId};
use crate::config::{
PutOptions, Settings, SizeTieredCompactionSchedulerOptions, Ttl, WriteOptions,
};
use crate::db::Db;
use crate::db_state::{CoreDbState, SortedRun};
use crate::error::SlateDBError;
use crate::iter::KeyValueIterator;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::merge_operator::{MergeOperator, MergeOperatorError};
use crate::object_stores::ObjectStores;
use crate::proptest_util::rng;
use crate::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
use crate::sst::SsTableFormat;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::stats::StatRegistry;
use crate::tablestore::TableStore;
use crate::test_utils::{assert_iterator, TestClock};
use crate::types::RowEntry;
use bytes::Bytes;
/// Object-store path under which each test builds its database.
const PATH: &str = "/test/db";
/// Test merge operator that appends each operand to the existing value, or to an empty
/// value when there is none.
struct StringConcatMergeOperator;
impl MergeOperator for StringConcatMergeOperator {
    fn merge(
        &self,
        _key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        let base = existing_value.unwrap_or_default();
        let mut concatenated = Vec::with_capacity(base.len() + value.len());
        concatenated.extend_from_slice(&base);
        concatenated.extend_from_slice(&value);
        Ok(concatenated.into())
    }
}
/// Compacts L0 with a size-tiered scheduler and checks the compacted data.
///
/// The test writes eight 16-byte keys with 48-byte values, with `l0_sst_size_bytes` set to
/// 128 (presumably so the writes span several L0 SSTs). After a flush and a completed
/// compaction, it walks every SST of every compacted sorted run. Each key found must match
/// what was written, `db.get` must agree, and no written key may be missing.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_compactor_compacts_l0() {
let os = Arc::new(InMemory::new());
let logical_clock = Arc::new(TestClock::new());
let compaction_scheduler = Arc::new(SizeTieredCompactionSchedulerSupplier::new(
SizeTieredCompactionSchedulerOptions {
min_compaction_sources: 1,
max_compaction_sources: 999,
include_size_threshold: 4.0,
},
));
let mut options = db_options(Some(compactor_options()));
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_logical_clock(logical_clock)
.with_compaction_scheduler_supplier(compaction_scheduler)
.build()
.await
.unwrap();
let (_, _, table_store) = build_test_stores(os.clone());
// Tracks every key written. Entries are removed as they are found in the compacted runs.
let mut expected = HashMap::<Vec<u8>, Vec<u8>>::new();
for i in 0..4 {
let k = vec![b'a' + i as u8; 16];
let v = vec![b'b' + i as u8; 48];
expected.insert(k.clone(), v.clone());
db.put(&k, &v).await.unwrap();
let k = vec![b'j' + i as u8; 16];
let v = vec![b'k' + i as u8; 48];
db.put(&k, &v).await.unwrap();
expected.insert(k.clone(), v.clone());
}
db.flush().await.unwrap();
let db_state = await_compaction(&db).await;
let db_state = db_state.expect("db was not compacted");
for run in db_state.compacted {
for sst in run.ssts {
let mut iter = SstIterator::new_borrowed_initialized(
..,
&sst,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
while let Some(kv) = iter.next().await.unwrap() {
let expected_v = expected
.remove(kv.key.as_ref())
.expect("removing unexpected key");
let db_v = db.get(kv.key.as_ref()).await.unwrap().unwrap();
assert_eq!(expected_v, db_v.as_ref());
}
}
}
// Every written key must have appeared in some compacted SST.
assert!(expected.is_empty());
}
/// With the WAL disabled, checks that a tombstone written to L0 is absent from the output
/// once that L0 SST is compacted into the single existing sorted run.
///
/// The on-demand scheduler fires when there are two L0 SSTs, or one L0 SST plus one
/// sorted run. The compacted SST is expected to hold only key `b`. NOTE(review): the
/// tombstone is presumably dropped because the destination is the last run — confirm in
/// the executor.
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_tombstones_in_l0() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let logical_clock = Arc::new(TestClock::new());
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| {
state.db_state().l0.len() == 2 ||
(state.db_state().l0.len() == 1 && state.db_state().compacted.len() == 1)
},
)));
let mut options = db_options(Some(compactor_options()));
options.wal_enabled = false;
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_logical_clock(logical_clock)
.with_compaction_scheduler_supplier(scheduler.clone())
.build()
.await
.unwrap();
let (manifest_store, _, table_store) = build_test_stores(os.clone());
// Phase 1: write `a` and `b`, then wait until they have been compacted into one run.
db.put(&[b'a'; 16], &[b'a'; 32]).await.unwrap();
db.put(&[b'b'; 16], &[b'a'; 32]).await.unwrap();
db.flush().await.unwrap();
let db_state = await_compaction(&db).await.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.l0.len(), 0, "{:?}", db_state.l0);
// Phase 2: delete `a` and flush. The new L0 SST must contain the tombstone.
db.delete_with_options(
&[b'a'; 16],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
let db_state = get_db_state(manifest_store.clone()).await;
assert_eq!(db_state.l0.len(), 1, "{:?}", db_state.l0);
assert_eq!(db_state.compacted.len(), 1);
let l0 = db_state.l0.front().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
l0,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let tombstone = iter.next_entry().await.unwrap();
assert!(tombstone.unwrap().value.is_tombstone());
// Phase 3: once the L0 SST is merged into the run, only `b` should remain.
let db_state = await_compacted_compaction(manifest_store.clone(), db_state.compacted)
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().ssts;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let next = iter.next().await.unwrap();
assert_eq!(next.unwrap().key.as_ref(), &[b'b'; 16]);
let next = iter.next().await.unwrap();
assert!(next.is_none());
}
/// Same flow as the tombstone-in-L0 test, but with a snapshot taken before the first flush
/// and kept alive.
///
/// With the snapshot open, the compacted SST must still contain an entry for key `a` as
/// well as `b`. The test checks only keys, not whether `a`'s entry is a tombstone.
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_not_filter_tombstone_with_snapshot() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let logical_clock = Arc::new(TestClock::new());
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| {
state.db_state().l0.len() == 2 ||
(state.db_state().l0.len() == 1 && state.db_state().compacted.len() == 1)
},
)));
let mut options = db_options(Some(compactor_options()));
options.wal_enabled = false;
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_logical_clock(logical_clock)
.with_compaction_scheduler_supplier(scheduler.clone())
.build()
.await
.unwrap();
let (manifest_store, _, table_store) = build_test_stores(os.clone());
db.put(&[b'a'; 16], &[b'a'; 32]).await.unwrap();
db.put(&[b'b'; 16], &[b'a'; 32]).await.unwrap();
// Held for the rest of the test. It is expected to keep the delete of `a` from being
// filtered out during compaction.
let _snapshot = db.snapshot().await.unwrap();
db.flush().await.unwrap();
let db_state = await_compaction(&db).await.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.l0.len(), 0, "{:?}", db_state.l0);
db.delete_with_options(
&[b'a'; 16],
&WriteOptions {
await_durable: false,
},
)
.await
.unwrap();
db.flush().await.unwrap();
let db_state = get_db_state(manifest_store.clone()).await;
assert_eq!(db_state.l0.len(), 1, "{:?}", db_state.l0);
assert_eq!(db_state.compacted.len(), 1);
let db_state = await_compacted_compaction(manifest_store.clone(), db_state.compacted)
.await
.unwrap();
assert_eq!(db_state.compacted.len(), 1);
let compacted = &db_state.compacted.first().unwrap().ssts;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
let next = iter.next().await.unwrap();
let entry_a = next.unwrap();
assert_eq!(entry_a.key.as_ref(), &[b'a'; 16]);
let next = iter.next().await.unwrap();
let entry_b = next.unwrap();
assert_eq!(entry_b.key.as_ref(), &[b'b'; 16]);
let next = iter.next().await.unwrap();
assert!(
next.is_none(),
"Expected two keys (a and b) in the compacted SST"
);
}
/// Checks that compacting two L0 SSTs folds merge operands using the configured merge
/// operator.
///
/// Each batch of writes ends with an explicit flush. The on-demand scheduler fires once at
/// least two L0 SSTs exist. The single compacted SST must hold `key1 = "abc"` and
/// `key2 = "x"` as merge rows alongside the two plain values, and `db.get` must return the
/// merged values.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_apply_merge_during_l0_compaction() {
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| state.db_state().l0.len() >= 2,
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (_manifest_store, _, table_store) = build_test_stores(os.clone());

    // First batch: two operands for key1 plus a plain value, then flush.
    db.merge(b"key1", b"a").await.unwrap();
    db.merge(b"key1", b"b").await.unwrap();
    db.put(&[b'x'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    // Second batch: another key1 operand, a first key2 operand, and a plain value.
    db.merge(b"key1", b"c").await.unwrap();
    db.merge(b"key2", b"x").await.unwrap();
    db.put(&[b'y'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    let db_state = await_compaction(&db).await;
    let db_state = db_state.expect("db was not compacted");
    assert_eq!(db_state.compacted.len(), 1);
    let compacted = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(compacted.len(), 1);
    let handle = compacted.first().unwrap();
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        handle,
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");
    assert_iterator(
        &mut iter,
        vec![
            RowEntry::new_merge(b"key1", b"abc", 4).with_create_ts(0),
            RowEntry::new_merge(b"key2", b"x", 5).with_create_ts(0),
            RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 3).with_create_ts(0),
            RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 6).with_create_ts(0),
        ],
    )
    .await;
    let result = db.get(b"key1").await.unwrap();
    assert_eq!(result, Some(Bytes::from("abc")));
    let result = db.get(b"key2").await.unwrap();
    assert_eq!(result, Some(Bytes::from("x")));
}
/// Checks that merge operands already compacted into a sorted run are combined with new
/// operands from L0 when the L0 SST is compacted into that run.
///
/// The scheduler fires whenever L0 is non-empty. After two rounds the single compacted SST
/// must hold `key1 = "abcd"` plus both plain values, and `db.get` must agree.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_apply_merge_across_l0_and_sorted_runs() {
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| !state.db_state().l0.is_empty(),
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (_manifest_store, _, table_store) = build_test_stores(os.clone());

    // Round 1: "a" and "b" for key1 end up in a sorted run.
    db.merge(b"key1", b"a").await.unwrap();
    db.merge(b"key1", b"b").await.unwrap();
    db.put(&[b'x'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();
    let db_state = await_compaction(&db).await;
    let db_state = db_state.expect("db was not compacted");
    assert_eq!(db_state.compacted.len(), 1);

    // Round 2: "c" and "d" in L0 must be merged with the run's existing value.
    db.merge(b"key1", b"c").await.unwrap();
    db.merge(b"key1", b"d").await.unwrap();
    db.put(&[b'y'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();
    let db_state = await_compaction(&db).await;
    let db_state = db_state.expect("db was not compacted");
    assert_eq!(db_state.compacted.len(), 1);

    let compacted = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(compacted.len(), 1);
    let handle = compacted.first().unwrap();
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        handle,
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");
    assert_iterator(
        &mut iter,
        vec![
            RowEntry::new_merge(b"key1", b"abcd", 5).with_create_ts(0),
            RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 3).with_create_ts(0),
            RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 6).with_create_ts(0),
        ],
    )
    .await;
    let result = db.get(b"key1").await.unwrap();
    assert_eq!(result, Some(Bytes::from("abcd")));
}
/// Checks that merge operands with no preceding put are still concatenated in write order
/// during L0 compaction.
///
/// The result must be `key1 = "xyz"` in the compacted SST and via `db.get`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_merge_without_base_value() {
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| state.db_state().l0.len() >= 2,
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (_manifest_store, _, table_store) = build_test_stores(os.clone());

    // First batch: a single operand for key1.
    db.merge(b"key1", b"x").await.unwrap();
    db.put(&[b'x'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    // Second batch: two more operands for key1.
    db.merge(b"key1", b"y").await.unwrap();
    db.merge(b"key1", b"z").await.unwrap();
    db.put(&[b'y'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    let db_state = await_compaction(&db).await;
    let db_state = db_state.expect("db was not compacted");
    assert_eq!(db_state.compacted.len(), 1);
    let compacted = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(compacted.len(), 1);
    let handle = compacted.first().unwrap();
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        handle,
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");
    assert_iterator(
        &mut iter,
        vec![
            RowEntry::new_merge(b"key1", b"xyz", 4).with_create_ts(0),
            RowEntry::new_value(&[b'x'; 16], &[b'p'; 128], 2).with_create_ts(0),
            RowEntry::new_value(&[b'y'; 16], &[b'p'; 128], 5).with_create_ts(0),
        ],
    )
    .await;
    let result = db.get(b"key1").await.unwrap();
    assert_eq!(result, Some(Bytes::from("xyz")));
}
/// Checks that operands for one key spread across three L0 SSTs are merged in write order.
///
/// The result must be `key1 = "123"` in the compacted SST and via `db.get`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_preserve_merge_order_across_multiple_ssts() {
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| state.db_state().l0.len() >= 3,
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (_manifest_store, _, table_store) = build_test_stores(os.clone());

    // Three batches, each with one key1 operand and one plain value, each followed by a
    // flush.
    db.merge(b"key1", b"1").await.unwrap();
    db.put(&[b'a'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();
    db.merge(b"key1", b"2").await.unwrap();
    db.put(&[b'b'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();
    db.merge(b"key1", b"3").await.unwrap();
    db.put(&[b'c'; 16], &[b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    let db_state = await_compaction(&db).await;
    let db_state = db_state.expect("db was not compacted");
    assert_eq!(db_state.compacted.len(), 1);
    let compacted = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(compacted.len(), 1);
    let handle = compacted.first().unwrap();
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        handle,
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");
    // Byte-wise key order: the 'a'/'b'/'c' keys sort before "key1" (which starts with 'k').
    assert_iterator(
        &mut iter,
        vec![
            RowEntry::new_value(&[b'a'; 16], &[b'p'; 128], 2).with_create_ts(0),
            RowEntry::new_value(&[b'b'; 16], &[b'p'; 128], 4).with_create_ts(0),
            RowEntry::new_value(&[b'c'; 16], &[b'p'; 128], 6).with_create_ts(0),
            RowEntry::new_merge(b"key1", b"123", 5).with_create_ts(0),
        ],
    )
    .await;
    let result = db.get(b"key1").await.unwrap();
    assert_eq!(result, Some(Bytes::from("123")));
}
/// With the WAL disabled, checks how an expired merge operand is handled.
///
/// The test writes a merge operand with a 10-tick TTL at clock tick 0, then a non-expiring
/// operand at tick 20. After compaction, `last_l0_clock_tick` must be 20 and the compacted
/// SST must contain only the second operand (seq 2, create_ts 20). This shows the expired
/// operand was not merged into the result.
#[cfg(feature = "wal_disable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_not_compact_expired_merge_operations_in_last_run() {
use crate::test_utils::OnDemandCompactionSchedulerSupplier;
let os = Arc::new(InMemory::new());
let insert_clock = Arc::new(TestClock::new());
let scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
|state| state.db_state().l0.len() >= 2,
)));
let mut options = db_options(Some(compactor_options()));
options.wal_enabled = false;
options.l0_sst_size_bytes = 128;
let db = Db::builder(PATH, os.clone())
.with_settings(options)
.with_logical_clock(insert_clock.clone())
.with_compaction_scheduler_supplier(scheduler)
.with_merge_operator(Arc::new(StringConcatMergeOperator))
.build()
.await
.unwrap();
let (_manifest_store, _, table_store) = build_test_stores(os.clone());
// Tick 0: an operand that expires after 10 ticks.
insert_clock.ticker.store(0, atomic::Ordering::SeqCst);
db.merge_with_options(
b"key1",
&[b'a'; 32],
&crate::config::MergeOptions {
ttl: Ttl::ExpireAfter(10),
},
&WriteOptions {
await_durable: true,
},
)
.await
.unwrap();
// Tick 20: past the first operand's TTL, write a non-expiring operand.
insert_clock.ticker.store(20, atomic::Ordering::SeqCst);
db.merge_with_options(
b"key1",
&[b'b'; 32],
&crate::config::MergeOptions { ttl: Ttl::NoExpiry },
&WriteOptions {
await_durable: true,
},
)
.await
.unwrap();
let db_state = await_compaction(&db).await.unwrap();
assert_eq!(db_state.compacted.len(), 1);
assert_eq!(db_state.last_l0_clock_tick, 20);
let compacted = &db_state.compacted.first().unwrap().ssts;
assert_eq!(compacted.len(), 1);
let handle = compacted.first().unwrap();
let mut iter = SstIterator::new_borrowed_initialized(
..,
handle,
table_store.clone(),
SstIteratorOptions::default(),
)
.await
.unwrap()
.expect("Expected Some(iter) but got None");
assert_iterator(
&mut iter,
vec![RowEntry::new_merge(b"key1", &[b'b'; 32], 2).with_create_ts(20)],
)
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_merge_and_then_overwrite_with_put() {
    // A put issued after merges for the same key must shadow them, and the DB must keep
    // returning the put value once the L0s holding those writes have been compacted.
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    // Compact as soon as two L0 SSTs are present.
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| state.db_state().l0.len() >= 2,
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (manifest_store, _compactions_store, _table_store) = build_test_stores(os.clone());

    // Intended first L0: two merge operands for key1 plus filler.
    db.merge(b"key1", b"a").await.unwrap();
    db.merge(b"key1", b"b").await.unwrap();
    db.put(&vec![b'x'; 16], &vec![b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    // Intended second L0: a put that overwrites key1 plus filler.
    db.put(b"key1", b"new_value").await.unwrap();
    db.put(&vec![b'y'; 16], &vec![b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    // Poll the persisted manifest until a sorted run shows up instead of sleeping for a
    // fixed interval. A fixed sleep slows every run and still races a slow compactor.
    tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            let stored_manifest =
                StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
                    .await
                    .unwrap();
            if !stored_manifest.db_state().compacted.is_empty() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("compaction should have occurred");

    let result = db.get(b"key1").await.unwrap();
    assert_eq!(result, Some(Bytes::from("new_value")));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn should_not_merge_operations_with_different_expire_times() {
    // Merge operands carrying different expire times must not be folded together by the
    // compactor: both must survive as separate merge entries.
    use crate::test_utils::OnDemandCompactionSchedulerSupplier;
    let os = Arc::new(InMemory::new());
    let logical_clock = Arc::new(TestClock::new());
    // Compact as soon as two L0 SSTs are present.
    let compaction_scheduler = Arc::new(OnDemandCompactionSchedulerSupplier::new(Arc::new(
        |state| state.db_state().l0.len() >= 2,
    )));
    let options = db_options(Some(compactor_options()));
    let db = Db::builder(PATH, os.clone())
        .with_settings(options)
        .with_logical_clock(logical_clock)
        .with_compaction_scheduler_supplier(compaction_scheduler)
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    let (manifest_store, _compactions_store, table_store) = build_test_stores(os.clone());

    db.merge_with_options(
        b"key1",
        b"a",
        &crate::config::MergeOptions {
            ttl: Ttl::ExpireAfter(100),
        },
        &WriteOptions::default(),
    )
    .await
    .unwrap();
    db.put(&vec![b'x'; 16], &vec![b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    db.merge_with_options(
        b"key1",
        b"b",
        &crate::config::MergeOptions {
            ttl: Ttl::ExpireAfter(200),
        },
        &WriteOptions::default(),
    )
    .await
    .unwrap();
    db.put(&vec![b'y'; 16], &vec![b'p'; 128]).await.unwrap();
    db.flush().await.unwrap();

    // Poll the persisted manifest until a sorted run shows up instead of sleeping for a
    // fixed interval. A fixed sleep slows every run and still races a slow compactor.
    let db_state = tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            let stored_manifest =
                StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
                    .await
                    .unwrap();
            let db_state = stored_manifest.db_state().clone();
            if !db_state.compacted.is_empty() {
                break db_state;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("compaction should have occurred");

    let compacted = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(compacted.len(), 1);
    let handle = compacted.first().unwrap();
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        handle,
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");

    let mut key1_entries = vec![];
    while let Some(entry) = iter.next_entry().await.unwrap() {
        if entry.key.as_ref() == b"key1" {
            key1_entries.push(entry);
        }
    }

    // Require exactly two entries. A conditional check would let a compactor that wrongly
    // combined both operands into a single merge entry pass unnoticed.
    assert_eq!(
        key1_entries.len(),
        2,
        "merge operations with different expire times must be kept as separate entries"
    );
    assert!(key1_entries
        .iter()
        .all(|e| matches!(e.value, crate::types::ValueDeletable::Merge(_))));
    assert_ne!(
        key1_entries[0].expire_ts, key1_entries[1].expire_ts,
        "separate merge operations should have different expire times"
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_should_compact_expired_entries() {
    // Compaction drops entries whose TTL has elapsed and entries superseded by newer writes.
    let object_store = Arc::new(InMemory::new());
    let clock = Arc::new(TestClock::new());
    let supplier = Arc::new(SizeTieredCompactionSchedulerSupplier::new(
        SizeTieredCompactionSchedulerOptions {
            min_compaction_sources: 2,
            max_compaction_sources: 2,
            include_size_threshold: 4.0,
        },
    ));
    let mut settings = db_options(Some(compactor_options()));
    settings.default_ttl = Some(50);
    let db = Db::builder(PATH, object_store.clone())
        .with_settings(settings)
        .with_logical_clock(clock.clone())
        .with_compaction_scheduler_supplier(supplier)
        .build()
        .await
        .unwrap();
    let (_, _, table_store) = build_test_stores(object_store.clone());

    let value = &[b'a'; 64];
    // Each row: (clock tick before the put, key, ttl, flush after the put?)
    let writes = [
        (0, [1u8; 16], Ttl::ExpireAfter(10), false),
        (10, [2u8; 16], Ttl::Default, true),
        (30, [3u8; 16], Ttl::NoExpiry, false),
        (70, [1u8; 16], Ttl::ExpireAfter(80), true),
    ];
    for (tick, key, ttl, flush_after) in writes {
        clock.ticker.store(tick, atomic::Ordering::SeqCst);
        db.put_with_options(&key, value, &PutOptions { ttl }, &WriteOptions::default())
            .await
            .unwrap();
        if flush_after {
            db.flush().await.unwrap();
        }
    }

    let db_state = await_compaction(&db).await.expect("db was not compacted");
    assert!(db_state.l0_last_compacted.is_some());
    assert_eq!(db_state.compacted.len(), 1);
    assert_eq!(db_state.last_l0_clock_tick, 70);

    let ssts = &db_state.compacted.first().unwrap().ssts;
    assert_eq!(ssts.len(), 1);
    let mut iter = SstIterator::new_borrowed_initialized(
        ..,
        ssts.first().unwrap(),
        table_store.clone(),
        SstIteratorOptions::default(),
    )
    .await
    .unwrap()
    .expect("Expected Some(iter) but got None");

    // Survivors: the t=70 rewrite of key 1 (expires at 70 + 80 = 150) and the
    // never-expiring key 3.
    let expected = vec![
        RowEntry::new_value(&[1; 16], value, 4)
            .with_create_ts(70)
            .with_expire_ts(150),
        RowEntry::new_value(&[3; 16], value, 3).with_create_ts(30),
    ];
    assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_compactor_saves_latest_compaction_on_restart() {
    let object_store = Arc::new(InMemory::new());
    let (manifest_store, compactions_store, _table_store) = build_test_stores(object_store);
    let clock = Arc::new(DefaultSystemClock::new());
    StoredManifest::create_new_db(manifest_store.clone(), CoreDbState::new(), clock.clone())
        .await
        .unwrap();
    let options = Arc::new(compactor_options());
    let rand = Arc::new(DbRand::default());

    // First incarnation: seed one compaction and persist it. The handler is dropped when
    // this scope ends, which simulates a compactor shutdown.
    let (first_epoch, seeded_id) = {
        let mut first = CompactorEventHandler::new(
            manifest_store.clone(),
            compactions_store.clone(),
            options.clone(),
            Arc::new(MockScheduler::new()),
            Arc::new(MockExecutor::new()),
            rand.clone(),
            Arc::new(CompactionStats::new(Arc::new(StatRegistry::new()))),
            clock.clone(),
        )
        .await
        .unwrap();
        let epoch = first.state().manifest().value.compactor_epoch;
        let seeded_id = Ulid::new();
        let seeded = Compaction::new(
            seeded_id,
            CompactionSpec::new(vec![SourceId::Sst(Ulid::new())], 0),
        );
        first.state_mut().add_compaction(seeded).unwrap();
        first.state_writer.write_compactions_safely().await.unwrap();

        let (_, persisted) = compactions_store
            .try_read_latest_compactions()
            .await
            .unwrap()
            .expect("compactions should exist after first write");
        assert!(
            persisted.iter().next().is_some(),
            "expected stored compactions to include the seeded compaction"
        );
        (epoch, seeded_id)
    };

    // Second incarnation: no active compactions in memory, and a newer epoch.
    let mut restarted = CompactorEventHandler::new(
        manifest_store.clone(),
        compactions_store.clone(),
        options.clone(),
        Arc::new(MockScheduler::new()),
        Arc::new(MockExecutor::new()),
        rand,
        Arc::new(CompactionStats::new(Arc::new(StatRegistry::new()))),
        clock.clone(),
    )
    .await
    .unwrap();
    assert!(restarted.state().compactions().next().is_none());
    let restarted_epoch = restarted.state().manifest().value.compactor_epoch;
    assert!(
        restarted_epoch > first_epoch,
        "compactor epoch should advance on restart"
    );

    // Persisting from the restarted handler keeps exactly the seeded compaction.
    restarted.state_writer.write_compactions_safely().await.unwrap();
    let (_, retained) = compactions_store
        .try_read_latest_compactions()
        .await
        .unwrap()
        .expect("compactions should exist after clearing");
    let mut retained_iter = retained.iter();
    let first_retained = retained_iter
        .next()
        .expect("expected one retained compaction after restart");
    assert_eq!(
        first_retained.id(),
        seeded_id,
        "stored compactions should retain the latest finished compaction after restart"
    );
    assert!(
        retained_iter.next().is_none(),
        "no additional compactions should be retained beyond the latest finished one"
    );
}
#[tokio::test]
async fn test_compactor_starts_with_empty_compactions() {
    let object_store = Arc::new(InMemory::new());
    let (manifest_store, compactions_store, _table_store) = build_test_stores(object_store);
    let clock = Arc::new(DefaultSystemClock::new());
    StoredManifest::create_new_db(manifest_store.clone(), CoreDbState::new(), clock.clone())
        .await
        .unwrap();
    let options = Arc::new(compactor_options());
    let rand = Arc::new(DbRand::default());

    // First incarnation: seed and persist exactly one compaction. The handler is dropped
    // when this scope ends.
    let (first_epoch, seeded_id) = {
        let mut first = CompactorEventHandler::new(
            manifest_store.clone(),
            compactions_store.clone(),
            options.clone(),
            Arc::new(MockScheduler::new()),
            Arc::new(MockExecutor::new()),
            rand.clone(),
            Arc::new(CompactionStats::new(Arc::new(StatRegistry::new()))),
            clock.clone(),
        )
        .await
        .unwrap();
        let epoch = first.state().manifest().value.compactor_epoch;
        let seeded_id = Ulid::new();
        let seeded = Compaction::new(
            seeded_id,
            CompactionSpec::new(vec![SourceId::Sst(Ulid::new())], 0),
        );
        first.state_mut().add_compaction(seeded).unwrap();
        first.state_writer.write_compactions_safely().await.unwrap();

        let (_, persisted) = compactions_store
            .try_read_latest_compactions()
            .await
            .unwrap()
            .expect("compactions should exist after first write");
        let mut stored = persisted.iter();
        assert!(
            stored.next().is_some(),
            "expected stored compactions to include the seeded compaction"
        );
        assert!(stored.next().is_none(), "expected only one stored compaction");
        (epoch, seeded_id)
    };

    // Second incarnation: it must start with no active compactions and a newer epoch.
    let mut restarted = CompactorEventHandler::new(
        manifest_store.clone(),
        compactions_store.clone(),
        options.clone(),
        Arc::new(MockScheduler::new()),
        Arc::new(MockExecutor::new()),
        rand,
        Arc::new(CompactionStats::new(Arc::new(StatRegistry::new()))),
        clock.clone(),
    )
    .await
    .unwrap();
    assert!(restarted.state().compactions().next().is_none());
    let restarted_epoch = restarted.state().manifest().value.compactor_epoch;
    assert!(
        restarted_epoch > first_epoch,
        "compactor epoch should advance on restart"
    );

    // After a write, the handler's dirty compactions hold only the seeded one, now Finished.
    restarted.state_writer.write_compactions_safely().await.unwrap();
    let dirty = restarted.state().compactions_dirty().clone().into_value();
    let mut dirty_iter = dirty.iter();
    let retained = dirty_iter
        .next()
        .expect("expected one retained compaction after restart");
    assert_eq!(
        retained.id(),
        seeded_id,
        "stored compactions should retain the latest finished compaction after restart"
    );
    assert_eq!(
        retained.status(),
        CompactionStatus::Finished,
        "retained compaction should be marked as finished"
    );
    assert!(
        dirty_iter.next().is_none(),
        "no additional compactions should be retained beyond the latest finished one"
    );
}
#[tokio::test]
async fn test_should_track_total_bytes_and_throughput() {
use crate::compactor::stats::{
TOTAL_BYTES_BEING_COMPACTED, TOTAL_THROUGHPUT_BYTES_PER_SEC,
};
use chrono::DateTime;
let mut fixture = CompactorEventHandlerTestFixture::new().await;
let current_time = fixture.handler.system_clock.now();
let current_time_ms = current_time.timestamp_millis() as u64;
let start_time_1 =
DateTime::from_timestamp_millis((current_time_ms - 2000) as i64).unwrap();
let start_time_2 =
DateTime::from_timestamp_millis((current_time_ms - 1000) as i64).unwrap();
let mut compaction_1 = Compaction::new(
Ulid::from_parts(start_time_1.timestamp_millis() as u64, 0),
CompactionSpec::new(vec![], 10),
);
compaction_1.set_bytes_processed(500);
let mut compaction_2 = Compaction::new(
Ulid::from_parts(start_time_2.timestamp_millis() as u64, 0),
CompactionSpec::new(vec![], 11),
);
compaction_2.set_bytes_processed(1000);
fixture
.handler
.state_mut()
.add_compaction(compaction_1)
.expect("failed to add compaction 1");
fixture
.handler
.state_mut()
.add_compaction(compaction_2)
.expect("failed to add compaction 2");
fixture.handler.handle_log_ticker();
let total_bytes = fixture
.stats_registry
.lookup(TOTAL_BYTES_BEING_COMPACTED)
.unwrap()
.get();
assert_eq!(total_bytes, 0);
let throughput = fixture
.stats_registry
.lookup(TOTAL_THROUGHPUT_BYTES_PER_SEC)
.unwrap()
.get();
assert!(
throughput > 0,
"Expected throughput > 0, got {}",
throughput
);
}
#[tokio::test]
async fn test_should_track_per_job_throughput() {
    // NOTE(review): this only checks the bytes / elapsed-seconds arithmetic locally; it does
    // not call any production throughput code.
    const START_MS: u64 = 1000;
    const NOW_MS: u64 = 3000;
    const PROCESSED: u64 = 1000;

    let mut compaction = Compaction::new(
        Ulid::from_parts(START_MS, 0),
        CompactionSpec::new(vec![], 10),
    );
    compaction.set_bytes_processed(PROCESSED);

    // 1000 bytes over 2 seconds.
    let running_secs = (NOW_MS as f64 - START_MS as f64) / 1000.0;
    assert_eq!(PROCESSED as f64 / running_secs, 500.0);

    // A job with zero elapsed time reports zero throughput rather than dividing by zero.
    let instant_secs = (START_MS as f64 - START_MS as f64) / 1000.0;
    let instant_throughput = match instant_secs > 0.0 {
        true => PROCESSED as f64 / instant_secs,
        false => 0.0,
    };
    assert_eq!(instant_throughput, 0.0);
}
#[tokio::test]
async fn test_should_track_running_compactions_count() {
    use crate::compactor::stats::RUNNING_COMPACTIONS;

    let mut fixture = CompactorEventHandlerTestFixture::new().await;

    // A fresh handler reports no running compactions.
    let initially_running = fixture
        .stats_registry
        .lookup(RUNNING_COMPACTIONS)
        .unwrap()
        .get();
    assert_eq!(initially_running, 0);

    // Registering one compaction is reflected in the handler state.
    // NOTE(review): the RUNNING_COMPACTIONS gauge itself is not re-checked afterwards.
    fixture
        .handler
        .state_mut()
        .add_compaction(Compaction::new(Ulid::new(), CompactionSpec::new(vec![], 10)))
        .expect("failed to add compaction");
    let tracked = fixture.handler.state().compactions().count();
    assert_eq!(tracked, 1);
}
/// Test harness around a `CompactorEventHandler`.
///
/// The handler is wired to a mock scheduler and a mock executor. The fixture also holds a
/// real `Db` and a real `TokioCompactionExecutor`, all built over one in-memory object
/// store.
///
/// NOTE: struct fields are dropped in declaration order, so `handler` is dropped last.
struct CompactorEventHandlerTestFixture {
/// Separately loaded handle on the stored manifest; refreshed to observe the latest DB state.
manifest: StoredManifest,
/// Manifest store over the shared in-memory object store (also handed to the handler).
manifest_store: Arc<ManifestStore>,
/// Compactions store over the shared in-memory object store (also handed to the handler).
compactions_store: Arc<CompactionsStore>,
/// Settings used to build `db`; `l0_sst_size_bytes` sizes the keys written by `write_l0`.
options: Settings,
/// Real database used to produce L0 SSTs.
db: Db,
/// Mock scheduler given to the handler; tests inject compaction specs through it.
scheduler: Arc<MockScheduler>,
/// Mock executor given to the handler; records started jobs (drained via `pop_jobs`).
executor: Arc<MockExecutor>,
/// Real executor that jobs captured by the mock can be forwarded to.
real_executor: Arc<dyn CompactionExecutor>,
/// Receives the `CompactorMessage`s sent by `real_executor`.
real_executor_rx: tokio::sync::mpsc::UnboundedReceiver<CompactorMessage>,
/// Registry backing the handler's `CompactionStats`.
stats_registry: Arc<StatRegistry>,
/// The event handler under test.
handler: CompactorEventHandler,
}
impl CompactorEventHandlerTestFixture {
async fn new() -> Self {
let compactor_options = Arc::new(compactor_options());
let options = db_options(None);
let os = Arc::new(InMemory::new());
let (manifest_store, compactions_store, table_store) = build_test_stores(os.clone());
let db = Db::builder(PATH, os.clone())
.with_settings(options.clone())
.build()
.await
.unwrap();
let scheduler = Arc::new(MockScheduler::new());
let executor = Arc::new(MockExecutor::new());
let (real_executor_tx, real_executor_rx) = tokio::sync::mpsc::unbounded_channel();
let rand = Arc::new(DbRand::default());
let stats_registry = Arc::new(StatRegistry::new());
let compactor_stats = Arc::new(CompactionStats::new(stats_registry.clone()));
let real_executor = Arc::new(TokioCompactionExecutor::new(
Handle::current(),
compactor_options.clone(),
real_executor_tx,
table_store,
rand.clone(),
compactor_stats.clone(),
Arc::new(DefaultSystemClock::new()),
manifest_store.clone(),
options.merge_operator.clone(),
));
let handler = CompactorEventHandler::new(
manifest_store.clone(),
compactions_store.clone(),
compactor_options.clone(),
scheduler.clone(),
executor.clone(),
rand.clone(),
compactor_stats.clone(),
Arc::new(DefaultSystemClock::new()),
)
.await
.unwrap();
let manifest =
StoredManifest::load(manifest_store.clone(), Arc::new(DefaultSystemClock::new()))
.await
.unwrap();
Self {
manifest,
manifest_store,
compactions_store,
options,
db,
scheduler,
executor,
real_executor_rx,
real_executor,
stats_registry,
handler,
}
}
async fn latest_db_state(&mut self) -> CoreDbState {
self.manifest.refresh().await.unwrap().core.clone()
}
async fn write_l0(&mut self) {
let mut rng = rng::new_test_rng(None);
let manifest = self.manifest.refresh().await.unwrap();
let l0s = manifest.core.l0.len();
let mut k = vec![0u8; self.options.l0_sst_size_bytes];
rng.fill_bytes(&mut k);
self.db.put(&k, &[b'x'; 10]).await.unwrap();
self.db.flush().await.unwrap();
loop {
let manifest = self.manifest.refresh().await.unwrap().clone();
if manifest.core.l0.len() > l0s {
break;
}
}
}
async fn build_l0_compaction(&mut self) -> CompactionSpec {
let db_state = self.latest_db_state().await;
let l0_ids_to_compact: Vec<SourceId> = db_state
.l0
.iter()
.map(|h| SourceId::Sst(h.id.unwrap_compacted_id()))
.collect();
CompactionSpec::new(l0_ids_to_compact, 0)
}
fn assert_started_compaction(&self, num: usize) -> Vec<StartCompactionJobArgs> {
let attempts = self.executor.pop_jobs();
assert_eq!(num, attempts.len());
attempts
}
fn assert_and_forward_compactions(&self, num: usize) {
for c in self.assert_started_compaction(num) {
self.real_executor.start_compaction_job(c)
}
}
}
#[tokio::test]
async fn test_should_record_last_compaction_ts() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);

    // Wait for the real executor to report completion. It may emit any number of other
    // messages (e.g. progress) first, so skip until the finish message arrives.
    let msg = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match fixture.real_executor_rx.recv().await {
                Some(m @ CompactorMessage::CompactionJobFinished { .. }) => break m,
                Some(_) => {}
                None => panic!("channel closed before receiving CompactionJobFinished"),
            }
        }
    })
    .await
    .expect("timeout waiting for CompactionJobFinished");

    let starting_last_ts = fixture
        .stats_registry
        .lookup(LAST_COMPACTION_TS_SEC)
        .unwrap()
        .get();
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");
    let ending_last_ts = fixture
        .stats_registry
        .lookup(LAST_COMPACTION_TS_SEC)
        .unwrap()
        .get();
    assert!(ending_last_ts > starting_last_ts);
}
#[tokio::test]
async fn test_should_write_manifest_safely() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);

    // Wait for the real executor to report completion. It may emit any number of other
    // messages (e.g. progress) first, so skip until the finish message arrives.
    let msg = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match fixture.real_executor_rx.recv().await {
                Some(m @ CompactorMessage::CompactionJobFinished { .. }) => break m,
                Some(_) => {}
                None => panic!("channel closed before receiving CompactionJobFinished"),
            }
        }
    })
    .await
    .expect("timeout waiting for CompactionJobFinished");

    // Land a new L0 after the compaction finished but before the handler applies it. The
    // handler must merge its result into the newer manifest instead of overwriting it.
    fixture.write_l0().await;
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");

    let db_state = fixture.latest_db_state().await;
    assert_eq!(db_state.l0.len(), 1);
    assert_eq!(db_state.compacted.len(), 1);
    let l0_id = db_state.l0.front().unwrap().id.unwrap_compacted_id();
    let compacted_l0s: Vec<Ulid> = db_state
        .compacted
        .first()
        .unwrap()
        .ssts
        .iter()
        .map(|sst| sst.id.unwrap_compacted_id())
        .collect();
    assert!(!compacted_l0s.contains(&l0_id));
    assert_eq!(
        db_state.l0_last_compacted.unwrap(),
        compaction
            .sources()
            .first()
            .and_then(|id| id.maybe_unwrap_sst())
            .unwrap()
    );
}
#[tokio::test]
async fn test_should_clear_compaction_on_failure_and_retry() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let spec = fixture.build_l0_compaction().await;

    // First attempt: schedule the spec, then report the started job as failed.
    fixture.scheduler.inject_compaction(spec.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let failed_job = fixture.assert_started_compaction(1).pop().unwrap();
    let failure = CompactorMessage::CompactionJobFinished {
        id: failed_job.id,
        result: Err(SlateDBError::InvalidDBState),
    };
    fixture
        .handler
        .handle(failure)
        .await
        .expect("fatal error handling compaction message");

    // Second attempt: the same spec must start again, because the failed one was cleared.
    fixture.scheduler.inject_compaction(spec.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_started_compaction(1);
}
#[tokio::test]
async fn test_should_persist_compactions_on_start_and_finish() {
let mut fixture = CompactorEventHandlerTestFixture::new().await;
fixture.write_l0().await;
let compaction = fixture.build_l0_compaction().await;
fixture.scheduler.inject_compaction(compaction.clone());
fixture.handler.handle_ticker().await.unwrap();
let (_, stored_compactions) = fixture
.compactions_store
.read_latest_compactions()
.await
.unwrap();
assert_eq!(
stored_compactions
.iter()
.collect::<Vec<&Compaction>>()
.len(),
1
);
let running_id = stored_compactions
.iter()
.next()
.expect("compaction should be persisted")
.id();
let state_id = fixture
.handler
.state()
.compactions()
.next()
.expect("state missing compaction")
.id();
assert_eq!(running_id, state_id);
fixture.assert_and_forward_compactions(1);
let msg = tokio::time::timeout(Duration::from_millis(10), async {
match fixture.real_executor_rx.recv().await {
Some(m @ CompactorMessage::CompactionJobFinished { .. }) => m,
Some(_) => fixture
.real_executor_rx
.recv()
.await
.expect("channel closed before CompactionJobAttemptFinished"),
None => panic!("channel closed before receiving any message"),
}
})
.await
.expect("timeout waiting for CompactionJobAttemptFinished");
fixture.handler.handle(msg).await.unwrap();
let (_, stored_compactions) = fixture
.compactions_store
.read_latest_compactions()
.await
.unwrap();
let mut stored_compactions_iter = stored_compactions.iter();
assert_eq!(
stored_compactions_iter
.next()
.expect("compactions should not be empty after finish")
.id(),
running_id,
);
assert!(
stored_compactions_iter.next().is_none(),
"expected only one retained finished compaction for GC"
);
}
#[tokio::test]
async fn test_should_fail_when_compactions_store_fences_compactor() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let spec = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(spec.clone());

    // Initialize a second fenceable writer over the same compactions store. It is
    // expected to fence the handler's own writer.
    let competing = StoredCompactions::load(fixture.compactions_store.clone())
        .await
        .unwrap();
    let update_timeout = fixture.handler.options.manifest_update_timeout;
    let clock = fixture.handler.system_clock.clone();
    FenceableCompactions::init(competing, update_timeout, clock)
        .await
        .unwrap();

    // The handler's next tick must then fail as fenced.
    let outcome = fixture.handler.handle_ticker().await;
    assert!(matches!(outcome, Err(SlateDBError::Fenced)));
}
#[tokio::test]
async fn test_should_update_failed_compaction_status() {
let mut fixture = CompactorEventHandlerTestFixture::new().await;
fixture.write_l0().await;
let compaction = fixture.build_l0_compaction().await;
fixture.scheduler.inject_compaction(compaction.clone());
fixture.handler.handle_ticker().await.unwrap();
let job = fixture.assert_started_compaction(1).pop().unwrap();
let (_, stored) = fixture
.compactions_store
.read_latest_compactions()
.await
.unwrap();
assert_eq!(stored.iter().collect::<Vec<&Compaction>>().len(), 1);
let msg = CompactorMessage::CompactionJobFinished {
id: job.id,
result: Err(SlateDBError::InvalidDBState),
};
fixture.handler.handle(msg).await.unwrap();
let (_, stored_after) = fixture
.compactions_store
.read_latest_compactions()
.await
.unwrap();
assert_eq!(
stored_after
.iter()
.next()
.expect("compactions should be empty after failure")
.status(),
CompactionStatus::Submitted,
"compactions should be removed after failure"
);
}
#[tokio::test]
async fn test_should_error_when_finishing_if_compactions_fenced() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let spec = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(spec.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let started = fixture.assert_started_compaction(1).pop().unwrap();

    // A second fenceable writer over the compactions store appears while the job runs.
    let competing = StoredCompactions::load(fixture.compactions_store.clone())
        .await
        .unwrap();
    let update_timeout = fixture.handler.options.manifest_update_timeout;
    let clock = fixture.handler.system_clock.clone();
    FenceableCompactions::init(competing, update_timeout, clock)
        .await
        .unwrap();

    // Report a successful result whose output SR simply reuses the current L0s.
    let db_state = fixture.latest_db_state().await;
    let finished = CompactorMessage::CompactionJobFinished {
        id: started.id,
        result: Ok(SortedRun {
            id: spec.destination(),
            ssts: db_state.l0.iter().cloned().collect(),
        }),
    };

    // Applying the result must fail because the handler has been fenced.
    let outcome = fixture.handler.handle(finished).await;
    assert!(matches!(outcome, Err(SlateDBError::Fenced)));
}
#[tokio::test]
async fn test_should_not_schedule_conflicting_compaction() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let spec = fixture.build_l0_compaction().await;

    // The first submission starts a job.
    fixture.scheduler.inject_compaction(spec.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_started_compaction(1);

    // Re-submitting the same sources while that job is still running starts nothing.
    fixture.write_l0().await;
    fixture.scheduler.inject_compaction(spec.clone());
    fixture.handler.handle_ticker().await.unwrap();
    let newly_started = fixture.executor.pop_jobs();
    assert!(newly_started.is_empty());
}
#[tokio::test]
async fn test_should_leave_checkpoint_when_removing_ssts_after_compaction() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let compaction = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(compaction.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);

    // Wait for the real executor to report completion. It may emit any number of other
    // messages (e.g. progress) first, so skip until the finish message arrives.
    let msg = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match fixture.real_executor_rx.recv().await {
                Some(m @ CompactorMessage::CompactionJobFinished { .. }) => break m,
                Some(_) => {}
                None => panic!("channel closed before receiving CompactionJobFinished"),
            }
        }
    })
    .await
    .expect("timeout waiting for CompactionJobFinished");
    fixture
        .handler
        .handle(msg)
        .await
        .expect("fatal error handling compaction message");

    // The newest checkpoint points at a manifest whose L0s are exactly the compacted sources.
    let current_dbstate = fixture.latest_db_state().await;
    let checkpoint = current_dbstate.checkpoints.last().unwrap();
    let old_manifest = fixture
        .manifest_store
        .read_manifest(checkpoint.manifest_id)
        .await
        .unwrap();
    let l0_ids: Vec<SourceId> = old_manifest
        .core
        .l0
        .iter()
        .map(|sst| SourceId::Sst(sst.id.unwrap_compacted_id()))
        .collect();
    assert_eq!(&l0_ids, compaction.sources());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[cfg(feature = "zstd")]
async fn test_compactor_compressed_block_size() {
    // Compaction works with zstd compression and a small (128-byte) SST block size.
    use crate::compactor::stats::BYTES_COMPACTED;
    use crate::config::{CompressionCodec, SstBlockSize};

    let object_store = Arc::new(InMemory::new());
    let clock = Arc::new(TestClock::new());
    let supplier = Arc::new(SizeTieredCompactionSchedulerSupplier::new(
        SizeTieredCompactionSchedulerOptions {
            min_compaction_sources: 1,
            max_compaction_sources: 999,
            include_size_threshold: 4.0,
        },
    ));
    let mut settings = db_options(Some(compactor_options()));
    settings.l0_sst_size_bytes = 128;
    settings.compression_codec = Some(CompressionCodec::Zstd);
    let db = Db::builder(PATH, object_store.clone())
        .with_settings(settings)
        .with_logical_clock(clock)
        .with_compaction_scheduler_supplier(supplier)
        .with_sst_block_size(SstBlockSize::Other(128))
        .build()
        .await
        .unwrap();

    // Two interleaved key families. For each i, write (a+i) -> (b+i), then (j+i) -> (k+i).
    for i in 0u8..4 {
        for (key_byte, value_byte) in [(b'a' + i, b'b' + i), (b'j' + i, b'k' + i)] {
            db.put(&vec![key_byte; 16], &vec![value_byte; 48])
                .await
                .unwrap();
        }
    }
    db.flush().await.unwrap();
    await_compaction(&db).await.expect("db was not compacted");

    let bytes_compacted = db.metrics().lookup(BYTES_COMPACTED).unwrap().get();
    assert!(bytes_compacted > 0, "bytes_compacted: {}", bytes_compacted);
}
#[tokio::test]
async fn test_validate_compaction_empty_sources_rejected() {
    // A spec with no sources is invalid regardless of the DB state.
    let fixture = CompactorEventHandlerTestFixture::new().await;
    let no_sources = CompactionSpec::new(Vec::new(), 0);
    let validation = fixture.handler.validate_compaction(&no_sources);
    assert!(matches!(validation, Err(SlateDBError::InvalidCompaction)));
}
#[tokio::test]
async fn test_validate_compaction_l0_only_ok_when_no_sr() {
    // With no sorted runs yet, an L0-only spec targeting destination 0 validates.
    let mut fixture = CompactorEventHandlerTestFixture::new().await;
    fixture.write_l0().await;
    let l0_only = fixture.build_l0_compaction().await;
    let validation = fixture.handler.validate_compaction(&l0_only);
    validation.unwrap();
}
#[tokio::test]
async fn test_validate_compaction_l0_only_rejects_when_dest_below_highest_sr() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;

    // Run a real L0 compaction into destination 0 so that a sorted run exists.
    fixture.write_l0().await;
    let c1 = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(c1.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);

    // Wait for the real executor to report completion. It may emit any number of other
    // messages (e.g. progress) first, so skip until the finish message arrives.
    let msg = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match fixture.real_executor_rx.recv().await {
                Some(m @ CompactorMessage::CompactionJobFinished { .. }) => break m,
                Some(_) => {}
                None => panic!("channel closed before receiving CompactionJobFinished"),
            }
        }
    })
    .await
    .expect("timeout waiting for CompactionJobFinished");
    fixture.handler.handle(msg).await.unwrap();

    // A new L0-only spec again targets destination 0, which does not lie above the
    // sorted run just produced, so validation must reject it.
    fixture.write_l0().await;
    let c2 = fixture.build_l0_compaction().await;
    let err = fixture.handler.validate_compaction(&c2).unwrap_err();
    assert!(matches!(err, SlateDBError::InvalidCompaction));
}
#[tokio::test]
async fn test_validate_compaction_mixed_l0_and_sr_deferred_to_scheduler() {
    let mut fixture = CompactorEventHandlerTestFixture::new().await;

    // Run a real L0 compaction so that a sorted run exists.
    fixture.write_l0().await;
    let c1 = fixture.build_l0_compaction().await;
    fixture.scheduler.inject_compaction(c1.clone());
    fixture.handler.handle_ticker().await.unwrap();
    fixture.assert_and_forward_compactions(1);

    // Wait for the real executor to report completion. It may emit any number of other
    // messages (e.g. progress) first, so skip until the finish message arrives.
    let msg = tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match fixture.real_executor_rx.recv().await {
                Some(m @ CompactorMessage::CompactionJobFinished { .. }) => break m,
                Some(_) => {}
                None => panic!("channel closed before receiving CompactionJobFinished"),
            }
        }
    })
    .await
    .expect("timeout waiting for CompactionJobFinished");
    fixture.handler.handle(msg).await.unwrap();

    // Build a spec mixing the existing sorted run with a fresh L0. The handler accepts it;
    // per the test name, judging such specs is left to the scheduler.
    fixture.write_l0().await;
    let state = fixture.latest_db_state().await;
    let sr_id = state.compacted.first().unwrap().id;
    let l0_ulid = state.l0.front().unwrap().id.unwrap_compacted_id();
    let mixed = CompactionSpec::new(
        vec![SourceId::SortedRun(sr_id), SourceId::Sst(l0_ulid)],
        sr_id,
    );
    fixture.handler.validate_compaction(&mixed).unwrap();
}
async fn run_for<T, F>(duration: Duration, f: impl Fn() -> F) -> Option<T>
where
F: Future<Output = Option<T>>,
{
#[allow(clippy::disallowed_methods)]
let now = SystemTime::now();
while now.elapsed().unwrap() < duration {
let maybe_result = f().await;
if maybe_result.is_some() {
return maybe_result;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
None
}
/// Builds the manifest, compactions, and table stores used by these tests.
///
/// All three stores are rooted at `PATH` on the given object store. The table store uses an
/// SST format with `block_size` 32 and `min_filter_keys` 10; every other setting is default.
fn build_test_stores(
    os: Arc<dyn ObjectStore>,
) -> (Arc<ManifestStore>, Arc<CompactionsStore>, Arc<TableStore>) {
    let root = Path::from(PATH);
    let format = SsTableFormat {
        block_size: 32,
        min_filter_keys: 10,
        ..SsTableFormat::default()
    };
    let manifests = Arc::new(ManifestStore::new(&root, Arc::clone(&os)));
    let compactions = Arc::new(CompactionsStore::new(&root, Arc::clone(&os)));
    let tables = Arc::new(TableStore::new(
        ObjectStores::new(Arc::clone(&os), None),
        format,
        root,
        None,
    ));
    (manifests, compactions, tables)
}
/// Polls `db` for up to 10 seconds until all of the following hold, then returns that state:
/// - the WAL buffer is empty;
/// - the mutable memtable and all immutable memtables are empty;
/// - L0 holds no SSTs;
/// - at least one compacted sorted run exists.
///
/// The returned value is a clone of the core state. Returns `None` if the conditions are never
/// met in time.
async fn await_compaction(db: &Db) -> Option<CoreDbState> {
    run_for(Duration::from_secs(10), || async {
        // Take a snapshot under one read guard; the guard is released at the end of this block.
        let (wal_drained, memtables_drained, core) = {
            let guard = db.inner.state.read();
            let cow = guard.state();
            (
                db.inner.wal_buffer.is_empty(),
                guard.memtable().is_empty() && cow.imm_memtable.is_empty(),
                cow.core().clone(),
            )
        };
        let drained = wal_drained && memtables_drained && core.l0.is_empty();
        let compacted = !core.compacted.is_empty();
        (drained && compacted).then_some(core)
    })
    .await
}
/// Polls the stored manifest for up to 10 seconds until its compacted sorted runs differ from
/// `old_compacted`.
///
/// Returns the new core state, or `None` if the compacted runs never changed in time.
// `allow(unused)`: this helper may not be referenced in every build/test configuration.
#[allow(unused)]
async fn await_compacted_compaction(
    manifest_store: Arc<ManifestStore>,
    old_compacted: Vec<SortedRun>,
) -> Option<CoreDbState> {
    run_for(Duration::from_secs(10), || async {
        let current = get_db_state(Arc::clone(&manifest_store)).await;
        (current.compacted != old_compacted).then_some(current)
    })
    .await
}
/// Loads the current manifest from `manifest_store` and returns a copy of its core DB state.
///
/// # Panics
/// Panics if the manifest cannot be loaded.
async fn get_db_state(manifest_store: Arc<ManifestStore>) -> CoreDbState {
    // `manifest_store` is already owned, so hand it over directly instead of cloning the Arc.
    let stored_manifest =
        StoredManifest::load(manifest_store, Arc::new(DefaultSystemClock::new()))
            .await
            .expect("failed to load stored manifest");
    stored_manifest.db_state().clone()
}
/// Builds the DB `Settings` used by these tests. Everything not listed here is default:
/// - flush interval and manifest poll interval of 100ms;
/// - a 300s manifest update timeout;
/// - `l0_sst_size_bytes` of 256 and `l0_max_ssts` of 8;
/// - the supplied `compactor_options`.
fn db_options(compactor_options: Option<CompactorOptions>) -> Settings {
    let interval = Duration::from_millis(100);
    Settings {
        compactor_options,
        flush_interval: Some(interval),
        manifest_poll_interval: interval,
        manifest_update_timeout: Duration::from_secs(300),
        l0_sst_size_bytes: 256,
        l0_max_ssts: 8,
        // This field only exists when the `wal_disable` feature is compiled in; set it to true.
        #[cfg(feature = "wal_disable")]
        wal_enabled: true,
        ..Settings::default()
    }
}
/// Compactor options for tests: a 100ms poll interval and `max_concurrent_compactions` of 1,
/// with everything else default.
fn compactor_options() -> CompactorOptions {
    let poll_interval = Duration::from_millis(100);
    CompactorOptions {
        max_concurrent_compactions: 1,
        poll_interval,
        ..CompactorOptions::default()
    }
}
/// Mutable state guarded by `MockScheduler`'s lock.
struct MockSchedulerInner {
    /// Specs queued by `inject_compaction`; emptied by the next `maybe_schedule_compaction` call.
compaction: Vec<CompactionSpec>,
}
/// Test double for `CompactionScheduler`: it does not derive anything from the compactor
/// state and simply returns whatever specs the test injected.
/// Clones share the same queue through the `Arc`.
#[derive(Clone)]
struct MockScheduler {
inner: Arc<Mutex<MockSchedulerInner>>,
}
impl MockScheduler {
fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(MockSchedulerInner { compaction: vec![] })),
}
}
fn inject_compaction(&self, compaction: CompactionSpec) {
let mut inner = self.inner.lock();
inner.compaction.push(compaction);
}
}
impl CompactionScheduler for MockScheduler {
    /// Returns every spec injected since the previous call and leaves the queue empty.
    /// The compactor state is ignored.
    fn maybe_schedule_compaction(&self, _state: &CompactorState) -> Vec<CompactionSpec> {
        std::mem::take(&mut self.inner.lock().compaction)
    }
}
/// Mutable state guarded by `MockExecutor`'s lock.
struct MockExecutorInner {
    /// Jobs passed to `start_compaction_job`, in call order; emptied by `pop_jobs`.
jobs: Vec<StartCompactionJobArgs>,
}
/// Test double for `CompactionExecutor` that executes nothing. It only records the jobs it is
/// asked to start so a test can inspect them. Clones share the same job list through the `Arc`.
#[derive(Clone)]
struct MockExecutor {
inner: Arc<Mutex<MockExecutorInner>>,
}
impl MockExecutor {
    /// Creates an executor with no recorded jobs.
    fn new() -> Self {
        let inner = Arc::new(Mutex::new(MockExecutorInner { jobs: Vec::new() }));
        Self { inner }
    }

    /// Returns all jobs recorded so far, in submission order, and clears the record.
    fn pop_jobs(&self) -> Vec<StartCompactionJobArgs> {
        std::mem::take(&mut self.inner.lock().jobs)
    }
}
impl CompactionExecutor for MockExecutor {
    /// Records the job instead of running it.
    fn start_compaction_job(&self, compaction: StartCompactionJobArgs) {
        self.inner.lock().jobs.push(compaction);
    }

    /// No-op: this mock starts no background work.
    fn stop(&self) {}

    /// Always reports the executor as running.
    fn is_stopped(&self) -> bool {
        false
    }
}
}