#include <string>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/mock_time_env.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
static std::string CompressibleString(Random* rnd, int len) {
std::string r;
test::CompressibleString(rnd, 0.8, len, &r);
return r;
}
class DBTestUniversalCompactionBase
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<int, bool>> {
public:
explicit DBTestUniversalCompactionBase(const std::string& path)
: DBTestBase(path, false) {}
void SetUp() override {
num_levels_ = std::get<0>(GetParam());
exclusive_manual_compaction_ = std::get<1>(GetParam());
}
int num_levels_;
bool exclusive_manual_compaction_;
};
class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
public:
DBTestUniversalCompaction()
: DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};
class DBTestUniversalCompaction2 : public DBTestBase {
public:
DBTestUniversalCompaction2()
: DBTestBase("db_universal_compaction_test2", false) {}
};
namespace {
void VerifyCompactionResult(
const ColumnFamilyMetaData& cf_meta,
const std::set<std::string>& overlapping_file_numbers) {
#ifndef NDEBUG
for (auto& level : cf_meta.levels) {
for (auto& file : level.files) {
assert(overlapping_file_numbers.find(file.name) ==
overlapping_file_numbers.end());
}
}
#endif
}
class KeepFilter : public CompactionFilter {
public:
bool Filter(int , const Slice& , const Slice& ,
std::string* ,
bool* ) const override {
return false;
}
const char* Name() const override { return "KeepFilter"; }
};
class KeepFilterFactory : public CompactionFilterFactory {
public:
explicit KeepFilterFactory(bool check_context = false)
: check_context_(check_context) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
if (check_context_) {
EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
}
return std::unique_ptr<CompactionFilter>(new KeepFilter());
}
const char* Name() const override { return "KeepFilterFactory"; }
bool check_context_;
std::atomic_bool expect_full_compaction_;
std::atomic_bool expect_manual_compaction_;
};
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionSingleSortedRun) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.level0_file_num_compaction_trigger = 0;
options.compaction_options_universal.size_ratio = 10;
options.compaction_options_universal.min_merge_width = 2;
options.compaction_options_universal.max_size_amplification_percent = 0;
options.write_buffer_size = 105 << 10; options.arena_block_size = 4 << 10;
options.target_file_size_base = 32 << 10; KeepFilterFactory* filter = new KeepFilterFactory(true);
filter->expect_manual_compaction_.store(false);
options.compaction_filter_factory.reset(filter);
DestroyAndReopen(options);
ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger);
Random rnd(301);
int key_idx = 0;
filter->expect_full_compaction_.store(true);
for (int num = 0; num < 16; num++) {
GenerateNewFile(&rnd, &key_idx);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(0), 1);
}
ASSERT_OK(Put(Key(key_idx), ""));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(0), 1);
}
TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 5;
options.num_levels = num_levels_;
options.write_buffer_size = 105 << 10; options.arena_block_size = 4 << 10;
options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 4;
BlockBasedTableOptions bbto;
bbto.cache_index_and_filter_blocks = true;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.optimize_filters_for_hits = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.memtable_factory.reset(test::NewSpecialSkipListFactory(3));
DestroyAndReopen(options);
env_->SetBackgroundThreads(1, Env::LOW);
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
ASSERT_OK(Put(Key(num * 10), "val"));
if (num) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
ASSERT_OK(Put(Key(30 + num * 10), "val"));
ASSERT_OK(Put(Key(60 + num * 10), "val"));
}
ASSERT_OK(Put("", ""));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
for (int i = 5; i < 90; i += 10) {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);
ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);
sleeping_task_low.WakeUp();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int i = 5; i < 90; i += 10) {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 5;
options.num_levels = num_levels_;
options.write_buffer_size = 105 << 10; options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 4;
KeepFilterFactory* filter = new KeepFilterFactory(true);
filter->expect_manual_compaction_.store(false);
options.compaction_filter_factory.reset(filter);
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBTestWritableFile.GetPreallocationStatus", [&](void* arg) {
ASSERT_TRUE(arg != nullptr);
size_t preallocation_size = *(static_cast<size_t*>(arg));
if (num_levels_ > 3) {
ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
int key_idx = 0;
filter->expect_full_compaction_.store(true);
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
GenerateNewFile(1, &rnd, &key_idx);
}
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_EQ(NumSortedRuns(1), 1);
filter->expect_full_compaction_.store(false);
ASSERT_OK(Flush(1));
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_EQ(NumSortedRuns(1), num + 3);
}
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_EQ(NumSortedRuns(1), 2);
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_EQ(NumSortedRuns(1), num + 3);
}
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_EQ(NumSortedRuns(1), 3);
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 4);
filter->expect_full_compaction_.store(true);
GenerateNewFile(1, &rnd, &key_idx);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 1);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 3;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
ASSERT_EQ(NumSortedRuns(1), 2);
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 1);
}
TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionSizeAmplification) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 3;
options.compaction_options_universal.size_ratio = 100;
options.compaction_options_universal.min_merge_width = 100;
DestroyAndReopen(options);
int total_picked_compactions = 0;
int total_size_amp_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
if (arg) {
total_picked_compactions++;
Compaction* c = static_cast<Compaction*>(arg);
if (c->compaction_reason() ==
CompactionReason::kUniversalSizeAmplification) {
total_size_amp_compactions++;
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
MutableCFOptions mutable_cf_options;
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
ASSERT_EQ(NumSortedRuns(1), 2);
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 3);
ASSERT_EQ(dbfull()
->GetOptions(handles_[1])
.compaction_options_universal.max_size_amplification_percent,
200U);
ASSERT_OK(dbfull()->SetOptions(handles_[1],
{{"compaction_options_universal",
"{max_size_amplification_percent=110;}"}}));
ASSERT_EQ(dbfull()
->GetOptions(handles_[1])
.compaction_options_universal.max_size_amplification_percent,
110u);
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(110u, mutable_cf_options.compaction_options_universal
.max_size_amplification_percent);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 1);
ASSERT_EQ(total_picked_compactions, 1);
ASSERT_EQ(total_size_amp_compactions, 1);
}
TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionReadAmplification) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 3;
options.compaction_options_universal.max_size_amplification_percent = 2000;
options.compaction_options_universal.size_ratio = 0;
options.compaction_options_universal.min_merge_width = 100;
DestroyAndReopen(options);
int total_picked_compactions = 0;
int total_size_ratio_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
if (arg) {
total_picked_compactions++;
Compaction* c = static_cast<Compaction*>(arg);
if (c->compaction_reason() == CompactionReason::kUniversalSizeRatio) {
total_size_ratio_compactions++;
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
MutableCFOptions mutable_cf_options;
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger);
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger + 1);
ASSERT_EQ(total_picked_compactions, 0);
ASSERT_OK(dbfull()->SetOptions(
handles_[1],
{{"compaction_options_universal",
"{min_merge_width=2;max_merge_width=2;size_ratio=100;}"}}));
ASSERT_EQ(dbfull()
->GetOptions(handles_[1])
.compaction_options_universal.min_merge_width,
2u);
ASSERT_EQ(dbfull()
->GetOptions(handles_[1])
.compaction_options_universal.max_merge_width,
2u);
ASSERT_EQ(
dbfull()->GetOptions(handles_[1]).compaction_options_universal.size_ratio,
100u);
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(mutable_cf_options.compaction_options_universal.size_ratio, 100u);
ASSERT_EQ(mutable_cf_options.compaction_options_universal.min_merge_width,
2u);
ASSERT_EQ(mutable_cf_options.compaction_options_universal.max_merge_width,
2u);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 2);
ASSERT_EQ(total_picked_compactions, 2);
ASSERT_EQ(total_size_ratio_compactions, 2);
}
TEST_P(DBTestUniversalCompaction, CompactFilesOnUniversalCompaction) {
const int kTestKeySize = 16;
const int kTestValueSize = 984;
const int kEntrySize = kTestKeySize + kTestValueSize;
const int kEntriesPerBuffer = 10;
ChangeCompactOptions();
Options options;
options.create_if_missing = true;
options.compaction_style = kCompactionStyleLevel;
options.num_levels = 1;
options.target_file_size_base = options.write_buffer_size;
options.compression = kNoCompression;
options = CurrentOptions(options);
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
Random rnd(301);
for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
ASSERT_OK(Put(1, std::to_string(key), rnd.RandomString(kTestValueSize)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
std::vector<std::string> compaction_input_file_names;
for (const auto& file : cf_meta.levels[0].files) {
if (rnd.OneIn(2)) {
compaction_input_file_names.push_back(file.name);
}
}
if (compaction_input_file_names.size() == 0) {
compaction_input_file_names.push_back(cf_meta.levels[0].files[0].name);
}
ASSERT_FALSE(dbfull()
->CompactFiles(CompactionOptions(), handles_[1],
compaction_input_file_names, 1)
.ok());
ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
compaction_input_file_names, 0));
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
VerifyCompactionResult(
cf_meta, std::set<std::string>(compaction_input_file_names.begin(),
compaction_input_file_names.end()));
compaction_input_file_names.clear();
compaction_input_file_names.push_back(cf_meta.levels[0].files[0].name);
compaction_input_file_names.push_back(
cf_meta.levels[0].files[cf_meta.levels[0].files.size() - 1].name);
ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
compaction_input_file_names, 0));
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; options.num_levels = 7;
options.disable_auto_compactions = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 210; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
}
ASSERT_OK(Flush());
for (int i = 200; i < 300; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
}
ASSERT_OK(Flush());
for (int i = 250; i < 260; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(100)));
}
ASSERT_OK(Flush());
ASSERT_EQ("3", FilesPerLevel(0));
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 4;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}
#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
class DBTestUniversalCompactionMultiLevels
: public DBTestUniversalCompactionBase {
public:
DBTestUniversalCompactionMultiLevels()
: DBTestUniversalCompactionBase(
"/db_universal_compaction_multi_levels_test") {}
};
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; options.level0_file_num_compaction_trigger = 8;
options.max_background_compactions = 3;
options.target_file_size_base = 32 * 1024;
CreateAndReopenWithCF({"pikachu"}, options);
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 100000;
for (int i = 0; i < num_keys * 2; i++) {
ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
}
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* ) { trivial_move++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
non_trivial_move++;
ASSERT_TRUE(arg != nullptr);
int output_level = *(static_cast<int*>(arg));
ASSERT_EQ(output_level, 0);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.allow_trivial_move = true;
options.num_levels = 3;
options.write_buffer_size = 100 << 10; options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 2;
options.target_file_size_base = 32 * 1024;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 150000;
for (int i = 0; i < num_keys; i++) {
ASSERT_OK(Put(1, Key(i), Key(i)));
}
std::vector<std::string> values;
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(trivial_move, 0);
ASSERT_GT(non_trivial_move, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(MultiLevels, DBTestUniversalCompactionMultiLevels,
::testing::Combine(::testing::Values(3, 20),
::testing::Bool()));
class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase {
public:
DBTestUniversalCompactionParallel()
: DBTestUniversalCompactionBase("/db_universal_compaction_prallel_test") {
}
};
TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.env = env_;
options.write_buffer_size = 1 << 10; options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 3;
options.max_background_flushes = 3;
options.target_file_size_base = 1 * 1024;
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
std::atomic<int> num_compactions_running(0);
std::atomic<bool> has_parallel(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():Start", [&](void* ) {
if (num_compactions_running.fetch_add(1) > 0) {
has_parallel.store(true);
return;
}
for (int nwait = 0; nwait < 20000; nwait++) {
if (has_parallel.load() || num_compactions_running.load() > 1) {
has_parallel.store(true);
break;
}
env_->SleepForMicroseconds(1000);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():End",
[&](void* ) { num_compactions_running.fetch_add(-1); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 30000;
for (int i = 0; i < num_keys * 2; i++) {
ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(num_compactions_running.load(), 0);
ASSERT_TRUE(has_parallel.load());
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
ReopenWithColumnFamilies({"default", "pikachu"}, options);
for (int i = num_keys; i < num_keys * 2; i++) {
ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
}
}
TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 1 * 1024; options.level0_file_num_compaction_trigger = 7;
options.max_background_compactions = 2;
options.target_file_size_base = 1024 * 1024;
options.compaction_options_universal.max_size_amplification_percent =
UINT_MAX;
DestroyAndReopen(options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
"BackgroundCallCompaction:0"},
{"UniversalCompactionBuilder::PickCompaction:Return",
"DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
{"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
"CompactionJob::Run():Start"}});
int total_picked_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
if (arg) {
total_picked_compactions++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
int key_idx = 1;
for (int i = 1; i <= 70; i++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
if (i % 10 == 0) {
ASSERT_OK(Flush());
}
}
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
int num_keys = 1000;
for (int i = 0; i < 3; i++) {
for (int j = 1; j <= num_keys; j++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
}
ASSERT_OK(Flush());
num_keys -= 100;
}
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
EXPECT_EQ(total_picked_compactions, 1);
EXPECT_EQ(TotalTableFiles(), 4);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
key_idx = 1;
total_picked_compactions = 0;
DestroyAndReopen(options);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 1; i <= 70; i++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
if (i % 10 == 0) {
ASSERT_OK(Flush());
}
}
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
num_keys = 1000;
for (int i = 0; i < 8; i++) {
for (int j = 1; j <= num_keys; j++) {
std::string k = Key(key_idx++);
ASSERT_OK(Put(k, k));
}
ASSERT_OK(Flush());
num_keys -= 100;
}
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
EXPECT_GE(total_picked_compactions, 2);
}
INSTANTIATE_TEST_CASE_P(Parallel, DBTestUniversalCompactionParallel,
::testing::Combine(::testing::Values(1, 10),
::testing::Values(false)));
#endif
TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 105 << 10; options.arena_block_size = 4 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 4;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = -1;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(1, Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
if (num < options.level0_file_num_compaction_trigger - 1) {
ASSERT_EQ(NumSortedRuns(1), num + 1);
}
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(1), 1);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 105 << 10; options.arena_block_size = 4 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 4;
options.compaction_options_universal.size_ratio = 10;
options.compaction_options_universal.stop_style =
kCompactionStopStyleSimilarSize;
options.num_levels = num_levels_;
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(NumSortedRuns(), num + 1);
}
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(), 1);
ASSERT_OK(dbfull()->Flush(FlushOptions()));
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(NumSortedRuns(), num + 3);
}
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(), 3);
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(key_idx), rnd.RandomString(990)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(), 4);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
if (!Snappy_Supported()) {
return;
}
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 2;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = 70;
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < 2; num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);
for (int num = 0; num < 2; num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);
for (int num = 0; num < 2; num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);
for (int num = 0; num < 8; num++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
if (!Snappy_Supported()) {
return;
}
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = 2;
options.num_levels = num_levels_;
options.compaction_options_universal.compression_size_percent = 95;
DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < 14; num++) {
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_LT(TotalSize(), 120000U * 12 * 0.82 + 120000 * 2);
}
#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* ) { trivial_move++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
non_trivial_move++;
ASSERT_TRUE(arg != nullptr);
int output_level = *(static_cast<int*>(arg));
ASSERT_EQ(output_level, 0);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.allow_trivial_move = true;
options.num_levels = 2;
options.write_buffer_size = 100 << 10; options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 1;
options.target_file_size_base = 32 * 1024;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 250000;
for (int i = 0; i < num_keys; i++) {
ASSERT_OK(Put(1, Key(i), Key(i)));
}
std::vector<std::string> values;
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(trivial_move, 0);
ASSERT_GT(non_trivial_move, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) {
int32_t trivial_move = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* ) { trivial_move++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
ASSERT_TRUE(arg != nullptr);
int output_level = *(static_cast<int*>(arg));
ASSERT_EQ(output_level, 0);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.allow_trivial_move = true;
options.num_levels = 15;
options.write_buffer_size = 100 << 10; options.level0_file_num_compaction_trigger = 8;
options.max_background_compactions = 2;
options.target_file_size_base = 64 * 1024;
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
options.compaction_options_universal.max_size_amplification_percent = 110;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Random rnd(301);
int num_keys = 500000;
for (int i = 0; i < num_keys; i++) {
ASSERT_OK(Put(1, Key(i), Key(i)));
}
std::vector<std::string> values;
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(trivial_move, 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#endif
TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 5;
options.write_buffer_size = 111 << 10; options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
std::vector<std::string> filenames;
if (env_->GetChildren(options.db_paths[1].path, &filenames).ok()) {
for (size_t i = 0; i < filenames.size(); ++i) {
ASSERT_OK(
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]));
}
ASSERT_OK(env_->DeleteDir(options.db_paths[1].path));
}
Reopen(options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
Reopen(options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
Destroy(options);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 10;
options.write_buffer_size = 111 << 10; options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
std::vector<Options> option_vector;
option_vector.emplace_back(options);
ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt1);
CreateColumnFamilies({"one"}, option_vector[1]);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt2);
CreateColumnFamilies({"two"}, option_vector[2]);
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
Random rnd(301);
int key_idx = 0;
int key_idx1 = 0;
int key_idx2 = 0;
auto generate_file = [&]() {
GenerateNewFile(0, &rnd, &key_idx);
GenerateNewFile(1, &rnd, &key_idx1);
GenerateNewFile(2, &rnd, &key_idx2);
};
auto check_sstfilecount = [&](int path_id, int expected) {
ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
};
auto check_getvalues = [&]() {
for (int i = 0; i < key_idx; i++) {
auto v = Get(0, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx1; i++) {
auto v = Get(1, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx2; i++) {
auto v = Get(2, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
};
for (int num = 0; num < 3; num++) {
generate_file();
}
generate_file();
check_sstfilecount(2, 1);
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(0, 1);
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
generate_file();
check_sstfilecount(3, 1);
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(0, 1);
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(1, 1);
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(3, 1);
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(2, 1);
check_sstfilecount(0, 0);
check_getvalues();
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
check_getvalues();
Destroy(options, true);
}
TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
std::function<void(int)> verify_func = [&](int num_keys_in_db) {
std::string keys_in_db;
Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
keys_in_db.append(iter->key().ToString());
keys_in_db.push_back(',');
}
EXPECT_OK(iter->status());
delete iter;
std::string expected_keys;
for (int i = 0; i <= num_keys_in_db; i++) {
expected_keys.append(Key(i));
expected_keys.push_back(',');
}
ASSERT_EQ(keys_in_db, expected_keys);
};
Random rnd(301);
int max_key1 = 200;
int max_key2 = 600;
int max_key3 = 800;
const int KNumKeysPerFile = 10;
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options.write_buffer_size = 200 << 10; options.level0_file_num_compaction_trigger = 3;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysPerFile));
options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options);
for (int i = 0; i <= max_key1; i++) {
ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 4;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
verify_func(max_key1);
for (int i = max_key1 + 1; i <= max_key2; i++) {
ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
verify_func(max_key2);
ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);
options.num_levels = 4;
options.target_file_size_base = INT_MAX;
ReopenWithColumnFamilies({"default", "pikachu"}, options);
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 0;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(
dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
for (int i = max_key2 + 1; i <= max_key3; i++) {
ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
ASSERT_OK(Flush(1));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
verify_func(max_key3);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
if (!Snappy_Supported()) {
return;
}
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 5;
options.write_buffer_size = 111 << 10; options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
std::vector<std::string> filenames;
if (env_->GetChildren(options.db_paths[1].path, &filenames).ok()) {
for (size_t i = 0; i < filenames.size(); ++i) {
ASSERT_OK(
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]));
}
ASSERT_OK(env_->DeleteDir(options.db_paths[1].path));
}
Reopen(options);
Random rnd(301);
int key_idx = 0;
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
Reopen(options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
Destroy(options);
}
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
if (num_levels_ == 1) {
return;
}
const int kNumFilesTrigger = 3;
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
for (bool universal_reduce_file_locking : {true, false}) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.reduce_file_locking =
universal_reduce_file_locking;
options.max_background_compactions = 2;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; options.target_file_size_base = 32 << 10; options.level0_file_num_compaction_trigger = kNumFilesTrigger;
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
auto pressure_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
if (universal_reduce_file_locking) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{ {"DBImpl::BackgroundCompaction():AfterPickCompactionBottomPri",
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
{"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRunBottomPri"}});
} else {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{ {"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
{"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRunBottomPri"}});
}
SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int i = 0; i < 2; ++i) {
for (int num = 0; num < kNumFilesTrigger; num++) {
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx, true );
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
if (i == 0) {
TEST_SYNC_POINT(
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
}
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumSortedRuns(), 2);
ASSERT_GT(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
TEST_P(DBTestUniversalCompaction, RecalculateScoreAfterPicking) {
const int kNumFilesTrigger = 8;
Options options = CurrentOptions();
options.memtable_factory.reset(
test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_options_universal.max_merge_width = kNumFilesTrigger / 2;
options.compaction_options_universal.max_size_amplification_percent =
static_cast<unsigned int>(-1);
options.compaction_style = kCompactionStyleUniversal;
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
options.num_levels = num_levels_;
Reopen(options);
std::atomic<int> num_compactions_attempted(0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Start",
[&](void* ) { ++num_compactions_attempted; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
for (int num = 0; num < kNumFilesTrigger; num++) {
ASSERT_EQ(NumSortedRuns(), num);
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, num_compactions_attempted);
ASSERT_EQ(NumSortedRuns(), 5);
}
TEST_P(DBTestUniversalCompaction, FinalSortedRunCompactFilesConflict) {
if (exclusive_manual_compaction_) {
return;
}
Options opts = CurrentOptions();
opts.compaction_style = kCompactionStyleUniversal;
opts.compaction_options_universal.max_size_amplification_percent = 50;
opts.compaction_options_universal.min_merge_width = 2;
opts.compression = kNoCompression;
opts.level0_file_num_compaction_trigger = 2;
opts.max_background_compactions = 2;
opts.num_levels = num_levels_;
Reopen(opts);
auto stop_token =
dbfull()->TEST_write_controler().GetCompactionPressureToken();
ASSERT_OK(Put("key", "val"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(NumTableFilesAtLevel(num_levels_ - 1), 1);
ColumnFamilyMetaData cf_meta;
ColumnFamilyHandle* default_cfh = db_->DefaultColumnFamily();
dbfull()->GetColumnFamilyMetaData(default_cfh, &cf_meta);
ASSERT_EQ(1, cf_meta.levels[num_levels_ - 1].files.size());
std::string first_sst_filename =
cf_meta.levels[num_levels_ - 1].files[0].name;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"CompactFilesImpl:0",
"DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0"},
{"DBImpl::BackgroundCompaction():AfterPickCompaction",
"CompactFilesImpl:1"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
port::Thread compact_files_thread([&]() {
ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), default_cfh,
{first_sst_filename}, num_levels_ - 1));
});
TEST_SYNC_POINT(
"DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0");
for (int i = 0; i < 2; ++i) {
ASSERT_OK(Put("key", "val"));
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
compact_files_thread.join();
}
INSTANTIATE_TEST_CASE_P(NumLevels, DBTestUniversalCompaction,
::testing::Combine(::testing::Values(1, 3, 5),
::testing::Bool()));
class DBTestUniversalManualCompactionOutputPathId
: public DBTestUniversalCompactionBase {
public:
DBTestUniversalManualCompactionOutputPathId()
: DBTestUniversalCompactionBase(
"/db_universal_compaction_manual_pid_test") {}
};
TEST_P(DBTestUniversalManualCompactionOutputPathId,
ManualCompactionOutputPathId) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_paths.emplace_back(dbname_, 1000000000);
options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.target_file_size_base = 1 << 30; options.level0_file_num_compaction_trigger = 10;
Destroy(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
MakeTables(3, "p", "q", 1);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(2, TotalLiveFiles(1));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, TotalLiveFilesAtPath(1, options.db_paths[0].path));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[1].path));
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, TotalLiveFilesAtPath(1, options.db_paths[0].path));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[1].path));
MakeTables(1, "p", "q", 1);
ASSERT_EQ(2, TotalLiveFiles(1));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[0].path));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[1].path));
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, TotalLiveFiles(1));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[0].path));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[1].path));
compact_options.target_path_id = 0;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(1, TotalLiveFilesAtPath(1, options.db_paths[0].path));
ASSERT_EQ(0, TotalLiveFilesAtPath(1, options.db_paths[1].path));
compact_options.target_path_id = 2;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsInvalidArgument());
}
INSTANTIATE_TEST_CASE_P(OutputPathId,
DBTestUniversalManualCompactionOutputPathId,
::testing::Combine(::testing::Values(1, 8),
::testing::Bool()));
TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
int i;
for (i = 0; i < 2000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
ASSERT_OK(Delete(Key(i)));
} else {
ASSERT_OK(Put(Key(i), "val"));
}
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
#if defined(ENABLE_SINGLE_LEVEL_DTC)
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
opts.num_levels = 1;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
int i;
for (i = 0; i < 2000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
ASSERT_OK(Delete(Key(i)));
} else {
ASSERT_OK(Put(Key(i), "val"));
}
}
ASSERT_OK(Flush()(;
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
#endif
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 4;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
int i;
for (i = 0; i < 500; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 500; i < 1000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 1000; i < 1500; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 1500; i < 2000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
for (i = 1999; i < 2333; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 2333; i < 2666; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 2666; i < 2999; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
ASSERT_GT(NumTableFilesAtLevel(5), 0);
for (i = 1900; i < 2100; ++i) {
ASSERT_OK(Delete(Key(i)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(1));
ASSERT_EQ(0, NumTableFilesAtLevel(2));
ASSERT_EQ(0, NumTableFilesAtLevel(3));
ASSERT_EQ(0, NumTableFilesAtLevel(4));
ASSERT_EQ(0, NumTableFilesAtLevel(5));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalCompaction2, OverlappingL0) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 5;
opts.compression = kNoCompression;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
int i;
for (i = 0; i < 2000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 2000; i < 3000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 3500; i < 4000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
for (i = 2900; i < 3100; ++i) {
ASSERT_OK(Delete(Key(i)));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(2, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalCompaction2, IngestBehind) {
for (bool cf_option : {false, true}) {
SCOPED_TRACE("cf_option = " + std::to_string(cf_option));
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
Options opts = CurrentOptions();
opts.table_properties_collector_factories.emplace_back(
NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 2;
opts.compression = kNoCompression;
if (cf_option) {
opts.cf_allow_ingest_behind = true;
} else {
opts.allow_ingest_behind = true;
}
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
Reopen(opts);
int i;
for (i = 0; i < 2000; ++i) {
ASSERT_OK(Put(Key(i), "val"));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (i = 1999; i < kNumKeys; ++i) {
if (i >= kNumKeys - kWindowSize &&
i < kNumKeys - kWindowSize + kNumDelsTrigger) {
ASSERT_OK(Delete(Key(i)));
} else {
ASSERT_OK(Put(Key(i), "val"));
}
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(0, NumTableFilesAtLevel(6));
ASSERT_GT(NumTableFilesAtLevel(5), 0);
if (cf_option) {
ColumnFamilyHandle* new_cfh;
Options new_cf_option;
new_cf_option.compaction_style = kCompactionStyleUniversal;
new_cf_option.num_levels = 7;
ASSERT_OK(db_->CreateColumnFamily(new_cf_option, "new_cf", &new_cfh));
for (i = 0; i < 10; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), new_cfh, Key(i), "val"));
}
ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), new_cfh, nullptr, nullptr));
std::string property;
EXPECT_TRUE(db_->GetProperty(
new_cfh, "rocksdb.num-files-at-level" + std::to_string(6),
&property));
ASSERT_EQ(1, atoi(property.c_str()));
ASSERT_OK(db_->DropColumnFamily(new_cfh));
ASSERT_OK(db_->DestroyColumnFamilyHandle(new_cfh));
}
}
}
TEST_F(DBTestUniversalCompaction2, PeriodicCompactionDefault) {
Options options;
options.compaction_style = kCompactionStyleUniversal;
options.env = env_;
KeepFilterFactory* filter = new KeepFilterFactory(true);
options.compaction_filter_factory.reset(filter);
Reopen(options);
ASSERT_EQ(30 * 24 * 60 * 60,
dbfull()->GetOptions().periodic_compaction_seconds);
KeepFilter df;
options.compaction_filter_factory.reset();
options.compaction_filter = &df;
Reopen(options);
ASSERT_EQ(30 * 24 * 60 * 60,
dbfull()->GetOptions().periodic_compaction_seconds);
options.ttl = 60 * 24 * 60 * 60;
options.compaction_filter = nullptr;
Reopen(options);
ASSERT_EQ(30 * 24 * 60 * 60,
dbfull()->GetOptions().periodic_compaction_seconds);
options.periodic_compaction_seconds = 45 * 24 * 60 * 60;
options.ttl = 50 * 24 * 60 * 60;
Reopen(options);
ASSERT_EQ(45 * 24 * 60 * 60,
dbfull()->GetOptions().periodic_compaction_seconds);
options.periodic_compaction_seconds = 0;
options.ttl = 50 * 24 * 60 * 60;
Reopen(options);
ASSERT_EQ(50 * 24 * 60 * 60,
dbfull()->GetOptions().periodic_compaction_seconds);
}
TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
Options opts = CurrentOptions();
opts.env = env_;
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 10;
opts.max_open_files = -1;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
opts.periodic_compaction_seconds = 48 * 60 * 60; opts.num_levels = 5;
env_->SetMockSleep();
Reopen(opts);
int periodic_compactions = 0;
int start_level = -1;
int output_level = -1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionPicker::PickPeriodicCompaction:Return",
[&](void* arg) {
Compaction* compaction = static_cast<Compaction*>(arg);
ASSERT_TRUE(arg != nullptr);
ASSERT_TRUE(compaction->compaction_reason() ==
CompactionReason::kPeriodicCompaction);
start_level = compaction->start_level();
output_level = compaction->output_level();
periodic_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_EQ(0, periodic_compactions);
env_->MockSleepForSeconds(48 * 60 * 60 + 100);
ASSERT_OK(Put("foo", "bar2"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, periodic_compactions);
ASSERT_EQ(0, start_level);
ASSERT_EQ(4, output_level);
periodic_compactions = 0;
ASSERT_OK(Put("foo", "bar2"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, periodic_compactions);
ASSERT_OK(Put("foo", "bar2"));
env_->MockSleepForSeconds(48 * 60 * 60 + 100);
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, periodic_compactions);
ASSERT_EQ(0, start_level);
ASSERT_EQ(4, output_level);
}
TEST_F(DBTestUniversalCompaction2, PeriodicCompactionOffpeak) {
constexpr int kSecondsPerDay = 86400;
constexpr int kSecondsPerHour = 3600;
constexpr int kSecondsPerMinute = 60;
Options opts = CurrentOptions();
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 10;
opts.max_open_files = -1;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
opts.periodic_compaction_seconds = 5 * kSecondsPerDay; opts.num_levels = 5;
Random rnd(test::RandomSeed());
int days = rnd.Uniform(100);
int periodic_compactions = 0;
int start_level = -1;
int output_level = -1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionPicker::PickPeriodicCompaction:Return",
[&](void* arg) {
Compaction* compaction = static_cast<Compaction*>(arg);
ASSERT_TRUE(arg != nullptr);
ASSERT_TRUE(compaction->compaction_reason() ==
CompactionReason::kPeriodicCompaction);
start_level = compaction->start_level();
output_level = compaction->output_level();
periodic_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
for (std::string preset_offpeak_time : {"", "00:30-04:30", "10:30-02:30"}) {
SCOPED_TRACE("preset_offpeak_time=" + preset_offpeak_time);
for (std::string new_offpeak_time : {"", "23:30-02:30"}) {
SCOPED_TRACE("new_offpeak_time=" + new_offpeak_time);
std::vector<std::pair<int, int>> times_to_test = {
{0, 0}, {2, 30}, {3, 15}, {5, 10}, {13, 30}, {23, 30}};
for (std::pair<int, int> now : times_to_test) {
int now_hour = now.first;
int now_minute = now.second;
SCOPED_TRACE("now=" + std::to_string(now_hour) + ":" +
std::to_string(now_minute));
auto mock_clock =
std::make_shared<MockSystemClock>(env_->GetSystemClock());
auto mock_env = std::make_unique<CompositeEnvWrapper>(env_, mock_clock);
opts.env = mock_env.get();
mock_clock->SetCurrentTime(days * kSecondsPerDay +
now_hour * kSecondsPerHour +
now_minute * kSecondsPerMinute);
opts.daily_offpeak_time_utc = preset_offpeak_time;
Reopen(opts);
ASSERT_OK(Put("foo", "bar1"));
ASSERT_OK(Flush());
ASSERT_EQ(0, periodic_compactions);
mock_clock->MockSleepForSeconds(8 * kSecondsPerHour);
ASSERT_OK(Put("foo", "bar2"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, periodic_compactions);
mock_clock->MockSleepForSeconds(4 * kSecondsPerDay);
ASSERT_OK(Put("foo", "bar3"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
int64_t mock_now;
ASSERT_OK(mock_clock->GetCurrentTime(&mock_now));
auto offpeak_time_info =
dbfull()->GetVersionSet()->offpeak_time_option().GetOffpeakTimeInfo(
mock_now);
if (offpeak_time_info.is_now_offpeak &&
offpeak_time_info.seconds_till_next_offpeak_start /
kSecondsPerHour >
16) {
ASSERT_EQ(1, periodic_compactions);
} else {
ASSERT_EQ(0, periodic_compactions);
if (preset_offpeak_time != new_offpeak_time) {
ASSERT_OK(dbfull()->SetDBOptions(
{{"daily_offpeak_time_utc", new_offpeak_time}}));
ASSERT_OK(Put("foo", "bar4"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
offpeak_time_info = dbfull()
->GetVersionSet()
->offpeak_time_option()
.GetOffpeakTimeInfo(mock_now);
if (offpeak_time_info.is_now_offpeak &&
offpeak_time_info.seconds_till_next_offpeak_start /
kSecondsPerHour >
16) {
ASSERT_OK(Put("foo", "bar5"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, periodic_compactions);
}
}
if (periodic_compactions == 0) {
mock_clock->MockSleepForSeconds(1 * kSecondsPerDay);
ASSERT_OK(Put("foo", "bar6"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
}
ASSERT_EQ(1, periodic_compactions);
ASSERT_EQ(0, start_level);
ASSERT_EQ(4, output_level);
Destroy(opts);
periodic_compactions = 0;
}
}
}
}
}
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}