use crate::compaction::CompactionConfig;
use crate::data_file::{DataFile, DataFileType};
use crate::db_status::DbLifecycle;
use crate::error::Result;
use crate::file::{FileManager, ReadAheadBufferedReader, TrackedFileId};
use crate::format::{FileBuilder, FileBuilderFactory};
use crate::iterator::{
BucketFilterIterator, DeduplicatingIterator, KvIterator, MergingIterator,
SchemaEvolvingIterator, SortedRun, VlogSeqOffsetIterator,
};
use crate::lsm::{LevelEdit, VersionEdit};
use crate::schema::SchemaManager;
use crate::sst::{SSTIteratorMetrics, SSTIteratorOptions};
use crate::vlog::{VlogEdit, VlogMergeCollector};
use log::trace;
use metrics::{Counter, counter};
use std::sync::Arc;
use tokio::runtime::Runtime;
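/// Callback invoked once a compaction finishes successfully, receiving the LSM
/// tree index, the resulting `VersionEdit`, and an optional `VlogEdit`.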
type CompactionCompleteCallback = Arc<dyn Fn(usize, VersionEdit, Option<VlogEdit>) + Send + Sync>;
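/// A single unit of compaction work: the input sorted runs to merge, the level
/// the merged output should land on, and the readers, builders, and schema
/// information needed to produce the output data files.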
pub struct CompactionTask {
lsm_tree_idx: usize,
metrics: Arc<CompactionTaskMetrics>,
sst_metrics: Arc<SSTIteratorMetrics>,
sorted_runs: Vec<SortedRun>,
output_level: u8,
file_manager: Arc<FileManager>,
file_builder_factory: Arc<FileBuilderFactory>,
data_file_type: DataFileType,
ttl_provider: Arc<crate::ttl::TTLProvider>,
output_files_readonly: bool,
schema_manager: Arc<SchemaManager>,
}
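/// Per-database counters tracking bytes read from compaction inputs and bytes
/// written to compaction outputs.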
#[derive(Clone)]
pub(crate) struct CompactionTaskMetrics {
read_bytes_total: Counter,
write_bytes_total: Counter,
}
impl CompactionTaskMetrics {
pub(crate) fn new(db_id: &str) -> Self {
let db_id = db_id.to_string();
Self {
read_bytes_total: counter!("compaction_read_bytes_total", "db_id" => db_id.clone()),
write_bytes_total: counter!("compaction_write_bytes_total", "db_id" => db_id),
}
}
}
impl CompactionTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
metrics: Arc<CompactionTaskMetrics>,
sst_metrics: Arc<SSTIteratorMetrics>,
lsm_tree_idx: usize,
sorted_runs: Vec<SortedRun>,
output_level: u8,
file_manager: Arc<FileManager>,
file_builder_factory: Arc<FileBuilderFactory>,
data_file_type: DataFileType,
ttl_provider: Arc<crate::ttl::TTLProvider>,
schema_manager: Arc<SchemaManager>,
) -> Self {
Self {
lsm_tree_idx,
metrics,
sst_metrics,
sorted_runs,
output_level,
file_manager,
file_builder_factory,
data_file_type,
ttl_provider,
output_files_readonly: false,
schema_manager,
}
}
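/// Marks this task so its output files are flagged read-only in the file
/// manager once compaction completes.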
pub fn with_readonly_outputs(mut self) -> Self {
self.output_files_readonly = true;
self
}
pub fn sorted_runs(&self) -> &[SortedRun] {
&self.sorted_runs
}
pub fn output_level(&self) -> u8 {
self.output_level
}
pub fn ttl_provider(&self) -> Arc<crate::ttl::TTLProvider> {
Arc::clone(&self.ttl_provider)
}
}
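/// The outcome of a compaction: the newly written data files together with the
/// version edit (and optional vlog edit) that install them.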
pub struct CompactionResult {
lsm_tree_idx: usize,
new_files: Vec<Arc<DataFile>>,
edit: VersionEdit,
vlog_edit: Option<VlogEdit>,
}
impl CompactionResult {
pub fn new(
lsm_tree_idx: usize,
new_files: Vec<Arc<DataFile>>,
edit: VersionEdit,
vlog_edit: Option<VlogEdit>,
) -> Self {
Self {
lsm_tree_idx,
new_files,
edit,
vlog_edit,
}
}
pub fn new_files(&self) -> &[Arc<DataFile>] {
&self.new_files
}
pub fn edit(&self) -> &VersionEdit {
&self.edit
}
pub fn vlog_edit(&self) -> Option<&VlogEdit> {
self.vlog_edit.as_ref()
}
}
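/// Runs `CompactionTask`s, either on a dedicated Tokio runtime via `execute`
/// or synchronously on the calling thread via `execute_blocking`.
///
/// A minimal usage sketch, assuming a `task` built the same way as in the
/// tests below:
///
/// ```ignore
/// let executor = CompactionExecutor::with_defaults(Arc::new(DbLifecycle::new_open()))?;
/// let result = executor.execute_blocking(task, None)?;
/// for file in result.new_files() {
///     // Each entry is a newly written data file destined for the output level.
/// }
/// ```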
pub struct CompactionExecutor {
runtime: Option<Arc<Runtime>>,
options: CompactionConfig,
db_lifecycle: Arc<DbLifecycle>,
}
impl CompactionExecutor {
pub fn new(options: CompactionConfig, db_lifecycle: Arc<DbLifecycle>) -> Result<Self> {
Self::new_with_runtime(
options,
Arc::new(
tokio::runtime::Builder::new_multi_thread()
.thread_name("cobble-compaction")
.worker_threads(options.max_threads.max(1))
.enable_all()
.build()
.map_err(|e| crate::error::Error::IoError(e.to_string()))?,
),
db_lifecycle,
)
}
pub fn new_with_runtime(
options: CompactionConfig,
runtime: Arc<Runtime>,
db_lifecycle: Arc<DbLifecycle>,
) -> Result<Self> {
Ok(Self {
runtime: Some(runtime),
options,
db_lifecycle,
})
}
pub fn block_on_handle<T>(
&self,
handle: tokio::task::JoinHandle<T>,
) -> std::result::Result<T, tokio::task::JoinError> {
let runtime = self.runtime.as_ref().expect("Executor has no runtime.");
runtime.block_on(handle)
}
pub fn new_without_runtime(options: CompactionConfig, db_lifecycle: Arc<DbLifecycle>) -> Self {
Self {
runtime: None,
options,
db_lifecycle,
}
}
pub fn with_defaults(db_lifecycle: Arc<DbLifecycle>) -> Result<Self> {
Self::new(CompactionConfig::default(), db_lifecycle)
}
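/// Spawns the compaction on the runtime's blocking pool. On success the
/// optional callback receives the resulting edits; on failure the error is
/// recorded on the database lifecycle before being returned.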
pub fn execute(
&self,
task: CompactionTask,
on_complete: Option<CompactionCompleteCallback>,
) -> tokio::task::JoinHandle<Result<CompactionResult>> {
let runtime = self.runtime.as_ref().expect("Executor has no runtime.");
let options = self.options;
let db_lifecycle = Arc::clone(&self.db_lifecycle);
runtime.spawn_blocking(move || {
let result = Self::run_compaction(task, options);
match result {
Ok(result) => {
if let Some(callback) = on_complete {
callback(
result.lsm_tree_idx,
result.edit.clone(),
result.vlog_edit.clone(),
);
}
Ok(result)
}
Err(err) => {
db_lifecycle.mark_error(err.clone());
Err(err)
}
}
})
}
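/// Runs the compaction synchronously on the calling thread, invoking the
/// optional callback on success and recording failures on the database
/// lifecycle, mirroring `execute`.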
pub fn execute_blocking(
&self,
task: CompactionTask,
on_complete: Option<CompactionCompleteCallback>,
) -> Result<CompactionResult> {
let db_lifecycle = Arc::clone(&self.db_lifecycle);
let result = Self::run_compaction(task, self.options);
let result = match result {
Ok(result) => result,
Err(err) => {
db_lifecycle.mark_error(err.clone());
return Err(err);
}
};
if let Some(callback) = on_complete {
callback(
result.lsm_tree_idx,
result.edit.clone(),
result.vlog_edit.clone(),
);
}
Ok(result)
}
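/// Shuts down the owned runtime if this executor is its sole owner, waiting up
/// to five seconds for in-flight work to drain.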
pub fn shutdown(&mut self) {
if let Some(runtime) = self.runtime.take()
&& let Ok(runtime) = Arc::try_unwrap(runtime)
{
runtime.shutdown_timeout(std::time::Duration::from_secs(5));
}
}
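/// Core compaction pipeline: merges the input sorted runs into one sorted
/// stream, evolves values written under older schemas, deduplicates shadowed
/// versions, writes the survivors into output files split at the target file
/// size, and assembles the resulting version and vlog edits.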
fn run_compaction(task: CompactionTask, options: CompactionConfig) -> Result<CompactionResult> {
let mut all_iters: Vec<Box<dyn for<'a> KvIterator<'a>>> = Vec::new();
let mut read_bytes = 0u64;
let use_read_ahead =
options.read_ahead_enabled && tokio::runtime::Handle::try_current().is_ok();
let target_schema = task.schema_manager.latest_schema();
let num_columns = target_schema.num_columns();
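// Build one iterator per input run, adapting each file to the latest schema
// and applying bucket filtering and vlog sequence offsets where required.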
for run in &task.sorted_runs {
for file in run.files() {
read_bytes = read_bytes.saturating_add(file.size as u64);
}
let file_manager = Arc::clone(&task.file_manager);
let sst_metrics = Arc::clone(&task.sst_metrics);
let schema_manager = Arc::clone(&task.schema_manager);
let target_schema = Arc::clone(&target_schema);
let run_iter = run.iter(move |file| {
let source_schema = schema_manager.schema(file.schema_id)?;
let reader = file_manager.open_data_file_reader(file.file_id)?;
let reader: Box<dyn crate::file::RandomAccessFile> = if use_read_ahead {
Box::new(ReadAheadBufferedReader::new(
reader,
options.read_buffer_size,
))
} else {
Box::new(reader)
};
let base_iter: Box<dyn for<'a> KvIterator<'a>> = match file.file_type {
DataFileType::SSTable => {
let sst_options = SSTIteratorOptions {
metrics: Some(Arc::clone(&sst_metrics)),
num_columns: source_schema.num_columns(),
bloom_filter_enabled: options.bloom_filter_enabled,
..SSTIteratorOptions::default()
};
let iter = crate::sst::SSTIterator::with_cache_and_file(
reader,
file,
sst_options,
None,
)?;
if file.needs_bucket_filter() {
Box::new(BucketFilterIterator::new(
iter,
file.effective_bucket_range.clone(),
))
} else {
Box::new(iter)
}
}
DataFileType::Parquet => {
let iter =
crate::parquet::ParquetIterator::from_data_file(reader, file, None)?;
if file.needs_bucket_filter() {
Box::new(BucketFilterIterator::new(
iter,
file.effective_bucket_range.clone(),
))
} else {
Box::new(iter)
}
}
};
let iter: Box<dyn for<'a> KvIterator<'a>> =
if file.schema_id == target_schema.version() {
base_iter
} else {
Box::new(SchemaEvolvingIterator::new(
base_iter,
Arc::clone(&source_schema),
Arc::clone(&target_schema),
Arc::clone(&schema_manager),
))
};
if file.vlog_file_seq_offset == 0 {
Ok(iter)
} else {
Ok(Box::new(VlogSeqOffsetIterator::new(
iter,
target_schema.num_columns(),
file.vlog_file_seq_offset,
)))
}
});
all_iters.push(Box::new(run_iter));
}
task.metrics.read_bytes_total.increment(read_bytes);
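// Merge the runs into a single sorted stream, then drop shadowed versions and
// apply TTL while collecting merge information for separated values.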
let merging_iter = MergingIterator::new(all_iters);
let input_has_separated_values = task
.sorted_runs
.iter()
.flat_map(|run| run.files().iter())
.any(|file| file.has_separated_values());
let merge_collector = input_has_separated_values.then(|| VlogMergeCollector::shared(true));
let merge_callback = merge_collector.as_ref().map(VlogMergeCollector::callback);
let mut dedup_iter = DeduplicatingIterator::new(
merging_iter,
num_columns,
task.ttl_provider(),
merge_callback,
Arc::clone(&target_schema),
);
dedup_iter.seek_to_first()?;
let mut output_files: Vec<Arc<DataFile>> = Vec::new();
let mut written_bytes = 0u64;
let mut current_builder: Option<Box<dyn FileBuilder>> = None;
let mut current_file_id: Option<u64> = None;
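// Drain the deduplicated stream into output files, starting a new file
// whenever the current builder reaches the target file size.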
while dedup_iter.valid() {
if let Some(collector) = merge_collector.as_ref() {
collector.borrow_mut().check_error()?;
}
let (key, kv_value) = match dedup_iter.take_current()? {
Some(kv) => kv,
None => break,
};
if current_builder.is_none() {
let (file_id, writer) = if task.output_files_readonly {
task.file_manager.create_data_file()?
} else {
task.file_manager.create_data_file_with_offload()?
};
current_file_id = Some(file_id);
current_builder = Some((task.file_builder_factory)(Box::new(writer)));
}
if let Some(ref mut builder) = current_builder {
builder.add(&key, &kv_value)?;
if builder.offset() >= options.target_file_size {
let file_id = current_file_id.take().unwrap();
let builder = current_builder.take().unwrap();
let (first_key, last_key, file_size, footer_bytes) = builder.finish()?;
let bucket_range = DataFile::bucket_range_from_keys(&first_key, &last_key);
trace!(
"compaction output file level={} file_id={} size={}",
task.output_level, file_id, file_size
);
let data_file = DataFile::new(
task.data_file_type,
first_key,
last_key,
file_id,
TrackedFileId::new(&task.file_manager, file_id),
target_schema.version(),
file_size,
bucket_range.clone(),
bucket_range,
)
.with_separated_values(
merge_collector
.as_ref()
.is_some_and(|collector| collector.borrow().has_separated_values()),
);
data_file.set_meta_bytes(footer_bytes);
output_files.push(Arc::new(data_file));
written_bytes = written_bytes.saturating_add(file_size as u64);
if let Some(collector) = &merge_collector {
collector.borrow_mut().reset_has_separated_values();
}
}
}
dedup_iter.next()?;
}
if let Some(collector) = merge_collector.as_ref() {
collector.borrow_mut().check_error()?;
}
if let Some(builder) = current_builder
&& !builder.is_empty()
{
let file_id = current_file_id.take().unwrap();
let (first_key, last_key, file_size, footer_bytes) = builder.finish()?;
let bucket_range = DataFile::bucket_range_from_keys(&first_key, &last_key);
trace!(
"compaction output file level={} file_id={} size={}",
task.output_level, file_id, file_size
);
let data_file = DataFile::new(
task.data_file_type,
first_key,
last_key,
file_id,
TrackedFileId::new(&task.file_manager, file_id),
target_schema.version(),
file_size,
bucket_range.clone(),
bucket_range,
)
.with_separated_values(
merge_collector
.as_ref()
.is_some_and(|collector| collector.borrow().has_separated_values()),
);
data_file.set_meta_bytes(footer_bytes);
output_files.push(Arc::new(data_file));
written_bytes = written_bytes.saturating_add(file_size as u64);
}
task.metrics.write_bytes_total.increment(written_bytes);
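// Build the version edit: inputs are removed from their source levels and the
// new files are attached to the output level.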
let mut level_edits: std::collections::BTreeMap<u8, LevelEdit> =
std::collections::BTreeMap::new();
for run in &task.sorted_runs {
let entry = level_edits.entry(run.level()).or_insert_with(|| LevelEdit {
level: run.level(),
removed_files: Vec::new(),
new_files: Vec::new(),
});
entry.removed_files.extend(run.files().iter().cloned());
}
let entry = level_edits
.entry(task.output_level)
.or_insert_with(|| LevelEdit {
level: task.output_level,
removed_files: Vec::new(),
new_files: Vec::new(),
});
entry.new_files = output_files.clone();
let edit = VersionEdit {
level_edits: level_edits.into_values().collect(),
};
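// Accumulate per-vlog-file entry-count deltas for separated values dropped
// during the merge.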
let mut vlog_edit = VlogEdit::default();
for (file_seq, delta) in merge_collector
.as_ref()
.map(|collector| collector.borrow().removed_entry_deltas())
.unwrap_or_default()
{
vlog_edit.add_entry_delta(file_seq, delta);
}
let vlog_edit = (!vlog_edit.is_empty()).then_some(vlog_edit);
let output_bytes = output_files.iter().map(|file| file.size).sum::<usize>();
trace!(
"compaction complete output_level={} input_files={} input_bytes={} output_files={} output_bytes={}",
task.output_level,
task.sorted_runs
.iter()
.map(|run| run.files().len())
.sum::<usize>(),
task.sorted_runs
.iter()
.flat_map(|run| run.files().iter())
.map(|file| file.size)
.sum::<usize>(),
output_files.len(),
output_bytes
);
if task.output_files_readonly {
for file in &output_files {
task.file_manager.make_data_file_readonly(file.file_id)?;
}
}
Ok(CompactionResult::new(
task.lsm_tree_idx,
output_files,
edit,
vlog_edit,
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::file::{FileSystemRegistry, TrackedFileId};
use crate::metrics_manager::MetricsManager;
use crate::parquet::ParquetWriterOptions;
use crate::parquet::{ParquetIterator, ParquetWriter};
use crate::schema::Schema;
use crate::sst::row_codec::encode_value;
use crate::sst::{SSTWriter, SSTWriterOptions};
use crate::r#type::Value;
use crate::r#type::{Column, ValueType, decode_merge_separated_array};
use crate::writer_options::WriterOptions;
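// Encodes a single-column Put value into the row codec format used by the data
// files under test.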
fn make_value_bytes(data: &[u8], num_columns: usize) -> Vec<u8> {
let value = Value::new(vec![Some(Column::new(ValueType::Put, data.to_vec()))]);
encode_value(&value, num_columns).to_vec()
}
fn make_typed_value_bytes(value_type: ValueType, data: &[u8], num_columns: usize) -> Vec<u8> {
let value = Value::new(vec![Some(Column::new(value_type, data.to_vec()))]);
encode_value(&value, num_columns).to_vec()
}
fn schema_manager_for(num_columns: usize) -> Arc<SchemaManager> {
Arc::new(SchemaManager::from_schemas(
vec![Schema::new(0, num_columns, Vec::new())],
num_columns,
))
}
fn cleanup_test_dir(path: &str) {
let _ = std::fs::remove_dir_all(path);
}
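// Writes the given entries into a new SST data file and returns its DataFile
// descriptor, flagged as containing separated values.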
fn create_test_sst(
file_manager: &Arc<FileManager>,
entries: Vec<(&[u8], &[u8])>,
) -> Result<Arc<DataFile>> {
let (file_id, writer_file) = file_manager.create_data_file_with_offload()?;
let mut writer = SSTWriter::new(
writer_file,
SSTWriterOptions {
bloom_filter_enabled: true,
..SSTWriterOptions::default()
},
);
for (key, value) in entries {
writer.add(key, value)?;
}
let (first_key, last_key, file_size, footer_bytes) = writer.finish_with_range()?;
let bucket_range = DataFile::bucket_range_from_keys(&first_key, &last_key);
let data_file = DataFile::new(
DataFileType::SSTable,
first_key,
last_key,
file_id,
TrackedFileId::new(file_manager, file_id),
0,
file_size,
bucket_range.clone(),
bucket_range,
)
.with_separated_values(true);
data_file.set_meta_bytes(footer_bytes);
Ok(Arc::new(data_file))
}
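// Writes the given entries into a new single-column Parquet data file and
// returns its DataFile descriptor.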
fn create_test_parquet(
file_manager: &Arc<FileManager>,
entries: Vec<(&[u8], &[u8])>,
) -> Result<Arc<DataFile>> {
let (file_id, writer_file) = file_manager.create_data_file_with_offload()?;
let mut writer = ParquetWriter::with_options(
writer_file,
ParquetWriterOptions {
num_columns: 1,
..ParquetWriterOptions::default()
},
)?;
for (key, value) in entries {
writer.add(key, value)?;
}
let (first_key, last_key, file_size, meta_bytes) = writer.finish()?;
let bucket_range = DataFile::bucket_range_from_keys(&first_key, &last_key);
let data_file = DataFile::new(
DataFileType::Parquet,
first_key,
last_key,
file_id,
TrackedFileId::new(file_manager, file_id),
0,
file_size,
bucket_range.clone(),
bucket_range,
);
data_file.set_meta_bytes(meta_bytes);
Ok(Arc::new(data_file))
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_basic() {
let test_dir = "/tmp/compaction_basic_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let file1 = create_test_sst(
&file_manager,
vec![
(b"a", &make_value_bytes(b"v1", num_columns)),
(b"c", &make_value_bytes(b"v3", num_columns)),
(b"e", &make_value_bytes(b"v5", num_columns)),
],
)
.unwrap();
let file2 = create_test_sst(
&file_manager,
vec![
(b"b", &make_value_bytes(b"v2", num_columns)),
(b"d", &make_value_bytes(b"v4", num_columns)),
(b"f", &make_value_bytes(b"v6", num_columns)),
],
)
.unwrap();
let run1 = SortedRun::new(0, vec![file1]);
let run2 = SortedRun::new(1, vec![file2]);
let options = CompactionConfig {
num_columns,
target_file_size: 1024 * 1024,
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
..Default::default()
};
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![run1, run2],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.edit().level_edits.len(), 2);
assert!(
result
.edit()
.level_edits
.iter()
.any(|edit| edit.new_files.len() == 1)
);
assert!(!result.new_files().is_empty());
let first_file = &result.new_files()[0];
assert_eq!(first_file.start_key, b"a");
assert_eq!(first_file.end_key, b"f");
let reader = file_manager
.open_data_file_reader(first_file.file_id)
.unwrap();
let mut iter = crate::sst::SSTIterator::with_cache_and_file(
Box::new(reader),
first_file,
crate::sst::SSTIteratorOptions {
bloom_filter_enabled: true,
..crate::sst::SSTIteratorOptions::default()
},
None,
)
.unwrap();
iter.seek_to_first().unwrap();
let mut keys = vec![];
while iter.valid() {
let (key, _) = iter.current().unwrap().unwrap();
keys.push(key.to_vec());
iter.next().unwrap();
}
assert_eq!(
keys,
vec![
b"a".to_vec(),
b"b".to_vec(),
b"c".to_vec(),
b"d".to_vec(),
b"e".to_vec(),
b"f".to_vec()
]
);
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_with_duplicates() {
let test_dir = "/tmp/compaction_duplicates_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let file1 = create_test_sst(
&file_manager,
vec![
(b"a", &make_value_bytes(b"new_a", num_columns)),
(b"b", &make_value_bytes(b"new_b", num_columns)),
],
)
.unwrap();
let file2 = create_test_sst(
&file_manager,
vec![
(b"a", &make_value_bytes(b"old_a", num_columns)),
(b"b", &make_value_bytes(b"old_b", num_columns)),
(b"c", &make_value_bytes(b"old_c", num_columns)),
],
)
.unwrap();
let run1 = SortedRun::new(0, vec![file1]);
let run2 = SortedRun::new(0, vec![file2]);
let options = CompactionConfig {
num_columns,
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
..Default::default()
};
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![run1, run2],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.edit().level_edits.len(), 2);
assert!(
result
.edit()
.level_edits
.iter()
.any(|edit| edit.new_files.len() == 1)
);
assert_eq!(result.new_files().len(), 1);
let reader = file_manager
.open_data_file_reader(result.new_files()[0].file_id)
.unwrap();
let mut iter = crate::sst::SSTIterator::with_cache_and_file(
Box::new(reader),
result.new_files()[0].as_ref(),
crate::sst::SSTIteratorOptions {
bloom_filter_enabled: true,
num_columns,
..Default::default()
},
None,
)
.unwrap();
iter.seek_to_first().unwrap();
assert!(iter.valid());
let (key, mut value) = iter.current().unwrap().unwrap();
assert_eq!(&key[..], b"a");
let decoded = crate::sst::row_codec::decode_value(&mut value, num_columns).unwrap();
assert_eq!(
decoded.columns()[0].as_ref().unwrap().data().as_ref(),
b"new_a"
);
iter.next().unwrap();
assert!(iter.valid());
let (key, mut value) = iter.current().unwrap().unwrap();
assert_eq!(&key[..], b"b");
let decoded = crate::sst::row_codec::decode_value(&mut value, num_columns).unwrap();
assert_eq!(
decoded.columns()[0].as_ref().unwrap().data().as_ref(),
b"new_b"
);
iter.next().unwrap();
assert!(iter.valid());
let (key, mut value) = iter.current().unwrap().unwrap();
assert_eq!(&key[..], b"c");
let decoded = crate::sst::row_codec::decode_value(&mut value, num_columns).unwrap();
assert_eq!(
decoded.columns()[0].as_ref().unwrap().data().as_ref(),
b"old_c"
);
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_lazy_merge_with_separated_values() {
let test_dir = "/tmp/compaction_separated_merge_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let old_put_separated = [0x11u8; 8];
let new_merge_separated_a = [0x22u8; 8];
let new_merge_separated_b = [0x33u8; 8];
let file1 = create_test_sst(
&file_manager,
vec![
(
b"a",
&make_typed_value_bytes(
ValueType::MergeSeparated,
&new_merge_separated_a,
num_columns,
),
),
(
b"b",
&make_typed_value_bytes(
ValueType::MergeSeparated,
&new_merge_separated_b,
num_columns,
),
),
],
)
.unwrap();
let file2 = create_test_sst(
&file_manager,
vec![
(
b"a",
&make_typed_value_bytes(
ValueType::PutSeparated,
&old_put_separated,
num_columns,
),
),
(
b"b",
&make_typed_value_bytes(ValueType::Put, b"base_b", num_columns),
),
],
)
.unwrap();
let run1 = SortedRun::new(0, vec![file1]);
let run2 = SortedRun::new(0, vec![file2]);
let options = CompactionConfig {
num_columns,
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
..Default::default()
};
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![run1, run2],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.new_files().len(), 1);
let reader = file_manager
.open_data_file_reader(result.new_files()[0].file_id)
.unwrap();
let mut iter = crate::sst::SSTIterator::with_cache_and_file(
Box::new(reader),
result.new_files()[0].as_ref(),
crate::sst::SSTIteratorOptions {
bloom_filter_enabled: true,
num_columns,
..Default::default()
},
None,
)
.unwrap();
iter.seek_to_first().unwrap();
assert!(iter.valid());
let (key, mut value) = iter.current().unwrap().unwrap();
assert_eq!(&key[..], b"a");
let decoded = crate::sst::row_codec::decode_value(&mut value, num_columns).unwrap();
let column = decoded.columns()[0].as_ref().unwrap();
assert_eq!(column.value_type, ValueType::PutSeparatedArray);
let merged_items = decode_merge_separated_array(column.data()).unwrap();
assert_eq!(merged_items.len(), 2);
assert_eq!(merged_items[0].value_type, ValueType::PutSeparated);
assert_eq!(merged_items[0].data().as_ref(), old_put_separated);
assert_eq!(merged_items[1].value_type, ValueType::MergeSeparated);
assert_eq!(merged_items[1].data().as_ref(), new_merge_separated_a);
iter.next().unwrap();
assert!(iter.valid());
let (key, mut value) = iter.current().unwrap().unwrap();
assert_eq!(&key[..], b"b");
let decoded = crate::sst::row_codec::decode_value(&mut value, num_columns).unwrap();
let column = decoded.columns()[0].as_ref().unwrap();
assert_eq!(column.value_type, ValueType::PutSeparatedArray);
let merged_items = decode_merge_separated_array(column.data()).unwrap();
assert_eq!(merged_items.len(), 2);
assert_eq!(merged_items[0].value_type, ValueType::Put);
assert_eq!(merged_items[0].data(), b"base_b");
assert_eq!(merged_items[1].value_type, ValueType::MergeSeparated);
assert_eq!(merged_items[1].data().as_ref(), new_merge_separated_b);
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_evolves_older_schema_values() {
let test_dir = "/tmp/compaction_schema_evolution_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let old_num_columns = 1;
let old_file = create_test_sst(
&file_manager,
vec![(b"k", &make_value_bytes(b"old", old_num_columns))],
)
.unwrap();
let schema_manager = Arc::new(SchemaManager::new(old_num_columns));
let mut schema_builder = schema_manager.builder();
schema_builder.add_column(1, None, None).unwrap();
let target_schema = schema_builder.commit();
let options = CompactionConfig {
num_columns: target_schema.num_columns(),
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
..Default::default()
};
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![SortedRun::new(0, vec![old_file])],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
Arc::clone(&schema_manager),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.new_files().len(), 1);
assert_eq!(result.new_files()[0].schema_id, target_schema.version());
let reader = file_manager
.open_data_file_reader(result.new_files()[0].file_id)
.unwrap();
let mut iter = crate::sst::SSTIterator::with_cache_and_file(
Box::new(reader),
result.new_files()[0].as_ref(),
crate::sst::SSTIteratorOptions {
bloom_filter_enabled: true,
num_columns: target_schema.num_columns(),
..Default::default()
},
None,
)
.unwrap();
iter.seek_to_first().unwrap();
let (_, mut value) = iter.current().unwrap().unwrap();
let decoded =
crate::sst::row_codec::decode_value(&mut value, target_schema.num_columns()).unwrap();
assert_eq!(decoded.columns().len(), 2);
assert_eq!(
decoded.columns()[0].as_ref().unwrap().data().as_ref(),
b"old"
);
assert!(decoded.columns()[1].is_none());
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_multiple_output_files() {
let test_dir = "/tmp/compaction_multi_output_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let mut entries = vec![];
for i in 0..100 {
let key = format!("key{:04}", i);
let value = format!("value{:04}_with_some_extra_padding_data", i);
entries.push((
key.into_bytes(),
make_value_bytes(value.as_bytes(), num_columns),
));
}
let (file_id, writer_file) = file_manager.create_data_file().unwrap();
let mut writer = SSTWriter::new(
writer_file,
SSTWriterOptions {
bloom_filter_enabled: true,
..SSTWriterOptions::default()
},
);
for (key, value) in &entries {
writer.add(key, value).unwrap();
}
let (first_key, last_key, file_size, footer_bytes) = writer.finish_with_range().unwrap();
let bucket_range = DataFile::bucket_range_from_keys(&first_key, &last_key);
let file = DataFile::new(
DataFileType::SSTable,
first_key,
last_key,
file_id,
TrackedFileId::new(&file_manager, file_id),
0,
file_size,
bucket_range.clone(),
bucket_range,
);
file.set_meta_bytes(footer_bytes);
let file = Arc::new(file);
let run = SortedRun::new(0, vec![file]);
let options = CompactionConfig {
num_columns,
target_file_size: 500, // small target size so compaction rolls over into multiple output files
bloom_filter_enabled: true,
bloom_bits_per_key: 10,
..Default::default()
};
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![run],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.edit().level_edits.len(), 2);
assert!(
result
.edit()
.level_edits
.iter()
.any(|edit| edit.new_files.len() > 1)
);
assert!(result.new_files().len() > 1);
for i in 1..result.new_files().len() {
let prev_file = &result.new_files()[i - 1];
let curr_file = &result.new_files()[i];
assert!(
prev_file.end_key < curr_file.start_key,
"Files should have non-overlapping, sorted key ranges"
);
}
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_empty_input() {
let test_dir = "/tmp/compaction_empty_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let options = CompactionConfig::default();
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions {
metrics: None,
block_size: options.block_size,
buffer_size: options.buffer_size,
num_columns: options.num_columns,
bloom_filter_enabled: options.bloom_filter_enabled,
bloom_bits_per_key: options.bloom_bits_per_key,
partitioned_index: options.partitioned_index,
compression: crate::SstCompressionAlgorithm::None,
});
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(options.num_columns),
);
let executor =
CompactionExecutor::with_defaults(Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert_eq!(result.edit().level_edits.len(), 1);
assert!(result.edit().level_edits[0].new_files.is_empty());
assert!(result.new_files().is_empty());
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_tracks_vlog_entry_deletions_for_shadowed_values() {
let test_dir = "/tmp/compaction_vlog_entry_delta_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let pointer = crate::vlog::VlogPointer::new(9, 0);
let older = create_test_sst(
&file_manager,
vec![(
b"k",
&make_typed_value_bytes(ValueType::PutSeparated, &pointer.to_bytes(), num_columns),
)],
)
.unwrap();
let newer = create_test_sst(
&file_manager,
vec![(b"k", &make_value_bytes(b"inline", num_columns))],
)
.unwrap();
let factory = crate::compaction::make_sst_builder_factory(SSTWriterOptions::default());
let compaction_metrics = Arc::new(CompactionTaskMetrics::new("test"));
let sst_metrics = Arc::new(crate::sst::SSTIteratorMetrics::new("test"));
let task = CompactionTask::new(
compaction_metrics,
sst_metrics,
0,
vec![
SortedRun::new(0, vec![newer]),
SortedRun::new(1, vec![older]),
],
1,
Arc::clone(&file_manager),
factory,
DataFileType::SSTable,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor =
CompactionExecutor::with_defaults(Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
let deltas: std::collections::HashMap<u32, i64> = result
.vlog_edit()
.unwrap()
.entry_deltas()
.into_iter()
.collect();
assert_eq!(deltas.get(&9).copied(), Some(-1));
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_output_parquet() {
let test_dir = "/tmp/compaction_output_parquet_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let file1 = create_test_sst(
&file_manager,
vec![
(b"a", &make_value_bytes(b"v1", num_columns)),
(b"c", &make_value_bytes(b"v3", num_columns)),
],
)
.unwrap();
let file2 = create_test_sst(
&file_manager,
vec![
(b"b", &make_value_bytes(b"v2", num_columns)),
(b"d", &make_value_bytes(b"v4", num_columns)),
],
)
.unwrap();
let options = CompactionConfig {
num_columns,
target_file_size: 1024 * 1024,
..Default::default()
};
let factory = crate::compaction::make_data_file_builder_factory(WriterOptions::Parquet(
ParquetWriterOptions {
row_group_size_bytes: 256 * 1024,
buffer_size: options.buffer_size,
num_columns,
},
));
let task = CompactionTask::new(
Arc::new(CompactionTaskMetrics::new("test")),
Arc::new(crate::sst::SSTIteratorMetrics::new("test")),
0,
vec![
SortedRun::new(0, vec![file1]),
SortedRun::new(1, vec![file2]),
],
1,
Arc::clone(&file_manager),
factory,
DataFileType::Parquet,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
assert!(!result.new_files().is_empty());
assert!(
result
.new_files()
.iter()
.all(|file| file.file_type == DataFileType::Parquet)
);
let output = result.new_files()[0].clone();
let reader = file_manager.open_data_file_reader(output.file_id).unwrap();
let mut iter =
ParquetIterator::from_data_file(Box::new(reader), output.as_ref(), None).unwrap();
iter.seek_to_first().unwrap();
let mut keys = Vec::new();
while iter.valid() {
keys.push(iter.key().unwrap().unwrap().to_vec());
iter.next().unwrap();
}
assert_eq!(
keys,
vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec(), b"d".to_vec()]
);
cleanup_test_dir(test_dir);
}
#[test]
#[serial_test::serial(file)]
fn test_compaction_input_parquet_output_parquet() {
let test_dir = "/tmp/compaction_parquet_to_parquet_test";
cleanup_test_dir(test_dir);
let registry = FileSystemRegistry::new();
let fs = registry
.get_or_register(format!("file://{}", test_dir))
.unwrap();
let metrics_manager = Arc::new(MetricsManager::new("compaction-test"));
let file_manager = Arc::new(
FileManager::with_defaults(Arc::clone(&fs), Arc::clone(&metrics_manager)).unwrap(),
);
let num_columns = 1;
let file1 = create_test_parquet(
&file_manager,
vec![
(b"a", &make_value_bytes(b"new_a", num_columns)),
(b"b", &make_value_bytes(b"new_b", num_columns)),
],
)
.unwrap();
let file2 = create_test_parquet(
&file_manager,
vec![
(b"a", &make_value_bytes(b"old_a", num_columns)),
(b"c", &make_value_bytes(b"old_c", num_columns)),
],
)
.unwrap();
let options = CompactionConfig {
num_columns,
target_file_size: 1024 * 1024,
..Default::default()
};
let factory = crate::compaction::make_data_file_builder_factory(WriterOptions::Parquet(
ParquetWriterOptions {
row_group_size_bytes: 256 * 1024,
buffer_size: options.buffer_size,
num_columns,
},
));
let task = CompactionTask::new(
Arc::new(CompactionTaskMetrics::new("test")),
Arc::new(crate::sst::SSTIteratorMetrics::new("test")),
0,
vec![
SortedRun::new(0, vec![file1]),
SortedRun::new(1, vec![file2]),
],
1,
Arc::clone(&file_manager),
factory,
DataFileType::Parquet,
Arc::new(crate::ttl::TTLProvider::disabled()),
schema_manager_for(num_columns),
);
let executor = CompactionExecutor::new(options, Arc::new(DbLifecycle::new_open())).unwrap();
let result = executor.execute_blocking(task, None).unwrap();
let output = result.new_files()[0].clone();
let reader = file_manager.open_data_file_reader(output.file_id).unwrap();
let mut iter =
ParquetIterator::from_data_file(Box::new(reader), output.as_ref(), None).unwrap();
iter.seek_to_first().unwrap();
let mut rows = Vec::new();
while iter.valid() {
rows.push((
iter.key().unwrap().unwrap().to_vec(),
iter.value().unwrap().unwrap().to_vec(),
));
iter.next().unwrap();
}
assert_eq!(rows.len(), 3);
assert_eq!(rows[0].0, b"a".to_vec());
assert_eq!(rows[1].0, b"b".to_vec());
assert_eq!(rows[2].0, b"c".to_vec());
cleanup_test_dir(test_dir);
}
}